Skip to content

Base

base

AgentLifecycleProtocol

Bases: Protocol

Protocol for agent lifecycle management.

initialize async

initialize() -> 'Agent'

Initialize the agent.

Source code in reactive_agents/app/agents/base.py
async def initialize(self) -> "Agent":
    """Initialize the agent."""
    ...

close async

close() -> None

Close the agent and clean up resources.

Source code in reactive_agents/app/agents/base.py
async def close(self) -> None:
    """Close the agent and clean up resources."""
    ...

run async

run(initial_task: str, **kwargs) -> Dict[str, Any]

Run the agent with a task.

Source code in reactive_agents/app/agents/base.py
async def run(self, initial_task: str, **kwargs) -> Dict[str, Any]:
    """Run the agent with a task."""
    ...

chat async

chat(initial_task: str, **kwargs) -> Dict[str, Any]

Run the agent with a task.

Source code in reactive_agents/app/agents/base.py
async def chat(self, initial_task: str, **kwargs) -> Dict[str, Any]:
    """Run the agent with a task."""
    ...

AgentControlProtocol

Bases: Protocol

Protocol for agent control operations.

pause async

pause() -> None

Pause the agent execution.

Source code in reactive_agents/app/agents/base.py
async def pause(self) -> None:
    """Pause the agent execution."""
    ...

resume async

resume() -> None

Resume the agent execution.

Source code in reactive_agents/app/agents/base.py
async def resume(self) -> None:
    """Resume the agent execution."""
    ...

stop async

stop() -> None

Stop the agent execution.

Source code in reactive_agents/app/agents/base.py
async def stop(self) -> None:
    """Stop the agent execution."""
    ...

terminate async

terminate() -> None

Terminate the agent execution.

Source code in reactive_agents/app/agents/base.py
async def terminate(self) -> None:
    """Terminate the agent execution."""
    ...

Agent

Agent(context: AgentContext)

Bases: ABC, AgentLifecycleProtocol, AgentControlProtocol

Enhanced base class for AI agents with comprehensive lifecycle and control management.

This base class provides: - Unified context management - Lifecycle protocols (initialize, run, close) - Control protocols (pause, resume, stop, terminate) - Event system integration - Tool management - Metrics and logging - Extensible architecture for future enhancements

Initializes the Agent with a pre-configured AgentContext.

Parameters:

Name Type Description Default
context AgentContext

The AgentContext instance holding configuration, state, and managers.

required
Source code in reactive_agents/app/agents/base.py
def __init__(self, context: AgentContext):
    """
    Initializes the Agent with a pre-configured AgentContext.

    Args:
        context: The AgentContext instance holding configuration, state, and managers.
    """
    self.context = context
    self._closed = False
    self._initialized = False

    # Set agent reference in context for engine to use
    self.context._agent = self  # type: ignore

    # Ensure logger is initialized before using it
    if not self.context.agent_logger:
        self.context._initialize_loggers()
    assert self.context.agent_logger is not None

    self.agent_logger.info(
        f"Base Agent '{self.context.agent_name}' initialized with context."
    )

agent_logger property

agent_logger: Logger

Get the agent logger.

tool_logger property

tool_logger: Logger

Get the tool logger.

result_logger property

result_logger: Logger

Get the result logger.

model_provider property

model_provider: BaseModelProvider

Get the model provider.

is_initialized property

is_initialized: bool

Check if the agent is initialized.

is_closed property

is_closed: bool

Check if the agent is closed.

initialize async

initialize() -> 'Agent'

Initialize the agent and its components.

This method should be called before using the agent. Subclasses can override this to add custom initialization logic.

Source code in reactive_agents/app/agents/base.py
async def initialize(self) -> "Agent":
    """
    Initialize the agent and its components.

    This method should be called before using the agent.
    Subclasses can override this to add custom initialization logic.
    """
    if self._initialized:
        self.agent_logger.warning("Agent already initialized")
        return self

    try:
        self.agent_logger.info(f"Initializing {self.context.agent_name}...")

        # Initialize MCP if configured and not already initialized by builder
        if (
            not self.context.mcp_client
            and self.config
            and hasattr(self.config, "mcp_config")
            and (
                self.config.mcp_config
                or getattr(self.config, "mcp_server_filter", None)
            )
        ):
            from reactive_agents.config.mcp_config import MCPConfig
            from reactive_agents.providers.external.client import MCPClient

            self.context.mcp_config = (
                MCPConfig.model_validate(self.config.mcp_config, strict=False)
                if self.config.mcp_config
                else None
            )
            self.context.mcp_client = await MCPClient(
                server_config=self.context.mcp_config,
                server_filter=getattr(self.config, "mcp_server_filter", None),
            ).initialize()

        # Initialize tool manager
        if self.context.tool_manager:
            await self.context.tool_manager.initialize()

        self._initialized = True
        self.agent_logger.info(
            f"{self.context.agent_name} initialized successfully"
        )
        return self

    except Exception as e:
        self.agent_logger.error(
            f"Error initializing {self.context.agent_name}: {e}"
        )
        raise e

close async

close() -> None

Close the agent and clean up resources.

This method should be called when done with the agent.

Source code in reactive_agents/app/agents/base.py
async def close(self) -> None:
    """
    Close the agent and clean up resources.

    This method should be called when done with the agent.
    """
    if self._closed:
        return

    try:
        self.agent_logger.info(f"Closing {self.context.agent_name}...")

        # Close context
        await self.context.close()

        # Close MCP client if exists
        if self.context.mcp_client is not None:
            self.agent_logger.info(
                f"Closing MCPClient for {self.context.agent_name}..."
            )
            await self.context.mcp_client.close()
            self.agent_logger.info(
                f"{self.context.agent_name} MCPClient closed successfully."
            )

        self._closed = True
        self.agent_logger.info(f"{self.context.agent_name} closed successfully.")

    except Exception as e:
        self.agent_logger.error(f"Error closing {self.context.agent_name}: {e}")
        raise e

pause async

pause() -> None

Pause the agent execution. Override in subclasses for specific behavior.

Source code in reactive_agents/app/agents/base.py
async def pause(self) -> None:
    """Pause the agent execution. Override in subclasses for specific behavior."""
    self.agent_logger.info(f"{self.context.agent_name} pause requested")
    if hasattr(self, "execution_engine") and self.execution_engine:
        await self.execution_engine.pause()

resume async

resume() -> None

Resume the agent execution. Override in subclasses for specific behavior.

Source code in reactive_agents/app/agents/base.py
async def resume(self) -> None:
    """Resume the agent execution. Override in subclasses for specific behavior."""
    self.agent_logger.info(f"{self.context.agent_name} resume requested")
    if hasattr(self, "execution_engine") and self.execution_engine:
        await self.execution_engine.resume()

stop async

stop() -> None

Stop the agent execution. Override in subclasses for specific behavior.

Source code in reactive_agents/app/agents/base.py
async def stop(self) -> None:
    """Stop the agent execution. Override in subclasses for specific behavior."""
    self.agent_logger.info(f"{self.context.agent_name} stop requested")
    if hasattr(self, "execution_engine") and self.execution_engine:
        await self.execution_engine.stop()

terminate async

terminate() -> None

Terminate the agent execution. Override in subclasses for specific behavior.

Source code in reactive_agents/app/agents/base.py
async def terminate(self) -> None:
    """Terminate the agent execution. Override in subclasses for specific behavior."""
    self.agent_logger.info(f"{self.context.agent_name} terminate requested")
    if hasattr(self, "execution_engine") and self.execution_engine:
        await self.execution_engine.terminate()

run async

run(initial_task: str, **kwargs) -> Dict[str, Any]

Run the agent with a task.

This is the main entry point for agent execution. Subclasses should implement _execute_task for specific behavior.

Parameters:

Name Type Description Default
initial_task str

The task to execute

required
**kwargs

Additional parameters for execution

{}

Returns:

Type Description
Dict[str, Any]

Execution results with comprehensive metrics

Source code in reactive_agents/app/agents/base.py
async def run(self, initial_task: str, **kwargs) -> Dict[str, Any]:
    """
    Run the agent with a task.

    This is the main entry point for agent execution.
    Subclasses should implement _execute_task for specific behavior.

    Args:
        initial_task: The task to execute
        **kwargs: Additional parameters for execution

    Returns:
        Execution results with comprehensive metrics
    """
    if not self._initialized:
        await self.initialize()

    if self._closed:
        raise RuntimeError("Cannot run a closed agent")

    # Initialize session tracking
    if self.context.session:
        self.context.session.initial_task = initial_task
        self.context.session.current_task = initial_task
        self.context.session.start_time = time.time()

    try:
        self.agent_logger.info(
            f"🚀 {self.context.agent_name} starting task: {initial_task[:100]}..."
        )
        result = await self._execute_task(initial_task, **kwargs)
        self.agent_logger.info(f"✅ {self.context.agent_name} completed task")

        # Finalize metrics and prepare comprehensive result
        return self._prepare_final_result_with_metrics(
            result, initial_task, success=True
        )

    except Exception as e:
        self.agent_logger.error(
            f"❌ {self.context.agent_name} execution failed: {e}"
        )

        # Prepare error result with metrics
        error_result = {
            "status": "error",
            "error": str(e),
            "final_answer": None,
            "completion_score": 0.0,
            "iterations": 0,
        }
        return self._prepare_final_result_with_metrics(
            error_result, initial_task, success=False
        )

extract_json_from_string

extract_json_from_string(s: str)

Try to extract and parse the first valid JSON object or array from a string. Returns the parsed object (dict/list) or {} if not found/invalid.

Source code in reactive_agents/app/agents/base.py
def extract_json_from_string(s: str):
    """
    Try to extract and parse the first valid JSON object or array from a string.
    Returns the parsed object (dict/list) or {} if not found/invalid.
    """
    s = s.strip()

    # Try parsing the whole string first
    try:
        return json.loads(s)
    except Exception:
        pass

    # Try to find the first { ... } or [ ... ] block (greedy)
    import re

    match = re.search(r"({.*})|(\[.*\])", s, re.DOTALL)
    if match:
        json_str = match.group(0)
        try:
            return json.loads(json_str)
        except Exception:
            return {}
    return {}