Event Streaming
The Bridge SDK provides a real-time event system for monitoring session lifecycle and task execution. Subscribe to events on a session handle to receive structured updates as Nity processes your tasks.
Subscribing to Events
import { createNitySession } from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
const session = await createNitySession({ costTier: 'free' });
const unsubscribe = session.subscribe((event) => {
console.log(`[${event.type}]`, event.payload);
});
const result = await session.sendTask('execute', 'Add input validation to the forms');
// Unsubscribe when done
unsubscribe();
await session.close();
The subscribe method returns an unsubscribe function. Call it to stop receiving events.
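To make the subscribe/unsubscribe contract concrete, here is a minimal, self-contained sketch of the pattern. `TinyEventHub` and its `emit` method are hypothetical names used only for illustration; this document does not show how the SDK implements `subscribe` internally, so treat this as an analogy rather than the actual implementation.

```typescript
// Illustrative only: a hypothetical hub, not the Bridge SDK's implementation.
type Listener<E> = (event: E) => void;

class TinyEventHub<E> {
  private listeners: Listener<E>[] = [];

  // Register a listener and hand back a function that removes it again.
  subscribe(listener: Listener<E>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }

  // Deliver an event to every listener registered at the time of the call.
  emit(event: E): void {
    this.listeners.slice().forEach((listener) => listener(event));
  }
}
```

Because the returned function closes over the listener, callers do not need to keep a reference to the callback itself in order to unsubscribe.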
Event Types
Every event has the shape:
interface NityEvent {
type: NityEventType;
timestamp: number; // Date.now() at emission
payload: Record<string, unknown>;
}
session_start
Emitted when a session is created.
{
type: 'session_start',
timestamp: 1710931200000,
payload: {
sessionId: 'nity-bridge-1-1710931200000',
costTier: 'free',
orchestratorModel: 'ollama',
executorModel: 'ollama',
}
}
task_start
Emitted when sendTask begins execution.
{
type: 'task_start',
timestamp: 1710931200500,
payload: {
taskType: 'execute',
prompt: 'Add input validation to the forms', // Truncated to 200 chars
priority: 'normal',
}
}
task_progress
Emitted during task execution as intermediate events arrive from the pi-coding-agent runtime.
{
type: 'task_progress',
timestamp: 1710931201200,
payload: {
taskType: 'execute',
category: 'tool_call',
tool: 'nity_execute',
message: undefined,
progress: {
phase: 'Calling nity_execute',
percentage: 25,
delta: '{"task":"Add input validation..."}',
},
}
}
task_complete
Emitted when a task finishes successfully.
{
type: 'task_complete',
timestamp: 1710931245000,
payload: {
taskType: 'execute',
duration: 44500,
// When degraded mode was used:
degraded: true,
}
}
task_error
Emitted on task failure, retry attempts, or cancellation.
{
type: 'task_error',
timestamp: 1710931210000,
payload: {
taskType: 'execute',
error: 'Connection timeout',
duration: 9500,
// On retry:
operation: 'sendTask(execute)',
attempt: 1,
maxAttempts: 3,
errorCount: 1,
// On fallback:
fallback: true,
}
}
session_close
Emitted when the session is closed.
{
type: 'session_close',
timestamp: 1710931300000,
payload: {
sessionId: 'nity-bridge-1-1710931200000',
}
}
Progress Extraction
Use extractProgress to pull structured progress from raw pi-coding-agent events:
import {
createNitySession,
extractProgress,
categorizeEvent,
} from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
const session = await createNitySession({ costTier: 'free' });
session.subscribe((event) => {
if (event.type === 'task_progress' && event.payload.progress) {
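// payload is Record<string, unknown>, so narrow progress before destructuring
const { phase, percentage, delta } = event.payload.progress as {
phase: string;
percentage: number;
delta?: string;
};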
console.log(`[${percentage}%] ${phase}`);
if (delta) console.log(` ${delta}`);
}
});
Progress is derived from the event category:
| Category | Phase | Percentage |
|---|---|---|
| tool_call | Calling {tool} | 25 |
| message | Processing | 50 |
| tool_result | Result from {tool} | 75 |
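The table above can be read as a simple lookup. The sketch below encodes it directly; `progressFor` and the `'tool'` fallback name are hypothetical and chosen for illustration, and returning `null` for other categories is an assumption, since the table does not say what happens for them.

```typescript
// Illustrative mapping of the documented table; not the SDK's extractProgress.
interface ProgressSketch {
  phase: string;
  percentage: number;
}

function progressFor(category: string, tool?: string): ProgressSketch | null {
  const toolName = tool ?? 'tool'; // assumed fallback when no tool name is known
  switch (category) {
    case 'tool_call':
      return { phase: `Calling ${toolName}`, percentage: 25 };
    case 'message':
      return { phase: 'Processing', percentage: 50 };
    case 'tool_result':
      return { phase: `Result from ${toolName}`, percentage: 75 };
    default:
      return null; // assumption: categories outside the table carry no progress
  }
}
```

Note that the percentages are fixed per category, so they indicate the kind of step in flight rather than measured completion.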
Quality Gate Detection
extractQualityGates inspects tool result events for quality gate indicators:
import { extractQualityGates } from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
// Returns { passed: boolean, details: string } | null
const gate = extractQualityGates(rawEvent);
if (gate) {
console.log(`Quality gates: ${gate.passed ? 'PASSED' : 'FAILED'}`);
console.log(gate.details);
}
Detection looks for keywords (quality, gate, passed, failed) in tool output. A result is marked passed unless it contains fail or block.
The NityResult also carries qualityGatePassed when quality gates are enabled in the session config.
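The keyword rule described above can be sketched on plain text as follows. `detectQualityGate` is a hypothetical helper that takes the tool output as a string, whereas the real `extractQualityGates` takes a raw event; case-insensitive substring matching and using the trimmed output as `details` are assumptions.

```typescript
// Illustrative only: a text-level approximation of the documented rule.
interface GateSketch {
  passed: boolean;
  details: string;
}

function detectQualityGate(output: string): GateSketch | null {
  const text = output.toLowerCase();
  // No gate keyword at all: treat the output as unrelated to quality gates.
  const mentionsGate = ['quality', 'gate', 'passed', 'failed'].some((k) => text.includes(k));
  if (!mentionsGate) return null;
  // Marked passed unless the output contains "fail" or "block".
  const passed = !(text.includes('fail') || text.includes('block'));
  return { passed, details: output.trim() };
}
```

Under this rule, any output that mentions a gate keyword and also contains "fail" or "block" anywhere counts as failed, so a phrase like "no failures" would also be read as a failure.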
Reflection Extraction
extractReflection parses bullet-pointed lines from tool results into structured lessons and follow-ups:
import { extractReflection } from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
// Returns { lessons: string[], followUps: string[] } | null
const reflection = extractReflection(rawEvent);
if (reflection) {
console.log('Lessons:', reflection.lessons);
console.log('Follow-ups:', reflection.followUps);
}
Lines containing keywords like follow-up, next, or todo are classified as follow-ups; others become lessons.
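As a rough sketch of that classification, the helper below works on plain text. `parseReflection` is a hypothetical name; the accepted bullet markers (`-`, `*`, `•`), the case-insensitive keyword match, and returning `null` when no bullets are found are all assumptions, not documented behavior of `extractReflection`.

```typescript
// Illustrative only: bullet parsing and keyword classification on a string.
interface ReflectionSketch {
  lessons: string[];
  followUps: string[];
}

const BULLET = /^[-*•]\s+/;

function parseReflection(text: string): ReflectionSketch | null {
  // Keep only bullet-pointed lines and strip the bullet marker.
  const bullets = text
    .split('\n')
    .map((line) => line.trim())
    .filter((line) => BULLET.test(line))
    .map((line) => line.replace(BULLET, ''));
  if (bullets.length === 0) return null;

  const isFollowUp = (line: string): boolean => /follow-up|next|todo/i.test(line);
  return {
    lessons: bullets.filter((line) => !isFollowUp(line)),
    followUps: bullets.filter(isFollowUp),
  };
}
```

Plain substring keywords are blunt: a lesson that happens to contain "next" would be filed as a follow-up under this sketch.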
Episode Reference Extraction
extractEpisodeRefs finds episode ID references in tool output:
import { extractEpisodeRefs } from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
// Returns string[]
const refs = extractEpisodeRefs(rawEvent);
// ['ep-041', 'episode: auth-fix']
Matches patterns: ep-{id}, episode: {id}, [ep-{id}] where the ID is 3–32 alphanumeric characters.
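One way to express these patterns is a single regular expression. `findEpisodeRefs` is a hypothetical helper operating on plain text, not the SDK's implementation. As an assumption, the sketch also allows hyphens inside the ID so that the `'episode: auth-fix'` example above matches; it also drops the surrounding brackets and de-duplicates results, neither of which is confirmed by this document.

```typescript
// Illustrative only: regex-based matching of the documented reference forms.
// ID: starts with an alphanumeric, 3-32 characters total, hyphens allowed (assumption).
const EPISODE_REF =
  /\bep-[A-Za-z0-9][A-Za-z0-9-]{2,31}|\bepisode:\s*[A-Za-z0-9][A-Za-z0-9-]{2,31}/gi;

function findEpisodeRefs(text: string): string[] {
  const matches = text.match(EPISODE_REF) ?? [];
  // Remove duplicates while preserving first-seen order.
  return Array.from(new Set(matches));
}
```

The bracketed form `[ep-{id}]` needs no separate alternative here, because the `ep-{id}` branch already matches inside the brackets.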
Event Categorization
categorizeEvent classifies raw events by type:
import { categorizeEvent } from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
// Returns 'tool_call' | 'tool_result' | 'message' | 'error' | 'session' | 'unknown'
const category = categorizeEvent(rawEvent);
| Category | Matched By |
|---|---|
| tool_call | type contains "tool" + "call" |
| tool_result | type contains "tool" + "result" |
| message | type contains "message" or "content" |
| error | type contains "error" or "fail" |
| session | type contains "session", "start", or "end" |
| unknown | No match |
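The matching rules in the table can be sketched as a chain of substring checks. `categorizeByType` is a hypothetical helper that takes the event's type string directly; evaluating the rules in table order and matching case-insensitively are assumptions, and the order matters for types that satisfy more than one rule.

```typescript
// Illustrative only: the documented table as ordered substring checks.
type CategorySketch =
  | 'tool_call'
  | 'tool_result'
  | 'message'
  | 'error'
  | 'session'
  | 'unknown';

function categorizeByType(type: string): CategorySketch {
  const t = type.toLowerCase();
  if (t.includes('tool') && t.includes('call')) return 'tool_call';
  if (t.includes('tool') && t.includes('result')) return 'tool_result';
  if (t.includes('message') || t.includes('content')) return 'message';
  if (t.includes('error') || t.includes('fail')) return 'error';
  if (t.includes('session') || t.includes('start') || t.includes('end')) return 'session';
  return 'unknown';
}
```

With this ordering, a type such as `tool_call_start` is classified as `tool_call` even though it also contains "start".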
Event Transformer
createEventTransformer creates a reusable function that converts raw events to typed NityEvent objects:
import {
createEventTransformer,
buildResult,
} from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
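const transform = createEventTransformer('execute');
const startTime = Date.now(); // used below to compute the task duration
// Use in a streaming pipeline; rawEventStream stands for your own async iterable of raw PiAgentEvent objects
const events: PiAgentEvent[] = [];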
for await (const raw of rawEventStream) {
const nityEvent = transform(raw);
if (nityEvent) {
console.log(`[${nityEvent.type}]`, nityEvent.payload);
}
events.push(raw);
}
// Build final result from accumulated events
const result = buildResult('execute', events, Date.now() - startTime);
Progress Throttling
For high-frequency event streams, use createProgressThrottler to batch rapid updates:
import { createProgressThrottler } from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
const throttler = createProgressThrottler(
(progress) => {
// This is called at most once per 100ms
updateProgressBar(progress.percentage);
console.log(progress.phase);
},
100 // minimum interval in ms
);
// Push rapid updates — only the latest within each interval is emitted
throttler.push({ phase: 'Processing', percentage: 30 });
throttler.push({ phase: 'Processing', percentage: 45 });
throttler.push({ phase: 'Processing', percentage: 60 });
// Flush remaining immediately (e.g., on task complete)
throttler.flush();
// Cancel pending (e.g., on task error)
throttler.cancel();
Progress Persistence
Use ProgressStore to persist progress updates for long-running tasks:
import { ProgressStore } from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
const store = new ProgressStore();
store.record('task-001', { phase: 'Analyzing', percentage: 25 });
store.record('task-001', { phase: 'Executing', percentage: 50 });
store.record('task-001', { phase: 'Validating', percentage: 75 });
store.getLatest('task-001');
// { phase: 'Validating', percentage: 75 }
store.getHistory('task-001');
// [{ phase: 'Analyzing', ... }, { phase: 'Executing', ... }, { phase: 'Validating', ... }]
store.clear('task-001');
Complete Example
import {
createNitySession,
createProgressThrottler,
ProgressStore,
} from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
const session = await createNitySession({ costTier: 'hybrid' });
const store = new ProgressStore();
const throttler = createProgressThrottler((progress) => {
store.record('current-task', progress);
console.log(`[${progress.percentage}%] ${progress.phase}`);
}, 200);
const unsubscribe = session.subscribe((event) => {
switch (event.type) {
case 'session_start':
console.log('Session started:', event.payload.sessionId);
break;
case 'task_start':
console.log('Task started:', event.payload.taskType);
break;
case 'task_progress':
if (event.payload.progress) {
throttler.push(event.payload.progress as any);
}
break;
case 'task_complete':
throttler.flush();
console.log('Task completed in', event.payload.duration, 'ms');
break;
case 'task_error':
throttler.cancel();
console.error('Task error:', event.payload.error);
break;
case 'session_close':
console.log('Session closed');
break;
}
});
try {
const result = await session.sendTask('validate', 'Check the auth module');
console.log('Result:', result.output);
console.log('Quality gates:', result.qualityGatePassed);
} finally {
unsubscribe();
await session.close();
}
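For readers curious how the push/flush/cancel behavior from the Progress Throttling section could work, here is a minimal sketch. `makeThrottler` is a hypothetical name, and the exact timing policy is an assumption: this version emits the first update immediately, keeps only the latest update while the interval is still running, and delivers it when the interval ends. The SDK's `createProgressThrottler` may differ in these details.

```typescript
// Illustrative only: a leading-plus-trailing throttle, not the SDK's implementation.
interface ThrottledProgress {
  phase: string;
  percentage: number;
  delta?: string;
}

function makeThrottler(emit: (p: ThrottledProgress) => void, minIntervalMs: number) {
  let lastEmit = -Infinity; // time of the most recent emission
  let pending: ThrottledProgress | undefined; // latest update not yet emitted
  let timer: ReturnType<typeof setTimeout> | undefined;

  const clearTimer = (): void => {
    if (timer !== undefined) {
      clearTimeout(timer);
      timer = undefined;
    }
  };

  const emitNow = (p: ThrottledProgress): void => {
    clearTimer();
    pending = undefined;
    lastEmit = Date.now();
    emit(p);
  };

  return {
    // Emit right away if the interval has elapsed; otherwise remember the latest update.
    push(p: ThrottledProgress): void {
      const wait = minIntervalMs - (Date.now() - lastEmit);
      if (wait <= 0) {
        emitNow(p);
        return;
      }
      pending = p;
      if (timer === undefined) {
        timer = setTimeout(() => {
          timer = undefined;
          if (pending) emitNow(pending);
        }, wait);
      }
    },
    // Deliver the pending update immediately, if there is one.
    flush(): void {
      if (pending) emitNow(pending);
    },
    // Drop the pending update without delivering it.
    cancel(): void {
      clearTimer();
      pending = undefined;
    },
  };
}
```

This mirrors how the complete example uses the throttler: `flush` on `task_complete` so the final update is not lost, and `cancel` on `task_error` so a stale update is not shown after a failure.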