Skip to content

Reactive Agent

reactive_agent

ReactiveAgent

ReactiveAgent(config: ReactiveAgentConfig, context: Optional[AgentContext] = None, event_bus: Optional[EventBus] = None)

Bases: Agent

Clean, reactive agent using the unified execution engine.

Provides a simple, efficient agent implementation that: - Uses the ExecutionEngine for task execution - Supports strategy configuration (static or adaptive) - Handles events and lifecycle management - Focuses on core functionality without complexity

Source code in reactive_agents/app/agents/reactive_agent.py
def __init__(
    self,
    config: ReactiveAgentConfig,
    context: Optional[AgentContext] = None,
    event_bus: Optional[EventBus] = None,
):
    self.config = config
    # Create context if not provided (for builder pattern compatibility)
    if context is None:
        # Create AgentConfig from ReactiveAgentConfig fields
        config_data = config.model_dump(
            exclude={
                "mcp_client",
                "mcp_config",
                "mcp_server_filter",
                "confirmation_callback",
                "confirmation_config",
                "workflow_context_shared",
                "tools",
                "kwargs",
                "initial_task",
            }
        )
        agent_config = AgentConfig.from_dict(config_data)

        # Create AgentContext with the proper config
        context = AgentContext(config=agent_config)

        # Transfer runtime-specific fields from ReactiveAgentConfig to AgentContext
        if config.mcp_client:
            context.mcp_client = config.mcp_client
        if config.workflow_context_shared:
            context.workflow_context_shared = config.workflow_context_shared
        if config.confirmation_callback:
            context.confirmation_callback = config.confirmation_callback
        if config.confirmation_config:
            context.confirmation_config = config.confirmation_config
        if config.tools:
            context.tools = config.tools

        # Ensure event_bus is initialized if not present
        if context.event_bus is None:
            context.event_bus = EventBus(config.agent_name)

    super().__init__(context)

    # Initialize event bus if not provided
    if event_bus is None:
        self._event_bus = context.event_bus
    else:
        self._event_bus = event_bus

    # Initialize execution engine
    self.execution_engine = ExecutionEngine(self)

events property

events

Get the events interface for subscribing to agent events.

run async

run(initial_task: str, cancellation_event: Optional[asyncio.Event] = None) -> ExecutionResult

Run the agent with the given task.

This is the main entry point for agent execution.

Source code in reactive_agents/app/agents/reactive_agent.py
async def run(
    self,
    initial_task: str,
    cancellation_event: Optional[asyncio.Event] = None,
) -> ExecutionResult:
    """
    Run the agent with the given task.

    This is the main entry point for agent execution.
    """
    result: Optional[ExecutionResult] = None

    if self.agent_logger:
        self.agent_logger.info(
            f"🚀 Starting reactive agent with task: {initial_task}..."
        )

    try:
        # Use the clean execution engine
        if self.execution_engine:
            result = await self.execution_engine.execute(
                initial_task, cancellation_event
            )
        else:
            raise RuntimeError("Execution engine not initialized")

        if self.agent_logger:
            self.agent_logger.info(f"✅ Task completed: {result.status.value}")

        return result

    except Exception as e:
        if self.agent_logger:
            self.agent_logger.error(f"❌ Agent execution failed: {e}")

        return ExecutionResult(
            status=TaskStatus.ERROR,
            final_answer=None,
            session=self.context.session,
            strategy_used=self.context.reasoning_strategy,
            execution_details=result.model_dump() if result else {},
            task_metrics={},
        )

run_with_strategy async

run_with_strategy(initial_task: str, strategy: str, cancellation_event: Optional[asyncio.Event] = None) -> ExecutionResult

Run the agent with a specific strategy.

Temporarily overrides the configured strategy settings.

Source code in reactive_agents/app/agents/reactive_agent.py
async def run_with_strategy(
    self,
    initial_task: str,
    strategy: str,
    cancellation_event: Optional[asyncio.Event] = None,
) -> ExecutionResult:
    """
    Run the agent with a specific strategy.

    Temporarily overrides the configured strategy settings.
    """
    # Store original settings
    original_switching = getattr(
        self.context, "enable_dynamic_strategy_switching", True
    )
    original_strategy = getattr(
        self.context, "reasoning_strategy", "reflect_decide_act"
    )

    # Set to use specific strategy with dynamic switching disabled
    setattr(self.context, "enable_dynamic_strategy_switching", False)
    setattr(self.context, "reasoning_strategy", strategy)

    try:
        if self.agent_logger:
            self.agent_logger.info(f"🎯 Running with forced strategy: {strategy}")

        result = await self.run(initial_task, cancellation_event=cancellation_event)
        return result

    finally:
        # Restore original settings
        setattr(
            self.context, "enable_dynamic_strategy_switching", original_switching
        )
        setattr(self.context, "reasoning_strategy", original_strategy)

initialize async

initialize() -> None

Initialize the agent.

Source code in reactive_agents/app/agents/reactive_agent.py
async def initialize(self) -> None:
    """Initialize the agent."""
    if self.agent_logger:
        self.agent_logger.info("🔧 Initializing reactive agent...")

    # Basic initialization
    await super().initialize()

    if self.agent_logger:
        self.agent_logger.info("✅ Reactive agent initialized")

cleanup async

cleanup() -> None

Cleanup resources.

Source code in reactive_agents/app/agents/reactive_agent.py
async def cleanup(self) -> None:
    """Cleanup resources."""
    if self.agent_logger:
        self.agent_logger.info("🧹 Cleaning up reactive agent...")

    # Basic cleanup - no parent cleanup method exists
    pass

    if self.agent_logger:
        self.agent_logger.info("✅ Reactive agent cleaned up")

pause async

pause()

Pause the agent execution.

Source code in reactive_agents/app/agents/reactive_agent.py
async def pause(self):
    """Pause the agent execution."""
    if self.execution_engine:
        await self.execution_engine.pause()

resume async

resume()

Resume the agent execution.

Source code in reactive_agents/app/agents/reactive_agent.py
async def resume(self):
    """Resume the agent execution."""
    if self.execution_engine:
        await self.execution_engine.resume()

stop async

stop()

Stop the agent execution.

Source code in reactive_agents/app/agents/reactive_agent.py
async def stop(self):
    """Stop the agent execution."""
    if self.execution_engine:
        await self.execution_engine.stop()

terminate async

terminate()

Terminate the agent execution.

Source code in reactive_agents/app/agents/reactive_agent.py
async def terminate(self):
    """Terminate the agent execution."""
    if self.execution_engine:
        await self.execution_engine.terminate()

is_paused

is_paused() -> bool

Check if the agent is currently paused.

Source code in reactive_agents/app/agents/reactive_agent.py
def is_paused(self) -> bool:
    """Check if the agent is currently paused."""
    if self.execution_engine:
        return self.execution_engine.is_paused()
    return False

is_terminating

is_terminating() -> bool

Check if termination has been requested.

Source code in reactive_agents/app/agents/reactive_agent.py
def is_terminating(self) -> bool:
    """Check if termination has been requested."""
    if self.execution_engine:
        return self.execution_engine.is_terminating()
    return False

is_stopping

is_stopping() -> bool

Check if stop has been requested.

Source code in reactive_agents/app/agents/reactive_agent.py
def is_stopping(self) -> bool:
    """Check if stop has been requested."""
    if self.execution_engine:
        return self.execution_engine.is_stopping()
    return False

get_control_state

get_control_state() -> Dict[str, Any]

Get current control state for monitoring.

Source code in reactive_agents/app/agents/reactive_agent.py
def get_control_state(self) -> Dict[str, Any]:
    """Get current control state for monitoring."""
    if self.execution_engine:
        return self.execution_engine.get_control_state()
    return {
        "paused": False,
        "terminate_requested": False,
        "stop_requested": False,
        "pause_event_set": True,
    }

get_status

get_status() -> Dict[str, Any]

Get current agent status.

Source code in reactive_agents/app/agents/reactive_agent.py
def get_status(self) -> Dict[str, Any]:
    """Get current agent status."""
    ts = (
        getattr(self.context.session, "task_status", "unknown")
        if self.context.session
        else "unknown"
    )
    return {
        "agent_name": self.context.agent_name,
        "reasoning_strategy": getattr(
            self.context, "reasoning_strategy", "reflect_decide_act"
        ),
        "dynamic_strategy_switching": getattr(
            self.context, "enable_dynamic_strategy_switching", True
        ),
        "session_active": self.context.session is not None,
        "iterations": (
            getattr(self.context.session, "iterations", 0)
            if self.context.session
            else 0
        ),
        "task_status": (
            ts.value if not isinstance(ts, str) and hasattr(ts, "value") else ts
        ),
    }

get_available_strategies

get_available_strategies() -> List[str]

Get list of available strategies.

Source code in reactive_agents/app/agents/reactive_agent.py
def get_available_strategies(self) -> List[str]:
    """Get list of available strategies."""
    if (
        self.execution_engine
        and hasattr(self.execution_engine, "strategy_manager")
        and self.execution_engine.strategy_manager
    ):
        return list(
            self.execution_engine.strategy_manager.get_available_strategies().keys()
        )
    return []

get_strategy_info

get_strategy_info(strategy_name: str) -> Dict[str, Any]

Get information about a specific strategy.

Source code in reactive_agents/app/agents/reactive_agent.py
def get_strategy_info(self, strategy_name: str) -> Dict[str, Any]:
    """Get information about a specific strategy."""
    if (
        self.execution_engine
        and hasattr(self.execution_engine, "strategy_manager")
        and self.execution_engine.strategy_manager
    ):
        return self.execution_engine.strategy_manager.get_strategy_info(
            strategy_name
        )
    return {"error": "No strategy manager available"}

on_session_started

on_session_started(callback)

Register a callback for session started events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_session_started(self, callback):
    """Register a callback for session started events."""
    return self.events.on_session_started().subscribe(callback)

on_session_ended

on_session_ended(callback)

Register a callback for session ended events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_session_ended(self, callback):
    """Register a callback for session ended events."""
    return self.events.on_session_ended().subscribe(callback)

on_iteration_started

on_iteration_started(callback)

Register a callback for iteration started events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_iteration_started(self, callback):
    """Register a callback for iteration started events."""
    return self.events.on_iteration_started().subscribe(callback)

on_iteration_completed

on_iteration_completed(callback)

Register a callback for iteration completed events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_iteration_completed(self, callback):
    """Register a callback for iteration completed events."""
    return self.events.on_iteration_completed().subscribe(callback)

on_tool_called

on_tool_called(callback)

Register a callback for tool called events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_tool_called(self, callback):
    """Register a callback for tool called events."""
    return self.events.on_tool_called().subscribe(callback)

on_tool_completed

on_tool_completed(callback)

Register a callback for tool completed events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_tool_completed(self, callback):
    """Register a callback for tool completed events."""
    return self.events.on_tool_completed().subscribe(callback)

on_tool_failed

on_tool_failed(callback)

Register a callback for tool failed events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_tool_failed(self, callback):
    """Register a callback for tool failed events."""
    return self.events.on_tool_failed().subscribe(callback)

on_task_status_changed

on_task_status_changed(callback)

Register a callback for task status changed events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_task_status_changed(self, callback):
    """Register a callback for task status changed events."""
    return self.events.on_task_status_changed().subscribe(callback)

on_reflection_generated

on_reflection_generated(callback)

Register a callback for reflection generated events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_reflection_generated(self, callback):
    """Register a callback for reflection generated events."""
    return self.events.on_reflection_generated().subscribe(callback)

on_final_answer_set

on_final_answer_set(callback)

Register a callback for final answer set events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_final_answer_set(self, callback):
    """Register a callback for final answer set events."""
    return self.events.on_final_answer_set().subscribe(callback)

on_metrics_updated

on_metrics_updated(callback)

Register a callback for metrics updated events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_metrics_updated(self, callback):
    """Register a callback for metrics updated events."""
    return self.events.on_metrics_updated().subscribe(callback)

on_error_occurred

on_error_occurred(callback)

Register a callback for error occurred events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_error_occurred(self, callback):
    """Register a callback for error occurred events."""
    return self.events.on_error_occurred().subscribe(callback)

on_pause_requested

on_pause_requested(callback)

Register a callback for pause requested events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_pause_requested(self, callback):
    """Register a callback for pause requested events."""
    return self.events.on_pause_requested().subscribe(callback)

on_paused

on_paused(callback)

Register a callback for paused events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_paused(self, callback):
    """Register a callback for paused events."""
    return self.events.on_paused().subscribe(callback)

on_resume_requested

on_resume_requested(callback)

Register a callback for resume requested events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_resume_requested(self, callback):
    """Register a callback for resume requested events."""
    return self.events.on_resume_requested().subscribe(callback)

on_resumed

on_resumed(callback)

Register a callback for resumed events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_resumed(self, callback):
    """Register a callback for resumed events."""
    return self.events.on_resumed().subscribe(callback)

on_stop_requested

on_stop_requested(callback)

Register a callback for stop requested events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_stop_requested(self, callback):
    """Register a callback for stop requested events."""
    return self.events.on_stop_requested().subscribe(callback)

on_stopped

on_stopped(callback)

Register a callback for stopped events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_stopped(self, callback):
    """Register a callback for stopped events."""
    return self.events.on_stopped().subscribe(callback)

on_terminate_requested

on_terminate_requested(callback)

Register a callback for terminate requested events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_terminate_requested(self, callback):
    """Register a callback for terminate requested events."""
    return self.events.on_terminate_requested().subscribe(callback)

on_terminated

on_terminated(callback)

Register a callback for terminated events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_terminated(self, callback):
    """Register a callback for terminated events."""
    return self.events.on_terminated().subscribe(callback)

on_cancelled

on_cancelled(callback)

Register a callback for cancelled events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_cancelled(self, callback):
    """Register a callback for cancelled events."""
    return self.events.on_cancelled().subscribe(callback)

on_context_changed

on_context_changed(callback)

Register a callback for context changed events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_context_changed(self, callback):
    """Register a callback for context changed events."""
    # This event type may need to be added to the EventBus
    if hasattr(self.context, "event_bus") and self.context.event_bus:
        from reactive_agents.core.types.event_types import AgentStateEvent

        # For now, we'll use a generic event type or skip
        pass
    return None

on_operation_completed

on_operation_completed(callback)

Register a callback for operation completed events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_operation_completed(self, callback):
    """Register a callback for operation completed events."""
    # This event type may need to be added to the EventBus
    if hasattr(self.context, "event_bus") and self.context.event_bus:
        from reactive_agents.core.types.event_types import AgentStateEvent

        # For now, we'll use a generic event type or skip
        pass
    return None

on_tokens_used

on_tokens_used(callback)

Register a callback for tokens used events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_tokens_used(self, callback):
    """Register a callback for tokens used events."""
    # This event type may need to be added to the EventBus
    if hasattr(self.context, "event_bus") and self.context.event_bus:
        from reactive_agents.core.types.event_types import AgentStateEvent

        # For now, we'll use a generic event type or skip
        pass
    return None

on_snapshot_taken

on_snapshot_taken(callback)

Register a callback for snapshot taken events.

Source code in reactive_agents/app/agents/reactive_agent.py
def on_snapshot_taken(self, callback):
    """Register a callback for snapshot taken events."""
    # This event type may need to be added to the EventBus
    if hasattr(self.context, "event_bus") and self.context.event_bus:
        from reactive_agents.core.types.event_types import AgentStateEvent

        # For now, we'll use a generic event type or skip
        pass
    return None