Event Streaming

The Bridge SDK provides a real-time event system for monitoring session lifecycle and task execution. Subscribe to events on a session handle to receive structured updates as Nity processes your tasks.

Subscribing to Events

import { createNitySession } from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
 
const session = await createNitySession({ costTier: 'free' });
 
const unsubscribe = session.subscribe((event) => {
  console.log(`[${event.type}]`, event.payload);
});
 
const result = await session.sendTask('execute', 'Add input validation to the forms');
 
// Unsubscribe when done
unsubscribe();
await session.close();

The subscribe method returns an unsubscribe function. Call it to stop receiving events.

Event Types

Every event has the shape:

interface NityEvent {
  type: NityEventType;
  timestamp: number;  // Date.now() at emission
  payload: Record<string, unknown>;
}
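
Because payload is typed as Record<string, unknown>, individual fields need narrowing before use. The following is a minimal sketch of one way to do that. It assumes the NityEventType union consists of the six event names documented in this section, and describeEvent is a hypothetical helper that is not part of the SDK.

```typescript
// Local copies of the shapes described above. The union members are assumed
// from the event names documented in this section.
type NityEventType =
  | 'session_start' | 'task_start' | 'task_progress'
  | 'task_complete' | 'task_error' | 'session_close';

interface NityEvent {
  type: NityEventType;
  timestamp: number;
  payload: Record<string, unknown>;
}

// Hypothetical helper: builds a one-line summary, checking the runtime type
// of each payload field before relying on it.
function describeEvent(event: NityEvent): string {
  const time = new Date(event.timestamp).toISOString();
  const { payload } = event;
  if (event.type === 'task_complete' && typeof payload.duration === 'number') {
    return `${time} task_complete after ${payload.duration} ms`;
  }
  if (event.type === 'task_error' && typeof payload.error === 'string') {
    return `${time} task_error: ${payload.error}`;
  }
  return `${time} ${event.type}`;
}
```

A helper like this can be passed directly to session.subscribe, for example session.subscribe((event) => console.log(describeEvent(event))).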

session_start

Emitted when a session is created.

{
  type: 'session_start',
  timestamp: 1710931200000,
  payload: {
    sessionId: 'nity-bridge-1-1710931200000',
    costTier: 'free',
    orchestratorModel: 'ollama',
    executorModel: 'ollama',
  }
}

task_start

Emitted when sendTask begins execution.

{
  type: 'task_start',
  timestamp: 1710931200500,
  payload: {
    taskType: 'execute',
    prompt: 'Add input validation to the forms',  // Truncated to 200 chars
    priority: 'normal',
  }
}

task_progress

Emitted during task execution as intermediate events arrive from the pi-coding-agent runtime.

{
  type: 'task_progress',
  timestamp: 1710931201200,
  payload: {
    taskType: 'execute',
    category: 'tool_call',
    tool: 'nity_execute',
    message: undefined,
    progress: {
      phase: 'Calling nity_execute',
      percentage: 25,
      delta: '{"task":"Add input validation..."}',
    },
  }
}

task_complete

Emitted when a task finishes successfully.

{
  type: 'task_complete',
  timestamp: 1710931245000,
  payload: {
    taskType: 'execute',
    duration: 44500,
    // When degraded mode was used:
    degraded: true,
  }
}

task_error

Emitted on task failure, retry attempts, or cancellation.

{
  type: 'task_error',
  timestamp: 1710931210000,
  payload: {
    taskType: 'execute',
    error: 'Connection timeout',
    duration: 9500,
    // On retry:
    operation: 'sendTask(execute)',
    attempt: 1,
    maxAttempts: 3,
    errorCount: 1,
    // On fallback:
    fallback: true,
  }
}

session_close

Emitted when the session is closed.

{
  type: 'session_close',
  timestamp: 1710931300000,
  payload: {
    sessionId: 'nity-bridge-1-1710931200000',
  }
}

Progress Extraction

extractProgress pulls structured progress from raw pi-coding-agent events. On task_progress events, the structured progress is available as payload.progress:

import {
  createNitySession,
  extractProgress,
  categorizeEvent,
} from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
 
const session = await createNitySession({ costTier: 'free' });
 
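session.subscribe((event) => {
  if (event.type === 'task_progress' && event.payload.progress) {
    // payload values are typed as unknown, so assert the shape before destructuring
    const { phase, percentage, delta } = event.payload.progress as {
      phase: string;
      percentage: number;
      delta?: string;
    };
    console.log(`[${percentage}%] ${phase}`);
    if (delta) console.log(`  ${delta}`);
  }
});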

Progress is derived from the event category:

Category     Phase                Percentage
tool_call    Calling {tool}       25
message      Processing           50
tool_result  Result from {tool}   75
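
The mapping in the table can be sketched as a small lookup function. This is an illustration only, not the SDK's extractProgress: the Progress shape is inferred from the task_progress payload shown earlier, and progressForCategory and its default tool name are hypothetical.

```typescript
// Assumed shape of a progress update, based on the task_progress payload above.
interface Progress {
  phase: string;
  percentage: number;
  delta?: string;
}

type ProgressCategory = 'tool_call' | 'message' | 'tool_result';

// Hypothetical helper that mirrors the category table.
function progressForCategory(category: ProgressCategory, tool = 'tool'): Progress {
  switch (category) {
    case 'tool_call':
      return { phase: `Calling ${tool}`, percentage: 25 };
    case 'message':
      return { phase: 'Processing', percentage: 50 };
    case 'tool_result':
      return { phase: `Result from ${tool}`, percentage: 75 };
  }
}
```

Since the percentages are fixed per category, they indicate which stage of a tool round trip was last observed rather than how much of the task is complete.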

Quality Gate Detection

extractQualityGates inspects tool result events for quality gate indicators:

import { extractQualityGates } from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
 
// Returns { passed: boolean, details: string } | null
const gate = extractQualityGates(rawEvent);
if (gate) {
  console.log(`Quality gates: ${gate.passed ? 'PASSED' : 'FAILED'}`);
  console.log(gate.details);
}

Detection looks for keywords (quality, gate, passed, failed) in tool output. A result is marked passed unless it contains fail or block.
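
To make the heuristic concrete, here is a minimal sketch that applies the same rules to a plain string of tool output. It is not the SDK's implementation: detectQualityGate is a hypothetical name, and the case-insensitive matching, the "any one keyword is enough" rule, and the use of the trimmed output as details are all assumptions.

```typescript
interface QualityGateResult {
  passed: boolean;
  details: string;
}

// Hypothetical re-implementation of the keyword heuristic on plain text.
function detectQualityGate(output: string): QualityGateResult | null {
  const text = output.toLowerCase();
  const keywords = ['quality', 'gate', 'passed', 'failed'];

  // No indicator keyword at all: treat the output as unrelated to quality gates.
  if (!keywords.some((keyword) => text.includes(keyword))) return null;

  // Passed unless the output mentions a failure or a block.
  const passed = !(text.includes('fail') || text.includes('block'));
  return { passed, details: output.trim() };
}
```

Substring matching of this kind is deliberately coarse: under these assumptions, output such as "no failures found" would still be reported as failed, so the result is best treated as a hint rather than a verdict.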

The NityResult also carries qualityGatePassed when quality gates are enabled in the session config.

Reflection Extraction

extractReflection parses bullet-pointed lines from tool results into structured lessons and follow-ups:

import { extractReflection } from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
 
// Returns { lessons: string[], followUps: string[] } | null
const reflection = extractReflection(rawEvent);
if (reflection) {
  console.log('Lessons:', reflection.lessons);
  console.log('Follow-ups:', reflection.followUps);
}

Lines containing keywords like follow-up, next, or todo are classified as follow-ups; others become lessons.
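
The classification rule can be sketched as follows. This is an illustrative approximation working on plain text, not the SDK's extractReflection: parseReflection is a hypothetical name, and the accepted bullet markers (-, *, •) and case-insensitive keyword matching are assumptions.

```typescript
interface Reflection {
  lessons: string[];
  followUps: string[];
}

// Hypothetical parser: keeps bullet lines only, then splits them by keyword.
function parseReflection(output: string): Reflection | null {
  const bulletPrefix = /^[-*•]\s+/;
  const bullets = output
    .split('\n')
    .map((line) => line.trim())
    .filter((line) => bulletPrefix.test(line))
    .map((line) => line.replace(bulletPrefix, ''));
  if (bullets.length === 0) return null;

  const followUpPattern = /follow-?up|next|todo/i;
  const lessons: string[] = [];
  const followUps: string[] = [];
  for (const bullet of bullets) {
    // Bullets mentioning a follow-up keyword are follow-ups; the rest are lessons.
    (followUpPattern.test(bullet) ? followUps : lessons).push(bullet);
  }
  return { lessons, followUps };
}
```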

Episode Reference Extraction

extractEpisodeRefs finds episode ID references in tool output:

import { extractEpisodeRefs } from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
 
// Returns string[]
const refs = extractEpisodeRefs(rawEvent);
// ['ep-041', 'episode: auth-fix']

The function matches the patterns ep-{id}, episode: {id}, and [ep-{id}], where the ID is 3–32 alphanumeric characters.
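
A regular expression covering the three patterns might look like the sketch below. It operates on plain text and is not the SDK's extractEpisodeRefs: findEpisodeRefs is a hypothetical name, and case-insensitive matching and returning the full matched text (including brackets) are assumptions.

```typescript
// Hypothetical matcher for ep-{id}, episode: {id}, and [ep-{id}].
function findEpisodeRefs(text: string): string[] {
  // The bracketed form is listed first so the brackets are kept in the match.
  const pattern =
    /\[ep-[A-Za-z0-9]{3,32}\]|\bep-[A-Za-z0-9]{3,32}|\bepisode:\s*[A-Za-z0-9]{3,32}/gi;
  return text.match(pattern) ?? [];
}
```

The character class here follows the stated alphanumeric rule; widening it, for example to allow hyphens inside IDs, is a one-character change.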

Event Categorization

categorizeEvent classifies raw events by type:

import { categorizeEvent } from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
 
// Returns 'tool_call' | 'tool_result' | 'message' | 'error' | 'session' | 'unknown'
const category = categorizeEvent(rawEvent);

Category     Matched by
tool_call    type contains "tool" + "call"
tool_result  type contains "tool" + "result"
message      type contains "message" or "content"
error        type contains "error" or "fail"
session      type contains "session", "start", or "end"
unknown      No match
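
The matching rules can be sketched as a chain of substring checks. This is an approximation for illustration, not the SDK's categorizeEvent: categorizeByType is a hypothetical name that takes the event's type string directly, and it assumes the rules are evaluated in table order and case-insensitively.

```typescript
type EventCategory =
  | 'tool_call' | 'tool_result' | 'message' | 'error' | 'session' | 'unknown';

// Hypothetical classifier: the first matching rule wins, in table order.
function categorizeByType(type: string): EventCategory {
  const t = type.toLowerCase();
  if (t.includes('tool') && t.includes('call')) return 'tool_call';
  if (t.includes('tool') && t.includes('result')) return 'tool_result';
  if (t.includes('message') || t.includes('content')) return 'message';
  if (t.includes('error') || t.includes('fail')) return 'error';
  if (t.includes('session') || t.includes('start') || t.includes('end')) return 'session';
  return 'unknown';
}
```

Order matters under these assumptions: a type such as message_end satisfies both the message and session rules, and the earlier rule decides the outcome.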

Event Transformer

createEventTransformer creates a reusable function that converts raw events to typed NityEvent objects:

import {
  createEventTransformer,
  buildResult,
} from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
 
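const transform = createEventTransformer('execute');
const startTime = Date.now();  // start the clock so buildResult receives the elapsed time
 
// Use in a streaming pipeline; rawEventStream is your own async iterable of raw PiAgentEvent objects
const events: PiAgentEvent[] = [];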
for await (const raw of rawEventStream) {
  const nityEvent = transform(raw);
  if (nityEvent) {
    console.log(`[${nityEvent.type}]`, nityEvent.payload);
  }
  events.push(raw);
}
 
// Build final result from accumulated events
const result = buildResult('execute', events, Date.now() - startTime);

Progress Throttling

For high-frequency event streams, use createProgressThrottler to batch rapid updates:

import { createProgressThrottler } from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
 
const throttler = createProgressThrottler(
  (progress) => {
    // This is called at most once per 100ms
    updateProgressBar(progress.percentage);
    console.log(progress.phase);
  },
  100  // minimum interval in ms
);
 
// Push rapid updates — only the latest within each interval is emitted
throttler.push({ phase: 'Processing', percentage: 30 });
throttler.push({ phase: 'Processing', percentage: 45 });
throttler.push({ phase: 'Processing', percentage: 60 });
 
// Flush remaining immediately (e.g., on task complete)
throttler.flush();
 
// Cancel pending (e.g., on task error)
throttler.cancel();
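
For readers curious how such a throttler can behave, the following is a minimal trailing-edge sketch with the same push, flush, and cancel surface. It is not the SDK's createProgressThrottler: makeThrottler is a hypothetical name, and the choice to emit at the end of each interval (rather than immediately on the first push) is an assumption.

```typescript
interface Progress {
  phase: string;
  percentage: number;
}

// Hypothetical trailing-edge throttler: keeps only the newest update and
// delivers it when the interval timer fires.
function makeThrottler(onProgress: (progress: Progress) => void, intervalMs: number) {
  let pending: Progress | null = null;
  let timer: ReturnType<typeof setTimeout> | null = null;

  const emit = () => {
    timer = null;
    if (pending) {
      const latest = pending;
      pending = null;
      onProgress(latest);
    }
  };

  return {
    push(progress: Progress) {
      pending = progress; // newer updates overwrite older ones
      if (timer === null) timer = setTimeout(emit, intervalMs);
    },
    flush() {
      // Deliver whatever is pending right now and stop the timer.
      if (timer !== null) clearTimeout(timer);
      emit();
    },
    cancel() {
      // Drop the pending update without delivering it.
      if (timer !== null) clearTimeout(timer);
      timer = null;
      pending = null;
    },
  };
}
```

In this design flush and cancel both clear the timer, so no callback fires after the task has ended.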

Progress Persistence

Use ProgressStore to persist progress updates for long-running tasks:

import { ProgressStore } from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
 
const store = new ProgressStore();
 
store.record('task-001', { phase: 'Analyzing', percentage: 25 });
store.record('task-001', { phase: 'Executing', percentage: 50 });
store.record('task-001', { phase: 'Validating', percentage: 75 });
 
store.getLatest('task-001');
// { phase: 'Validating', percentage: 75 }
 
store.getHistory('task-001');
// [{ phase: 'Analyzing', ... }, { phase: 'Executing', ... }, { phase: 'Validating', ... }]
 
store.clear('task-001');
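
The record, getLatest, getHistory, and clear calls above can be mimicked with a Map keyed by task ID. The sketch below is an in-memory stand-in for illustration only: InMemoryProgressStore is a hypothetical class, its storage strategy is an assumption rather than a description of ProgressStore, and returning undefined for an unknown task is likewise assumed.

```typescript
interface Progress {
  phase: string;
  percentage: number;
}

// Hypothetical in-memory store exposing the same four method names.
class InMemoryProgressStore {
  private readonly history = new Map<string, Progress[]>();

  record(taskId: string, progress: Progress): void {
    const entries = this.history.get(taskId) ?? [];
    entries.push(progress);
    this.history.set(taskId, entries);
  }

  getLatest(taskId: string): Progress | undefined {
    const entries = this.history.get(taskId);
    return entries ? entries[entries.length - 1] : undefined;
  }

  getHistory(taskId: string): Progress[] {
    // Return a copy so callers cannot mutate the stored history.
    return [...(this.history.get(taskId) ?? [])];
  }

  clear(taskId: string): void {
    this.history.delete(taskId);
  }
}
```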

Complete Example

import {
  createNitySession,
  createProgressThrottler,
  ProgressStore,
} from '@mariozechner/pi-coding-agent/extensions/nity/bridge';
 
const session = await createNitySession({ costTier: 'hybrid' });
const store = new ProgressStore();
 
const throttler = createProgressThrottler((progress) => {
  store.record('current-task', progress);
  console.log(`[${progress.percentage}%] ${progress.phase}`);
}, 200);
 
const unsubscribe = session.subscribe((event) => {
  switch (event.type) {
    case 'session_start':
      console.log('Session started:', event.payload.sessionId);
      break;
    case 'task_start':
      console.log('Task started:', event.payload.taskType);
      break;
    case 'task_progress':
      if (event.payload.progress) {
        throttler.push(event.payload.progress as any);
      }
      break;
    case 'task_complete':
      throttler.flush();
      console.log('Task completed in', event.payload.duration, 'ms');
      break;
    case 'task_error':
      throttler.cancel();
      console.error('Task error:', event.payload.error);
      break;
    case 'session_close':
      console.log('Session closed');
      break;
  }
});
 
try {
  const result = await session.sendTask('validate', 'Check the auth module');
  console.log('Result:', result.output);
  console.log('Quality gates:', result.qualityGatePassed);
} finally {
  unsubscribe();
  await session.close();
}