• 回复
  • 收藏
  • 点赞
  • 分享
  • 发新帖

【 DigiKey DIY原创大赛】在嵌入式终端上基于本地大模型实现的离线语音聊天机器人(2)

https://www.dianyuan.com/bbs/2778146.html

------无奈的起始分割线(受论坛帖子字数限制,前文见上一帖子)-----

主要线程代码

模型推理线程

def llama_thread():
    global bool_Chinese_tts
    model = llama_cpp.Llama(
        model_path=f"{current_dir}/models/llama/qwen1_5-0_5b-chat-q4_0.gguf",
        # n_ctx = 4096,
        verbose=False,
    )
    ch_punctuations_re = "[,。?;]"
    llama_load_done.set()
    messages_history = []
    max_msg_history = 2
    print("Load llama model done")

    while True:
        trig_llama_event.wait()
        trig_llama_event.clear()
        ask_text = ask_text_q.get()
        print(ask_text)
        messages_history.append({"role": "user", "content": ask_text})
        if len(messages_history) > max_msg_history:
            messages_history = messages_history[-max_msg_history:]
        ans_text = model.create_chat_completion(
            messages=messages_history,
            logprobs=False,
            # stream=True,
            repeat_penalty = 1.2,
            max_tokens=100,
        )
        ans_text = ans_text["choices"][0]["message"]["content"]
        messages_history.append({"role": "assistant", "content": ans_text})
        if len(messages_history) > max_msg_history:
            messages_history = messages_history[-max_msg_history:]
        print(ans_text)
        ans_text_tts = ans_text.replace(",", "。")
        bool_Chinese_tts = bool(re.search(r"[\u4e00-\u9fff]", ans_text_tts))  # Chinese?
        ans_text_q.put(ans_text_tts)
        model_doing_event.clear()

模型推理部分,做了一些特殊处理:

-模型传入的输入信息,是包含前一次模型的输入及输出的,以便让模型每次推理具有一定的上下文信息。但增加上下文长度,会牺牲模型的推理速度,所以应该根据实际算力情况,合理规划上下文长度。

-模型做了一些参数设置,例如限制了最大输出Tokens数,以及重复性惩罚repeat_penalty等,避免模型一次输出太多信息,甚至重复输出一些无效信息。

-模式输出文本做了处理,将中文逗号全部替换成中文句号,这主要是简单解决Piper TTS对中文逗号几乎没有语音停顿的局限性。

-判断模型输出的语言类型,以便让Piper TTS对应加载不同的语言模型。

注:

-如果需要更换模型,只需要简单修改代码中model_path指定到对应的gguf文件即可

-由于增加了1次对话上下文信息,所以一定程度牺牲了模型推理速度

-考虑后续TTS输出的连贯性,模型未采用流式输出,所以感官上从模型输入到完整输出,推理会有较长的等待时间

文本转语音线程

def tts_thread():
    global bool_Chinese_tts
    piper_cmd_zh = f"{current_dir}/piper/piper --model {current_dir}/models/piper/zh_CN-huayan-medium.onnx --output-raw | aplay -r 22050 -f S16_LE -t raw -"
    piper_cmd_en = f"{current_dir}/piper/piper --model {current_dir}/models/piper/en_GB-jenny_dioco-medium.onnx --output-raw | aplay -r 22050 -f S16_LE -t raw -"
    while True:
        tts_text = ans_text_q.get()
        if bool_Chinese_tts:
            command = f'echo "{tts_text}" | {piper_cmd_zh}'
        else:
            command = f"echo {shlex.quote(tts_text)} | {piper_cmd_en}"
        process = subprocess.Popen(
            command, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
        while True:
            if stop_tts_event.is_set():
                terminate_process(process.pid)
                break
            if process.poll() is not None:
                break
            time.sleep(0.01)
        process.wait()

Piper TTS通过命令行进行调用,所以对于英文的文本要使用shlex.quote()函数,对特殊字符进行预处理,避免文本里的字符影响命令行的结构。同时,在播放过程中,如果录音KEY被按下,则TTS会主动中断退出。

显示线程

def oled_thread(oled_device, dir):
    with Image.open(f"{current_dir}/img/{dir}_logo.bmp") as img:
        img_resized = img.convert("1").resize((128, 64))
        img_resized = ImageOps.invert(img_resized)
        oled_device.display(img_resized)
        llama_load_done.wait()
        senvc_load_done.wait()
    frames_eye = []
    durations_eye = []
    with Image.open(f"{current_dir}/img/{dir}_eye.gif") as img:
        for frame in ImageSequence.Iterator(img):
            frames_eye.append(frame.convert("1").resize((128, 64)))
            durations_eye.append(frame.info.get("duration", 100) / 1000.0)
    frames_rcd = []
    durations_rcd = []
    with Image.open(f"{current_dir}/img/record.gif") as img:
        for frame in ImageSequence.Iterator(img):
            frames_rcd.append(frame.convert("1").resize((128, 64)))
            durations_rcd.append(frame.info.get("duration", 100) / 1000.0)

    while True:
        if show_record_event.is_set():
            for frame, duration in zip(frames_rcd, durations_rcd):
                oled_device.display(frame)
                time.sleep(duration)
                if not show_record_event.is_set():
                    break
        else:
            for frame, duration in zip(frames_eye, durations_eye):
                if model_doing_event.is_set() and duration > 1:
                    continue
                if duration > 1:
                    duration = duration * 2
                oled_device.display(frame)
                show_record_event.wait(timeout=duration)
                if show_record_event.is_set():
                    break
                else:
                    if dir == "left":
                        oled_events["left"].set()
                        oled_events["right"].wait()
                        oled_events["right"].clear()
                    else:
                        oled_events["right"].set()
                        oled_events["left"].wait()
                        oled_events["left"].clear()

显示线程主要用于协同显示当前的程序运行状态,例如录音时显示音频波形、模型推理时显示闭眼思考状态,推理完成后睁开眼睛并眨眼等。

效果演示

图片展示:

视频演示:

B站链接:https://www.bilibili.com/video/BV1dSDzY6EHw/?spm_id_from=333.999.0.0&vd_source=b3875874dae83ca6abad9d14e2e3d762

代码开源地址:

Gitee源代码:https://gitee.com/ETRD/pi-ai-robot

附加模型文件:https://pan.huang1111.cn/s/MNgwOhx

小结

本文介绍了在嵌入式终端上,基于本地大模型实现的多语言离线语音聊天机器人。本项目在树莓派5上具体实现,代码完全开源,理论上可以运行于任何具有相当算力和资源的嵌入式终端。

全部回复(0)
正序查看
倒序查看
现在还没有回复呢,说说你的想法