# Copyright 2024 mbodi ai
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

"""Run a LanguageAgent with memory, optional remote acting, and optional automatic dataset creation capabilities.

While it is always recommended to explicitly define your observation and action spaces,
which can be set with a gym.Space object or any python object using the Sample class
(see examples/ for a tutorial), you can have the recorder infer the spaces
by setting recorder="default" for automatic dataset recording.

For example:
    >>> agent = LanguageAgent(context=SYSTEM_PROMPT, model_src=backend, recorder="default")
    >>> agent.act_and_record("pick up the fork", image)

Alternatively, you can define the recorder separately to record the space you want.
For example, to record the dataset with the image and instruction observation and AnswerAndActionsList as action:
    >>> observation_space = spaces.Dict({"image": Image(size=(224, 224)).space(), "instruction": spaces.Text(1000)})
    >>> action_space = AnswerAndActionsList(actions=[HandControl()] * 6).space()
    >>> recorder = Recorder(
    ...     'example_recorder',
    ...     out_dir='saved_datasets',
    ...     observation_space=observation_space,
    ...     action_space=action_space

To record:
    >>> recorder.record(
    ...     observation={
    ...         "image": image,
    ...         "instruction": instruction,
    ...     },
    ...     action=answer_actions,
    ... )

import asyncio
import logging
import os
from dataclasses import dataclass
from typing import List, Literal, TypeAlias

from art import text2art
from pydantic import AnyUrl, DirectoryPath, FilePath, NewPath

from mbodied.agents import Agent
from mbodied.agents.backends import OpenAIBackend
from mbodied.types.message import Message
from mbodied.types.sample import Sample
from import Image

SupportsOpenAI: TypeAlias = OpenAIBackend

[docs] @dataclass class Reminder: """A reminder to show the agent a prompt every n messages.""" prompt: str | Image | Message n: int
[docs] def make_context_list(context: list[str | Image | Message] | Image | str | Message | None) -> List[Message]: """Convert the context to a list of messages.""" if isinstance(context, list): return [Message(content=c) if not isinstance(c, Message) else c for c in context] if isinstance(context, Message): return [context] if isinstance(context, str | Image): return [Message(role="user", content=[context]), Message(role="assistant", content="Understood.")] return []
[docs] class LanguageAgent(Agent): """An agent that can interact with users using natural language. This class extends the functionality of a base Agent to handle natural language interactions. It manages memory, dataset-recording, and asynchronous remote inference, supporting multiple platforms including OpenAI, Anthropic, and Gradio. Attributes: reminders (List[Reminder]): A list of reminders that prompt the agent every n messages. context (List[Message]): The current context of the conversation. Inherits all attributes from the parent class `Agent`. Examples: Basic usage with OpenAI: >>> cognitive_agent = LanguageAgent(api_key="...", model_src="openai", recorder="default") >>> cognitive_agent.act("your instruction", image) Automatically act and record to dataset: >>> cognitive_agent.act_and_record("your instruction", image) """ _art_printed = False def __init__( # noqa self, model_src: Literal["openai", "anthropic"] | SupportsOpenAI | AnyUrl | FilePath | DirectoryPath | NewPath = "openai", context: list | Image | str | Message = None, api_key: str | None = os.getenv("OPENAI_API_KEY"), model_kwargs: dict = None, # noqa recorder: Literal["default", "omit"] | str = "omit", recorder_kwargs: dict = None, # noqa ) -> None: """LanguageAgent with memory, dataset-recording, and remote infererence support. Always returns a string. Supported datasets: HDF5, Datasets, JSON, CSV, Parquet. Supported inference backends: OpenAI, Anthropic, Gradio, Ollama, and OpenAI-compatible http APIs. Methods: - act(instruction: str, image: Image = None, context: list | str | Image | Message = None, model=None, **kwargs) -> str - forget_last() -> Message - forget(everything=False, last_n: int = -1) -> None - remind_every(prompt: str | Image | Message, n: int) -> None Args: context: The starting context to use for the conversation. Can be a list of messages, an image, a string, or a message. If a string is provided, it will be interpreted as a user message. api_key: The API key to use for the remote actor (if applicable). model_src: Any of: 1. A local path to a model's weights in which case model_kwargs will be used to load the model. 2. A supported backend key (openai, anthropic, ollama, http, gradio). The url must then be provided in model_kwargs. 3. Any huggingface spaces path, URL hosting a gradio server, or custom HTTP API URL. **Note**: If a url endpoint is provided, Gradio will be used first, then Httpx if Gradio fails. model_kwargs: Additional keyword arguments to pass to the model source. See mbodied.agents.backends.backend. Backend for more details. recorder: The recorder config or name to use for the agent to record observations and actions. recorder_kwargs: Additional keyword arguments to pass to the recorder such as push_to_cloud. """ if not LanguageAgent._art_printed: print("Welcome to") # noqa: T201 print(text2art("mbodi")) # noqa: T201 print("A platform for intelligent embodied agents.\n\n") # noqa: T201 LanguageAgent._art_printed = True self.reminders: List[Reminder] = [] print(f"Initializing language agent for robot using : {model_src}") # noqa: T201 super().__init__( recorder=recorder, recorder_kwargs=recorder_kwargs, model_src=model_src, model_kwargs=model_kwargs, api_key=api_key, ) self.context = make_context_list(context)
[docs] def forget_last(self) -> Message: """Forget the last message in the context.""" try: return self.context.pop(-1) except IndexError: logging.warning("No message to forget in the context")
[docs] def forget(self, everything=False, last_n: int = -1) -> None: """Forget the last n messages in the context.""" if everything: self.context = [] return for _ in range(last_n): self.forget_last()
[docs] def history(self) -> List[Message]: """Return the conversation history.""" return self.context
[docs] def remind_every(self, prompt: str | Image | Message, n: int) -> None: """Remind the agent of the prompt every n messages.""" message = Message([prompt]) if not isinstance(prompt, Message) else prompt self.reminders.append(Reminder(message, n))
def _check_for_reminders(self) -> None: """Check if there are any reminders to show.""" for reminder, n in self.reminders: if len(self.context) % n == 0: self.context.append(reminder)
[docs] def act_and_parse( self, instruction: str, image: Image = None, parse_target: Sample = Sample, context: list | str | Image | Message = None, model=None, **kwargs, ) -> Sample: """Responds to the given instruction, image, and context and parses the response into a Sample object.""" response = self.act(instruction, image, context, model, **kwargs) response = response.replace("```json", "").replace("```", "").replace("\n", "").strip() try: response = parse_target.model_validate_json(response) except Exception as e: error = f"Error parsing response: {e}" logging.error(error)"Recieved response: {response}. Retrying with error message.") instruction = instruction + "avoid the following error: " + error response = self.act(instruction, image, context, model, **kwargs) response = response.replace("```json", "").replace("```", "").replace("\n", "").strip() response = parse_target.model_validate_json(response) return response
[docs] async def async_act_and_parse( self, instruction: str, image: Image = None, parse_target: Sample = Sample, context: list | str | Image | Message = None, model=None, **kwargs, ) -> Sample: """Responds to the given instruction, image, and context asynchronously and parses the response into a Sample object.""" return await asyncio.to_thread( self.act_and_parse, instruction, image, parse_target, context, model=model, **kwargs, )
[docs] def act( self, instruction: str, image: Image = None, context: list | str | Image | Message = None, model=None, **kwargs, ) -> str: """Responds to the given instruction, image, and context. Uses the given instruction and image to perform an action. Args: instruction: The instruction to be processed. image: The image to be processed. context: Additonal context to include in the response. If context is a list of messages, it will be interpreted as new memory. model: The model to use for the response. **kwargs: Additional keyword arguments. Returns: str: The response to the instruction. Example: >>> agent.act("Hello, world!", Image("scene.jpeg")) "Hello! What can I do for you today?" >>> agent.act("Return a plan to pickup the object as a python list.", Image("scene.jpeg")) "['Move left arm to the object', 'Move right arm to the object']" """ self._check_for_reminders() memory = self.context if context and all(isinstance(c, Message) for c in context): memory += context context = [] # Prepare the inputs inputs = [instruction] if image is not None: inputs.append(image) if context: inputs.extend(context if isinstance(context, list) else [context]) message = Message(role="user", content=inputs) model = model or kwargs.pop("model", None) response =, memory, model=model, **kwargs) self.context.append(message) self.context.append(Message(role="assistant", content=response)) return response