Source code for mbodied.agents.sense.audio.audio_agent

# Copyright 2024 mbodi ai
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import platform
import threading
import wave

try:
    import playsound
    import pyaudio
except ImportError:
    logging.warning("playsound or pyaudio is not installed. Please run `pip install pyaudio playsound` to install.")

from openai import OpenAI
from typing_extensions import Literal

from mbodied.agents import Agent


class AudioAgent(Agent):
    """Handles audio recording, playback, and speech-to-text transcription.

    This module uses OpenAI's API to transcribe audio input and synthesize speech.
    Set the environment variable NO_AUDIO=1 to disable audio recording and playback;
    input is then read from the terminal instead.

    Usage:
        audio_agent = AudioAgent(api_key="your-openai-api-key", use_pyaudio=False)
        audio_agent.speak("How can I help you?")
        message = audio_agent.listen()
    """

    mode = Literal["speak", "type", "speak_or_type"]

    def __init__(
        self,
        listen_filename: str = "tmp_listen.wav",
        tmp_speak_filename: str = "tmp_speak.mp3",
        use_pyaudio: bool = True,
        client: OpenAI = None,
        api_key: str = None,
    ):
        """Initializes the AudioAgent with specified parameters.

        Args:
            listen_filename: The filename for storing recorded audio.
            tmp_speak_filename: The filename for storing synthesized speech.
            use_pyaudio: Whether to use PyAudio for playback. Prefer setting to False on macOS.
            client: An optional OpenAI client instance.
            api_key: The API key for OpenAI.
        """
        self.recording = False
        self.record_lock = threading.Lock()
        self.listen_filename = listen_filename
        self.speak_filename = tmp_speak_filename
        self.use_pyaudio = use_pyaudio
        if os.getenv("NO_AUDIO"):
            return
        if api_key:
            self.client = OpenAI(api_key=api_key)
        else:
            self.client = client
        if self.client is None:
            self.client = OpenAI(api_key=api_key or os.getenv("OPENAI_API_KEY"))
            logging.info("OpenAI API key fetched from the environment.")
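    # Construction sketch (an illustrative example, not part of the original module):
    # pass an explicit api_key, or rely on the OPENAI_API_KEY environment variable;
    # per the docstring above, prefer use_pyaudio=False on macOS so playback falls
    # back to `afplay`.
    #
    #     agent = AudioAgent(use_pyaudio=False)              # reads OPENAI_API_KEY
    #     agent = AudioAgent(api_key="your-openai-api-key")  # explicit key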
    def act(self, *args, **kwargs):
        return self.listen(*args, **kwargs)
    def listen(self, keep_audio: bool = False, mode: str = "speak") -> str:
        """Listens for audio input and transcribes it using OpenAI's API.

        Args:
            keep_audio: Whether to keep the recorded audio file.
            mode: The mode of input (speak, type, speak_or_type).

        Returns:
            The transcribed text from the audio input.
        """
        logging.debug(f"Listening with mode: {mode}")
        if os.getenv("NO_AUDIO") or mode in ["type", "speak_or_type"]:
            user_input = input("Please type your input [Type 'exit' to exit]: ") + "\n##\n"
            if os.getenv("NO_AUDIO") or mode == "type":
                return user_input
        else:
            user_input = ""
        typed_input = user_input

        thread = threading.Thread(target=self.record_audio)
        user_input = input("Press ENTER to speak [Type 'exit' to exit]")
        if user_input.lower() == "exit":
            exit()
        with self.record_lock:
            self.recording = True
        thread.start()
        input("Press ENTER to stop recording")
        with self.record_lock:
            self.recording = False
        thread.join()

        transcription = None
        try:
            with open(self.listen_filename, "rb") as audio_file:
                transcription = self.client.audio.transcriptions.create(model="whisper-1", file=audio_file)
            return typed_input + transcription.text
        except Exception as e:
            logging.error(f"Failed to read or transcribe audio file: {e}")
            return ""
        finally:
            if not keep_audio and os.path.exists(self.listen_filename):
                os.remove(self.listen_filename)
        return typed_input + transcription.text if transcription else ""
    def record_audio(self) -> None:
        """Records audio from the microphone and saves it to a file."""
        chunk = 1024
        sample_format = pyaudio.paInt16
        channels = 1
        fs = 44100

        p = pyaudio.PyAudio()
        stream = p.open(format=sample_format, channels=channels, rate=fs, frames_per_buffer=chunk, input=True)
        frames = []
        try:
            while self.recording:
                data = stream.read(chunk)
                frames.append(data)
        finally:
            stream.stop_stream()
            stream.close()
            p.terminate()

        try:
            with wave.open(self.listen_filename, "wb") as wf:
                wf.setnchannels(channels)
                wf.setsampwidth(p.get_sample_size(sample_format))
                wf.setframerate(fs)
                wf.writeframes(b"".join(frames))
        except Exception as e:
            logging.error(f"Failed to save audio: {e}")
    def speak(self, message: str, voice: str = "onyx", api_key: str = None) -> None:
        """Synthesizes speech from text using OpenAI's API and plays it back.

        Args:
            message: The text message to synthesize.
            voice: The voice model to use for synthesis.
            api_key: The API key for OpenAI.
        """
        if os.environ.get("NO_AUDIO"):
            return
        try:
            client = self.client or OpenAI(api_key=api_key or os.environ.get("OPENAI_API_KEY"))
            with (
                client.with_streaming_response.audio.speech.create(
                    model="tts-1",
                    voice=voice,
                    input=message,
                ) as response,
                open(self.speak_filename, "wb") as out_file,
            ):
                for chunk in response.iter_bytes():
                    out_file.write(chunk)
        except Exception as e:
            logging.error(f"Failed to create or save speech: {e}")
            return
        self.playback_thread = threading.Thread(target=self.play_audio, args=(self.speak_filename,))
        self.playback_thread.start()
    def play_audio(self, filename: str) -> None:
        """Plays an audio file.

        Args:
            filename: The filename of the audio file to play.
        """
        try:
            if platform.system() == "Darwin" and not self.use_pyaudio:
                # afplay is only available on macOS.
                os.system("afplay " + filename)
            else:
                playsound.playsound(filename)
        except Exception as e:
            logging.error(f"Error playing audio file {filename}: {e}")
        finally:
            if os.path.exists(filename):
                os.remove(filename)
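# Example interaction loop (a sketch based on the class docstring, not part of the
# original module; assumes a valid OpenAI API key, or set NO_AUDIO=1 to read typed
# input from the terminal instead of recording):
#
#     if __name__ == "__main__":
#         audio_agent = AudioAgent(api_key="your-openai-api-key", use_pyaudio=False)
#         audio_agent.speak("How can I help you?")
#         message = audio_agent.listen()
#         print(message)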