Snippbot uses an internal async publish/subscribe event bus (EventBus) that decouples subsystems. Events flow from producers (executor, scheduler, channels) to consumers (hooks engine, monitoring, SSE streams).
Producers EventBus Consumers
───────── ──────── ─────────
Task Executor ──────► publish(event) ──────► Hook Engine
Scheduler Engine ──────► ──────► SSE Streams
Channel Adapter ──────► ──────► Monitor / Analytics
Sub-agent Manager ──────► ──────► UI Live Updates
Security System ──────► ──────► Webhook Deliveries
The event bus is an in-process async queue. Events are not persisted by default, but the hooks engine and monitor subsystem record relevant events to their respective databases.
In Python code (for built-in subsystems):
from snippbot_core.event_bus import get_event_bus
def my_handler ( event : dict ) -> None :
print ( f "Event: {event [ ' type ' ] } , data: {event [ ' data ' ] } " )
bus. subscribe ( " task.completed " , my_handler )
bus. subscribe ( " * " , my_handler ) # Subscribe to ALL events
Unsubscribe:
bus. unsubscribe ( " task.completed " , my_handler )
All events share this structure:
"type" : " task.completed " ,
"timestamp" : " 2026-03-02T14:00:00Z " ,
Event Fired when Key data fields task.queuedTask added to queue task_id, project_idtask.startedTask begins execution task_id, agent_idtask.completedTask succeeds task_id, result, duration_secondstask.failedTask fails task_id, error, failure_classtask.retryingTask retrying after failure task_id, attempttask.skippedTask skipped task_id
Event Fired when Key data fields approval.requestedAgent requires human approval task_id, action, reasonapproval.grantedHuman approves task_id, approverapproval.deniedHuman rejects task_id, approver, reason
Event Fired when Key data fields project.createdNew project created project_id, name, goalproject.updatedStatus or config changes project_id, old_status, new_statusproject.completedAll tasks done project_id, duration_secondsproject.failedProject fails project_id, error
Event Fired when Key data fields scheduler.startedScheduler engine starts tick_interval_secondsscheduler.stoppedScheduler engine stops — scheduler.tickScheduler checks jobs jobs_checkedscheduler.errorScheduler encounters error errorjob_run.createdJob run is created job_id, run_idjob_run.startedJob run begins execution job_id, run_idjob_run.completedJob run finishes successfully job_id, run_id, output, duration_secondsjob_run.failedJob run fails job_id, run_id, error, attemptjob_run.cancelledJob run is cancelled job_id, run_idjob_run.skippedRun skipped (condition unmet) job_id, run_idjob_run.retryingJob run retrying after failure job_id, run_id, attemptjob_run.deliveredOutput delivered to channel job_id, run_id, channel
Event Fired when Key data fields workflow.createdNew workflow workflow_id, nameworkflow.run.startedRun begins workflow_id, run_idworkflow.run.completedAll steps done run_id, duration_secondsworkflow.run.failedRun fails run_id, errorworkflow.run.cancelledRun cancelled run_idworkflow.step.startedStep begins run_id, step_idworkflow.step.completedStep finishes run_id, step_id, outputworkflow.step.failedStep fails run_id, step_id, errorworkflow.step.skippedStep skipped run_id, step_id
Event Fired when Key data fields channel.message.receivedInbound message platform, channel_id, user_id, textchannel.message.sentOutbound message platform, channel_id, textchannel.message.failedOutbound delivery failed platform, channel_id, errorchannel.adapter.startedAdapter comes online platformschannel.adapter.stoppedAdapter goes offline reason
Event Fired when Key data fields subagent.spawnedSub-agent created subagent_id, parent_id, rolesubagent.startedSub-agent begins work subagent_idsubagent.completedSub-agent finishes subagent_id, resultsubagent.failedSub-agent fails subagent_id, errorsubagent.cancelledSub-agent cancelled subagent_idsubagent.approval_requestedApproval gate triggered subagent_id, actionsubagent.approval_grantedApproval granted subagent_idsubagent.approval_deniedApproval denied subagent_id, reason
Event Fired when Key data fields hook.triggeredHook fires hook_id, event_typehook.completedHook execution succeeds hook_id, duration_mshook.failedHook execution fails hook_id, errorhook.timeoutHook exceeds time limit hook_id, timeout_mshook.skippedHook condition not met hook_id, reason
Event Fired when Key data fields system.startupDaemon starts version, configsystem.shutdownDaemon stops reasonsystem.errorUnhandled exception error, traceback
Subscribe with "*" to receive all events. The hook engine uses this to check each event against registered hooks:
bus. subscribe ( " * " , hook_engine.on_event )
Prefix matching is also supported:
bus. subscribe ( " task.* " , handler ) # All task events
bus. subscribe ( " scheduler.* " , handler ) # All scheduler events
Events are transient by default (in-memory only). Persistence happens at the consumer level:
Hook engine : Stores delivery history in hooks.db
Monitor : Records events to activity log in main.db
Memory system : Persists agent conversation episodes to memory_{id}.db