Unified WebSocket System with Rooms/Channels #264

Closed
opened 2026-01-25 11:37:13 +00:00 by jack · 2 comments
Owner

Motivation

We currently have a hybrid architecture:

  • SSE (/api/stream) for UI updates (server → client only)
  • WebSocket (/ws) for worker communication (bidirectional)

This leads to:

  • Two separate systems to maintain
  • The UI cannot filter or subscribe (it receives all 35+ events)
  • No bidirectional communication for the browser
  • A harder federation setup (hubs would have to speak both protocols)

Proposal: Unified WebSocket with Rooms/Channels

A single WebSocket system for all clients (browsers, workers, hubs).

Architektur

┌─────────────────────────────────────────────────────────────────────┐
│                     UNIFIED WEBSOCKET SYSTEM                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   Browser ──────┐                                                  │
│     ├ subscribe('session:*')                                       │
│     ├ subscribe('observation:*')                                   │
│     └ unsubscribe('worker:*')                                      │
│                 │                                                  │
│                 │        ┌─────────────────────────────┐           │
│                 ├───────►│    WebSocket Server         │           │
│                 │        │         /ws                 │           │
│                 │        │                             │           │
│   Worker ───────┤        │  ┌─────────────────────┐   │           │
│     ├ register()│        │  │   Channel Manager   │   │           │
│     └ caps: []  │        │  │                     │   │           │
│                 │        │  │  • session:*        │   │           │
│                 │        │  │  • task:*           │   │           │
│   Hub ──────────┘        │  │  • worker:*         │   │           │
│     ├ federate()         │  │  • observation:*    │   │           │
│     └ sync tasks         │  │  • claudemd:*       │   │           │
│                          │  └─────────────────────┘   │           │
│                          └─────────────────────────────┘           │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Client Types

type ClientType = 'browser' | 'worker' | 'hub';

interface WSClient {
  id: string;
  type: ClientType;
  subscriptions: Set<string>;      // channel patterns
  capabilities?: string[];         // workers only
  hubId?: string;                  // hubs only
  permissions: ClientPermission[]; // what the client is allowed to do
  authenticatedAt?: Date;
}

type ClientPermission =
  | 'subscribe'      // subscribe to channels
  | 'broadcast'      // send events (workers/hubs only)
  | 'task:receive'   // receive tasks (workers only)
  | 'task:complete'  // complete tasks (workers only)
  | 'federate';      // hub federation (hubs only)

Protocol

1. Connection & Auth

// Client → Server
{ type: 'auth', token?: string }

// Server → Client
{ type: 'auth:success', clientId: string, permissions: string[] }
{ type: 'auth:failed', reason: string }
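Because the server only grants permissions after auth:success, a client should not send subscribe frames before that reply arrives. A minimal sketch of one way to enforce that ordering on the client, assuming outgoing messages are queued until authentication succeeds; Handshake and AuthReply are hypothetical names, not part of the existing useWebSocket hook.

```typescript
// Hypothetical client-side handshake helper (not the real useWebSocket internals).
type HandshakeState = 'connecting' | 'authenticating' | 'ready' | 'failed';

type AuthReply =
  | { type: 'auth:success'; clientId: string; permissions: string[] }
  | { type: 'auth:failed'; reason: string };

class Handshake {
  state: HandshakeState = 'connecting';
  clientId: string | null = null;
  private pending: string[] = [];
  private readonly send: (raw: string) => void;

  constructor(send: (raw: string) => void) {
    this.send = send;
  }

  // Call from the socket's open handler: auth is always the first frame.
  open(token?: string): void {
    this.state = 'authenticating';
    this.send(JSON.stringify(token === undefined ? { type: 'auth' } : { type: 'auth', token }));
  }

  // Send immediately once ready, otherwise hold the message back.
  enqueue(message: object): void {
    const raw = JSON.stringify(message);
    if (this.state === 'ready') this.send(raw);
    else this.pending.push(raw);
  }

  // Feed auth:success / auth:failed replies from the server in here.
  handle(reply: AuthReply): void {
    if (reply.type === 'auth:success') {
      this.state = 'ready';
      this.clientId = reply.clientId;
      const queued = this.pending;
      this.pending = [];
      queued.forEach(raw => this.send(raw)); // flush in original order
    } else {
      this.state = 'failed';
      this.pending = []; // drop queued messages; the caller decides whether to retry
    }
  }
}
```

Queuing instead of sending eagerly keeps the wire order identical to the protocol description (auth, then subscribe) even when the UI requests subscriptions before the socket is ready.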

2. Subscription (Browser/Hub)

// Client → Server
{ type: 'subscribe', channels: ['session:*', 'observation:*'] }
{ type: 'unsubscribe', channels: ['worker:*'] }

// Server → Client
{ type: 'subscribed', channels: ['session:*', 'observation:*'] }
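On the server, each subscribe/unsubscribe frame has to be validated before it touches the client's subscriptions set. A minimal sketch, assuming channel patterns look like "name:*", "name:event" or the global "*" (the regex is an assumption, not taken from the ChannelManager), and assuming the subscribed ack echoes the client's full current subscription set; parseSubscription and applySubscription are hypothetical helpers.

```typescript
// Hypothetical server-side handling of subscribe/unsubscribe frames for one client.
type SubscriptionMessage =
  | { type: 'subscribe'; channels: string[] }
  | { type: 'unsubscribe'; channels: string[] };

type SubscribedAck = { type: 'subscribed'; channels: string[] };

// Assumed pattern format: "*", "name:*" or "name:event" (lowercase, dashes allowed).
const CHANNEL_RE = /^(\*|[a-z]+:(\*|[a-z-]+))$/;

function parseSubscription(raw: string): SubscriptionMessage | null {
  let msg: unknown;
  try {
    msg = JSON.parse(raw);
  } catch {
    return null; // not JSON at all
  }
  if (typeof msg !== 'object' || msg === null) return null;
  const { type, channels } = msg as { type?: unknown; channels?: unknown };
  if (type !== 'subscribe' && type !== 'unsubscribe') return null;
  if (!Array.isArray(channels)) return null;
  if (!channels.every((c: unknown) => typeof c === 'string' && CHANNEL_RE.test(c))) return null;
  return type === 'subscribe' ? { type: 'subscribe', channels } : { type: 'unsubscribe', channels };
}

// Mutates the client's subscription set and builds the acknowledgement.
function applySubscription(subs: Set<string>, msg: SubscriptionMessage): SubscribedAck {
  msg.channels.forEach(c => {
    if (msg.type === 'subscribe') subs.add(c);
    else subs.delete(c);
  });
  // Assumption: the ack lists every pattern the client is now subscribed to.
  return { type: 'subscribed', channels: Array.from(subs).sort() };
}
```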

3. Worker Registration

// Worker → Server
{ 
  type: 'register',
  capabilities: ['mistral', 'summarize', 'claude-md'],
  maxConcurrency: 2,
  labels: { region: 'eu', gpu: 'true' }
}

// Server → Worker
{ type: 'registered', workerId: string }
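The register message gives the server everything it needs to route tasks: capabilities, a concurrency limit and labels. A minimal sketch of a registry built from those fields, assuming a least-loaded selection strategy and exact label matching (both are assumptions; the issue does not specify how workers are chosen). WorkerRegistry, pickWorker and the "worker-N" id format are hypothetical.

```typescript
// Hypothetical worker registry fed by 'register' messages.
interface RegisterMessage {
  type: 'register';
  capabilities: string[];
  maxConcurrency: number;
  labels?: Record<string, string>;
}

interface WorkerEntry {
  workerId: string;
  capabilities: string[];
  maxConcurrency: number;
  labels: Record<string, string>;
  running: number; // tasks currently assigned to this worker
}

class WorkerRegistry {
  private workers = new Map<string, WorkerEntry>();
  private nextId = 1;

  register(msg: RegisterMessage): { type: 'registered'; workerId: string } {
    const workerId = `worker-${this.nextId++}`;
    this.workers.set(workerId, {
      workerId,
      capabilities: msg.capabilities,
      maxConcurrency: msg.maxConcurrency,
      labels: msg.labels ?? {},
      running: 0,
    });
    return { type: 'registered', workerId };
  }

  // Least-loaded worker with the capability, a free slot and all requested labels.
  pickWorker(capability: string, labels: Record<string, string> = {}): WorkerEntry | undefined {
    let best: WorkerEntry | undefined;
    for (const w of Array.from(this.workers.values())) {
      const hasCap = w.capabilities.indexOf(capability) !== -1;
      const hasSlot = w.running < w.maxConcurrency;
      const labelsMatch = Object.keys(labels).every(k => w.labels[k] === labels[k]);
      if (hasCap && hasSlot && labelsMatch && (best === undefined || w.running < best.running)) {
        best = w;
      }
    }
    return best;
  }

  // Record that a task:assign was sent to this worker.
  assign(workerId: string): void {
    const w = this.workers.get(workerId);
    if (w) w.running++;
  }
}
```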

4. Task Assignment (Worker only)

// Server → Worker
{ type: 'task:assign', taskId: string, payload: {...} }

// Worker → Server
{ type: 'task:progress', taskId: string, progress: 0.5 }
{ type: 'task:complete', taskId: string, result: {...} }
{ type: 'task:error', taskId: string, error: string }
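The worker replies above imply a small per-task state machine on the server. A minimal sketch, assuming completed and failed are terminal states and that progress is a fraction in [0, 1] (the example value 0.5 suggests this, but it is not stated); TaskRecord and applyTaskMessage are hypothetical names.

```typescript
// Hypothetical server-side bookkeeping for task:progress / task:complete / task:error.
type TaskStatus = 'assigned' | 'running' | 'completed' | 'failed';

type WorkerTaskMessage =
  | { type: 'task:progress'; taskId: string; progress: number }
  | { type: 'task:complete'; taskId: string; result: unknown }
  | { type: 'task:error'; taskId: string; error: string };

interface TaskRecord {
  status: TaskStatus;
  progress: number; // assumed to be a fraction between 0 and 1
  result?: unknown;
  error?: string;
}

function applyTaskMessage(task: TaskRecord, msg: WorkerTaskMessage): TaskRecord {
  // Terminal states: late or duplicate messages from the worker are ignored.
  if (task.status === 'completed' || task.status === 'failed') return task;
  switch (msg.type) {
    case 'task:progress':
      // Clamp into [0, 1] so a misbehaving worker cannot report 150%.
      return { ...task, status: 'running', progress: Math.min(1, Math.max(0, msg.progress)) };
    case 'task:complete':
      return { ...task, status: 'completed', progress: 1, result: msg.result };
    case 'task:error':
      return { ...task, status: 'failed', error: msg.error };
  }
}
```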

5. Events (Broadcast)

// Server → subscribed clients
{ 
  type: 'event',
  channel: 'observation:created',
  data: { id: 123, title: '...' },
  timestamp: '2026-01-25T...'
}

Channel Patterns

| Pattern | Events | Typical subscribers |
|---------|--------|---------------------|
| session:* | started, ended, pre-compact | Browser, Hub |
| task:* | queued, assigned, completed, failed, progress | Browser, Hub |
| worker:* | connected, disconnected, spawned, exited | Browser, Hub |
| observation:* | created, queued | Browser, Hub |
| claudemd:* | ready | Browser, SSE-Writer |
| summary:* | created | Browser |
| prompt:* | new | Browser |
| subagent:* | start, stop | Browser |
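The table lists patterns but not their exact matching rules. A minimal sketch of matching and fan-out, assuming "*" matches every channel, "prefix:*" matches any channel starting with "prefix:", and anything else must match exactly; matchesPattern, route and makeEvent are hypothetical helpers, not the API of the ChannelManager mentioned in the comments below.

```typescript
// Hypothetical wildcard matching and event fan-out (assumed semantics, see lead-in).
function matchesPattern(pattern: string, channel: string): boolean {
  if (pattern === '*') return true;
  if (pattern.endsWith(':*')) return channel.startsWith(pattern.slice(0, -1)); // keep the ':'
  return pattern === channel;
}

interface EventMessage {
  type: 'event';
  channel: string;
  data: unknown;
  timestamp: string;
}

// subscriptions: clientId → subscribed patterns. Returns the recipients' ids.
function route(channel: string, subscriptions: Map<string, string[]>): string[] {
  const recipients: string[] = [];
  subscriptions.forEach((patterns, clientId) => {
    if (patterns.some(p => matchesPattern(p, channel))) recipients.push(clientId);
  });
  return recipients;
}

// Builds the broadcast envelope from section 5 with an ISO-8601 timestamp.
function makeEvent(channel: string, data: unknown, now: Date = new Date()): EventMessage {
  return { type: 'event', channel, data, timestamp: now.toISOString() };
}
```

Keeping the ':' in the prefix comparison prevents "session:*" from accidentally matching a hypothetical "sessions:..." namespace.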

Permissions by Client Type

| Client | Default permissions |
|--------|---------------------|
| Browser | subscribe |
| Worker | subscribe, task:receive, task:complete |
| Hub | subscribe, broadcast, federate |
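The defaults in the table can be enforced with a lookup per incoming message. A minimal sketch, assuming a mapping from client → server message types to required permissions that the issue does not define (for example, task:progress and task:error requiring task:complete, and register requiring task:receive); unknown message types are rejected. DEFAULT_PERMISSIONS, REQUIRED and isAllowed are hypothetical names.

```typescript
// Hypothetical permission check; the table's defaults plus an assumed message mapping.
type ClientType = 'browser' | 'worker' | 'hub';
type ClientPermission = 'subscribe' | 'broadcast' | 'task:receive' | 'task:complete' | 'federate';

const DEFAULT_PERMISSIONS: Record<ClientType, ClientPermission[]> = {
  browser: ['subscribe'],
  worker: ['subscribe', 'task:receive', 'task:complete'],
  hub: ['subscribe', 'broadcast', 'federate'],
};

// Required permission per client → server message type (null = always allowed). Assumed.
const REQUIRED: Record<string, ClientPermission | null> = {
  auth: null,
  ping: null,
  pong: null,
  subscribe: 'subscribe',
  unsubscribe: 'subscribe',
  register: 'task:receive',
  'task:progress': 'task:complete',
  'task:complete': 'task:complete',
  'task:error': 'task:complete',
};

function isAllowed(type: ClientType, messageType: string): boolean {
  // Own-property check so names like "toString" are not found on the prototype.
  if (!Object.prototype.hasOwnProperty.call(REQUIRED, messageType)) return false;
  const required = REQUIRED[messageType];
  return required === null || DEFAULT_PERMISSIONS[type].indexOf(required) !== -1;
}
```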

Migration

  1. Phase 1: Extend the WebSocket server with channel support
  2. Phase 2: Migrate the browser client from SSE to WebSocket
  3. Phase 3: Deprecate the SSE endpoints
  4. Phase 4: Remove the SSE endpoints

UI-Integration

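// New useWebSocket hook
import { useEffect, useRef, useState } from 'react';

// Shape of a server → client 'event' message (see "5. Events" above)
interface WSEvent {
  type: 'event';
  channel: string;
  data: unknown;
  timestamp: string;
}

function useWebSocket(channels: string[]) {
  const [events, setEvents] = useState<WSEvent[]>([]);
  const wsRef = useRef<WebSocket | null>(null);
  // Depend on a string key: a fresh array literal on every render would
  // otherwise close and reopen the socket on each render.
  const channelKey = channels.join(',');

  useEffect(() => {
    // Build an absolute ws:// or wss:// URL from the page origin.
    const scheme = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
    const ws = new WebSocket(`${scheme}//${window.location.host}/ws`);

    ws.onopen = () => {
      ws.send(JSON.stringify({ type: 'auth' }));
    };

    ws.onmessage = (e) => {
      const msg = JSON.parse(e.data);
      if (msg.type === 'auth:success') {
        // Subscribe only after the server has confirmed authentication
        ws.send(JSON.stringify({
          type: 'subscribe',
          channels: channelKey.split(',').filter(Boolean),
        }));
      } else if (msg.type === 'event') {
        setEvents(prev => [...prev, msg as WSEvent]);
      }
    };

    wsRef.current = ws;
    return () => {
      ws.close();
      wsRef.current = null;
    };
  }, [channelKey]);

  // Return the ref itself; reading wsRef.current during render would be stale
  return { events, wsRef };
}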

// Usage
function Dashboard() {
  const { events } = useWebSocket(['session:*', 'task:*', 'worker:*']);
  // ...
}

function LiveView() {
  const { events } = useWebSocket(['*']); // all events
  // ...
}

Benefits

  • One system instead of two (SSE + WS)
  • Bidirectional - the UI can talk to the backend
  • Filtering - clients subscribe only to relevant channels
  • Less traffic - events are no longer broadcast to every client
  • Federation-ready - hubs can use the same transport
  • Permissions - clear rights per client type
  • Debugging - only one protocol to analyze

Open Questions

  • WebSocket reconnect logic in the browser (with backoff)?
  • Is a heartbeat needed for browser clients?
  • Rate limiting for subscriptions?
  • Message buffering during brief disconnects?
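For the reconnect question, a common approach is capped exponential backoff with random jitter, so that many browsers do not reconnect in lockstep after a server restart. A minimal sketch using "full jitter" (a uniform delay between 0 and the capped exponential value); backoffDelay and its default base/maximum of 500 ms and 30 s are assumptions, not values from the implementation.

```typescript
// Hypothetical reconnect delay: capped exponential backoff with full jitter.
function backoffDelay(
  attempt: number, // 0 for the first retry, incremented after each failure
  baseMs = 500,
  maxMs = 30_000,
  random: () => number = Math.random, // injectable for deterministic tests
): number {
  const exp = Math.min(maxMs, baseMs * 2 ** attempt);
  // Pick uniformly in [0, exp) so reconnecting clients spread out over time.
  return Math.floor(random() * exp);
}
```

A caller would typically schedule the next connection attempt with setTimeout(connect, backoffDelay(attempt)) and reset attempt to 0 after a successful auth:success.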

Dependencies

  • Blocks: #263 (WorkerHub Federation) - should build on the unified WS

Effort

  • Backend: extend the WebSocket server, add a channel manager
  • UI: new useWebSocket hook, replace the SSE hook
  • Worker: minimal (already uses WS)
  • Tests: WebSocket mocking
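For the test effort, protocol code can be exercised without a real server by passing it an in-memory stand-in that mimics only the parts of the WebSocket interface it uses. A minimal sketch; FakeSocket and its receive() helper are hypothetical and cover just send, onmessage and close.

```typescript
// Hypothetical in-memory WebSocket stand-in for unit tests.
class FakeSocket {
  sent: unknown[] = []; // outgoing frames, parsed back from JSON for easy assertions
  closed = false;
  onmessage: ((e: { data: string }) => void) | null = null;

  send(raw: string): void {
    if (this.closed) throw new Error('socket closed');
    this.sent.push(JSON.parse(raw));
  }

  // Simulate a frame arriving from the server.
  receive(message: object): void {
    this.onmessage?.({ data: JSON.stringify(message) });
  }

  close(): void {
    this.closed = true;
  }
}
```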
Author
Owner

Partial Implementation: useWebSocket Hook

Created packages/ui/src/hooks/useWebSocket.ts with:

  • Channel-based subscriptions (e.g., session:*, task:*)
  • Authentication protocol (auth → auth:success flow)
  • Reconnect with exponential backoff
  • Global state sharing across React components

Message Types:

  • Auth: auth, auth:success, auth:failed
  • Subscriptions: subscribe, unsubscribe, subscribed
  • Events: event (with channel routing)
  • Heartbeat: ping, pong
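The ping/pong pair lets the server drop connections that went away silently. A minimal sketch of the server-side bookkeeping, assuming the server sends ping periodically, clients answer with pong, and a client is considered dead when its last pong is older than a timeout; HeartbeatTracker and the timeout value are hypothetical, and timestamps are passed in explicitly so the logic stays testable.

```typescript
// Hypothetical heartbeat bookkeeping: who has not answered a ping in time?
class HeartbeatTracker {
  private lastPong = new Map<string, number>(); // clientId → ms timestamp
  private readonly timeoutMs: number;

  constructor(timeoutMs: number) {
    this.timeoutMs = timeoutMs;
  }

  // Call on connect and on every pong received from the client.
  touch(clientId: string, now: number): void {
    this.lastPong.set(clientId, now);
  }

  // Call when a client disconnects cleanly.
  remove(clientId: string): void {
    this.lastPong.delete(clientId);
  }

  // Ids of clients whose last pong is older than the timeout.
  expired(now: number): string[] {
    const dead: string[] = [];
    this.lastPong.forEach((at, clientId) => {
      if (now - at > this.timeoutMs) dead.push(clientId);
    });
    return dead;
  }
}
```

In a server, a setInterval would send ping to every client and close the sockets returned by expired(Date.now()).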

Commit: 6ef5efe


Next step: implement the backend WebSocket handler with channel routing.

Author
Owner

Implementation Complete

Backend:

  • ChannelManager (packages/backend/src/websocket/channel-manager.ts) - Wildcard pattern matching, subscription management
  • WorkerHub integration - subscribe/unsubscribe message handling, channel-based event routing

UI:

  • useWebSocket hook (packages/ui/src/hooks/useWebSocket.ts) - Channel subscriptions, auto-reconnect with exponential backoff, global state sharing

Features implemented:

  • Wildcard patterns (session:*, task:*, *)
  • Auth protocol (auth → auth:success)
  • Subscribe/unsubscribe protocol
  • Heartbeat (ping/pong)
  • Event broadcasting via channels
  • SSE-Writer client type for backward compatibility

Migration Note:
SSE endpoints remain active for backward compatibility. UI components can use either useSSE (legacy) or useWebSocket (new). Full SSE deprecation can be done in a separate cleanup issue.

jack closed this issue 2026-01-25 18:34:46 +00:00
Reference
customable/claude-mem#264