Events

Snippbot uses an internal async publish/subscribe event bus (EventBus) that decouples subsystems. Events flow from producers (executor, scheduler, channels) to consumers (hooks engine, monitoring, SSE streams).

Producers                    EventBus                   Consumers
─────────                    ────────                   ─────────
Task Executor      ──────►   publish(event)   ──────►   Hook Engine
Scheduler Engine   ──────►                    ──────►   SSE Streams
Channel Adapter    ──────►                    ──────►   Monitor / Analytics
Sub-agent Manager  ──────►                    ──────►   UI Live Updates
Security System    ──────►                    ──────►   Webhook Deliveries

The event bus is an in-process async queue. Events are not persisted by default, but the hooks engine and monitor subsystem record relevant events to their respective databases.
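To make the "in-process async queue" idea concrete, here is a minimal sketch of such a bus built on asyncio.Queue. It is not Snippbot's EventBus: the MiniBus class, its drain() method, and the demo() coroutine are hypothetical names used only for illustration, and it supports exact event types only.

```python
import asyncio
from collections import defaultdict
from typing import Callable


class MiniBus:
    """Toy in-process bus (hypothetical; not Snippbot's EventBus)."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._handlers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Callable[[dict], None]) -> None:
        self._handlers[event_type].append(handler)

    async def publish(self, event: dict) -> None:
        # Producers return as soon as the event is queued.
        await self._queue.put(event)

    async def run(self) -> None:
        # Dispatcher loop: hand each queued event to its subscribers.
        while True:
            event = await self._queue.get()
            for handler in self._handlers[event["type"]]:
                handler(event)
            self._queue.task_done()

    async def drain(self) -> None:
        # Wait until every queued event has been dispatched.
        await self._queue.join()


async def demo() -> list[dict]:
    bus = MiniBus()
    seen: list[dict] = []
    bus.subscribe("task.completed", seen.append)
    dispatcher = asyncio.create_task(bus.run())
    await bus.publish({"type": "task.completed", "data": {"task_id": "t1"}})
    await bus.drain()
    dispatcher.cancel()
    return seen


received = asyncio.run(demo())
```

Because nothing here is written to disk, an event that no consumer records is gone once it has been dispatched, which is the behaviour the paragraph above describes.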

In Python code (for built-in subsystems):

from snippbot_core.event_bus import get_event_bus

bus = get_event_bus()

def my_handler(event: dict) -> None:
    # Every event carries "type" and "data" keys (see the structure below).
    print(f"Event: {event['type']}, data: {event['data']}")

bus.subscribe("task.completed", my_handler)  # One specific event type
bus.subscribe("*", my_handler)  # Subscribe to ALL events

Unsubscribe:

bus.unsubscribe("task.completed", my_handler)
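When a handler should only be active for a limited scope, subscribe and unsubscribe can be paired in a context manager so the handler is always removed. The sketch below assumes only the subscribe/unsubscribe calls shown above; StubBus is a hypothetical stand-in so the example runs without Snippbot, and subscribed() is an illustrative helper, not part of snippbot_core.

```python
from contextlib import contextmanager


class StubBus:
    """Hypothetical stand-in exposing only subscribe/unsubscribe."""

    def __init__(self) -> None:
        self.handlers: dict = {}

    def subscribe(self, event_type, handler) -> None:
        self.handlers.setdefault(event_type, []).append(handler)

    def unsubscribe(self, event_type, handler) -> None:
        self.handlers[event_type].remove(handler)


@contextmanager
def subscribed(bus, event_type, handler):
    # Subscribe on entry; unsubscribe on exit, even if the body raises.
    bus.subscribe(event_type, handler)
    try:
        yield handler
    finally:
        bus.unsubscribe(event_type, handler)


def on_done(event: dict) -> None:
    pass


bus = StubBus()
with subscribed(bus, "task.completed", on_done):
    active_inside = list(bus.handlers["task.completed"])
active_after = list(bus.handlers["task.completed"])
```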

All events share this structure:

{
  "type": "task.completed",
  "data": {...},
  "timestamp": "2026-03-02T14:00:00Z",
  "source": "executor"
}
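
For code that builds or type-checks events, the envelope above can be described as a TypedDict. This is a sketch under assumptions: the Event type and make_event() helper are hypothetical names, not part of snippbot_core, and the timestamp format is inferred from the single example above.

```python
from datetime import datetime, timezone
from typing import Any, TypedDict


class Event(TypedDict):
    type: str
    data: dict[str, Any]
    timestamp: str
    source: str


def make_event(event_type: str, data: dict[str, Any], source: str) -> Event:
    # UTC timestamp in the same "YYYY-MM-DDTHH:MM:SSZ" form as the example.
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {"type": event_type, "data": data, "timestamp": now, "source": source}


event = make_event("task.completed", {"task_id": "t1"}, source="executor")
```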

| Event | Fired when | Key data fields |
|---|---|---|
| task.queued | Task added to queue | task_id, project_id |
| task.started | Task begins execution | task_id, agent_id |
| task.completed | Task succeeds | task_id, result, duration_seconds |
| task.failed | Task fails | task_id, error, failure_class |
| task.retrying | Task retrying after failure | task_id, attempt |
| task.skipped | Task skipped | task_id |

| Event | Fired when | Key data fields |
|---|---|---|
| approval.requested | Agent requires human approval | task_id, action, reason |
| approval.granted | Human approves | task_id, approver |
| approval.denied | Human rejects | task_id, approver, reason |

| Event | Fired when | Key data fields |
|---|---|---|
| project.created | New project created | project_id, name, goal |
| project.updated | Status or config changes | project_id, old_status, new_status |
| project.completed | All tasks done | project_id, duration_seconds |
| project.failed | Project fails | project_id, error |

| Event | Fired when | Key data fields |
|---|---|---|
| scheduler.started | Scheduler engine starts | tick_interval_seconds |
| scheduler.stopped | Scheduler engine stops | |
| scheduler.tick | Scheduler checks jobs | jobs_checked |
| scheduler.error | Scheduler encounters error | error |
| job_run.created | Job run is created | job_id, run_id |
| job_run.started | Job run begins execution | job_id, run_id |
| job_run.completed | Job run finishes successfully | job_id, run_id, output, duration_seconds |
| job_run.failed | Job run fails | job_id, run_id, error, attempt |
| job_run.cancelled | Job run is cancelled | job_id, run_id |
| job_run.skipped | Run skipped (condition unmet) | job_id, run_id |
| job_run.retrying | Job run retrying after failure | job_id, run_id, attempt |
| job_run.delivered | Output delivered to channel | job_id, run_id, channel |

| Event | Fired when | Key data fields |
|---|---|---|
| workflow.created | New workflow | workflow_id, name |
| workflow.run.started | Run begins | workflow_id, run_id |
| workflow.run.completed | All steps done | run_id, duration_seconds |
| workflow.run.failed | Run fails | run_id, error |
| workflow.run.cancelled | Run cancelled | run_id |
| workflow.step.started | Step begins | run_id, step_id |
| workflow.step.completed | Step finishes | run_id, step_id, output |
| workflow.step.failed | Step fails | run_id, step_id, error |
| workflow.step.skipped | Step skipped | run_id, step_id |

| Event | Fired when | Key data fields |
|---|---|---|
| channel.message.received | Inbound message | platform, channel_id, user_id, text |
| channel.message.sent | Outbound message | platform, channel_id, text |
| channel.message.failed | Outbound delivery failed | platform, channel_id, error |
| channel.adapter.started | Adapter comes online | platforms |
| channel.adapter.stopped | Adapter goes offline | reason |

| Event | Fired when | Key data fields |
|---|---|---|
| subagent.spawned | Sub-agent created | subagent_id, parent_id, role |
| subagent.started | Sub-agent begins work | subagent_id |
| subagent.completed | Sub-agent finishes | subagent_id, result |
| subagent.failed | Sub-agent fails | subagent_id, error |
| subagent.cancelled | Sub-agent cancelled | subagent_id |
| subagent.approval_requested | Approval gate triggered | subagent_id, action |
| subagent.approval_granted | Approval granted | subagent_id |
| subagent.approval_denied | Approval denied | subagent_id, reason |

| Event | Fired when | Key data fields |
|---|---|---|
| hook.triggered | Hook fires | hook_id, event_type |
| hook.completed | Hook execution succeeds | hook_id, duration_ms |
| hook.failed | Hook execution fails | hook_id, error |
| hook.timeout | Hook exceeds time limit | hook_id, timeout_ms |
| hook.skipped | Hook condition not met | hook_id, reason |

| Event | Fired when | Key data fields |
|---|---|---|
| system.startup | Daemon starts | version, config |
| system.shutdown | Daemon stops | reason |
| system.error | Unhandled exception | error, traceback |
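
The key data fields listed above are what a handler reads from event["data"]. As an illustration, the sketch below turns a few task events into one-line summaries. The describe() function is a hypothetical helper and the sample payload values are made up; only the event types and field names come from the tables.

```python
def describe(event: dict) -> str:
    """Summarise a task event using its documented data fields."""
    data = event.get("data", {})
    kind = event["type"]
    if kind == "task.completed":
        return f"{data['task_id']} completed in {data['duration_seconds']}s"
    if kind == "task.failed":
        return f"{data['task_id']} failed ({data['failure_class']}): {data['error']}"
    if kind == "task.retrying":
        return f"{data['task_id']} retrying, attempt {data['attempt']}"
    # Anything else falls through to a generic line.
    return f"{kind} (no summary defined)"


# Sample payloads are invented for illustration only.
samples = [
    {"type": "task.completed",
     "data": {"task_id": "t1", "result": "ok", "duration_seconds": 2.5}},
    {"type": "task.failed",
     "data": {"task_id": "t2", "error": "no response", "failure_class": "timeout"}},
    {"type": "scheduler.tick", "data": {"jobs_checked": 3}},
]
summaries = [describe(e) for e in samples]
```

A function like this could be registered with bus.subscribe("task.*", ...) if it were wrapped to print or log its return value.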

Subscribe with "*" to receive all events. The hook engine uses this to check each event against registered hooks:

bus.subscribe("*", hook_engine.on_event)

Prefix matching with a trailing ".*" is also supported:

bus.subscribe("task.*", handler) # All task events
bus.subscribe("scheduler.*", handler) # All scheduler events
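
One way to picture how "*" and "task.*" patterns select events is shell-style matching from Python's standard fnmatch module. This is an assumption for illustration, not a description of how EventBus matches internally. In particular, whether "workflow.*" also covers nested types such as workflow.run.started is not stated here, although it does under fnmatch rules.

```python
from fnmatch import fnmatchcase


def matches(pattern: str, event_type: str) -> bool:
    # Case-sensitive shell-style match; "*" matches any run of characters.
    return fnmatchcase(event_type, pattern)


checks = {
    ("*", "system.error"): matches("*", "system.error"),
    ("task.*", "task.failed"): matches("task.*", "task.failed"),
    ("task.*", "scheduler.tick"): matches("task.*", "scheduler.tick"),
    ("task.completed", "task.completed"): matches("task.completed", "task.completed"),
}
```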

Events are transient by default (in-memory only). Persistence happens at the consumer level:

  • Hook engine: Stores delivery history in hooks.db
  • Monitor: Records events to activity log in main.db
  • Memory system: Persists agent conversation episodes to memory_{id}.db
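
Consumer-level persistence can be sketched as a handler that writes each event it receives to SQLite. The activity_log table name, its columns, and the record_event() function are hypothetical and do not reflect the actual schema of main.db or hooks.db. An in-memory database keeps the example self-contained.

```python
import json
import sqlite3

# In-memory database for the sketch; a real consumer would open a file.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE activity_log ("
    "type TEXT NOT NULL, source TEXT, timestamp TEXT, data TEXT)"
)


def record_event(event: dict) -> None:
    # "with conn" commits on success and rolls back on error.
    with conn:
        conn.execute(
            "INSERT INTO activity_log (type, source, timestamp, data) "
            "VALUES (?, ?, ?, ?)",
            (
                event["type"],
                event.get("source"),
                event.get("timestamp"),
                json.dumps(event.get("data", {})),
            ),
        )


record_event({
    "type": "task.completed",
    "data": {"task_id": "t1"},
    "timestamp": "2026-03-02T14:00:00Z",
    "source": "executor",
})
rows = conn.execute("SELECT type, data FROM activity_log").fetchall()
```

A handler like record_event could be passed to bus.subscribe("*", ...) so that every event is recorded, while the bus itself stays in-memory.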