feat(worker): Task deduplication to prevent queue overload #207

Closed
opened 2026-01-24 17:15:00 +00:00 by jack · 0 comments
Owner

Problem

CLAUDE.md Generation queued zu viele Tasks:

  • Jede N Observations queued Root + bis zu 5 Subdirectories
  • Keine Deduplizierung: gleiche Subdirectory mehrfach gequeued
  • Queue füllt sich mit Low-Priority Tasks, blockiert wichtigere Arbeit
// Aktuell: Jedes Mal neue Tasks
await taskQueue.create({ type: 'claude-md', payload: { path: '/project' } });
await taskQueue.create({ type: 'claude-md', payload: { path: '/project/src' } });
// ... gleiche Tasks werden wiederholt erstellt

Hinweis: Dieses Issue verwendet die neuen Tabellennamen aus #197 (Database Schema Redesign).

Abhängigkeit

Sollte nach #197 umgesetzt werden, da dort task_queue zu tasks umbenannt wird.

Lösung

1. Task Entity mit Deduplication Key erweitern

// packages/database/src/entities/Task.ts
@Entity({ tableName: 'tasks' })
export class Task {
  // ... existing fields
  
  @Property({ fieldName: 'deduplication_key', nullable: true })
  @Index()
  deduplicationKey?: string;
}

2. Migration mit MikroORM CLI generieren

cd packages/database

# Migration aus Entity-Änderung generieren
npx mikro-orm migration:create --name=add_task_deduplication

# Output:
# Migration20260126000001_add_task_deduplication.ts successfully created

# Pending Migrations anzeigen
npx mikro-orm migration:pending

# ┌─────────────────────────────────────────────────────┐
# │ Name                                                │
# ├─────────────────────────────────────────────────────┤
# │ Migration20260126000001_add_task_deduplication      │
# └─────────────────────────────────────────────────────┘

Die generierte Migration sollte etwa so aussehen:

import { Migration } from '@mikro-orm/migrations';

export class Migration20260126000001_add_task_deduplication extends Migration {
  override async up(): Promise<void> {
    this.addSql('ALTER TABLE `tasks` ADD COLUMN `deduplication_key` TEXT');
    this.addSql('CREATE INDEX `idx_tasks_deduplication_key` ON `tasks` (`deduplication_key`)');
  }

  override async down(): Promise<void> {
    this.addSql('DROP INDEX `idx_tasks_deduplication_key`');
    // SQLite unterstützt kein DROP COLUMN direkt, ggf. Workaround nötig
  }
}

3. Migration ausführen

# Migration anwenden
npx mikro-orm migration:up

# Output:
# Processing 'Migration20260126000001_add_task_deduplication'
# Applied 'Migration20260126000001_add_task_deduplication'
# Successfully migrated up to the latest version

4. Dedup Key Generierung

// packages/backend/src/utils/task-dedup.ts
import { createHash } from 'crypto';

function generateDeduplicationKey(type: TaskType, payload: unknown): string {
  const normalized = JSON.stringify(sortKeys(payload));
  return `${type}:${createHash('sha256').update(normalized).digest('hex').slice(0, 16)}`;
}

5. Create with Deduplication

// TaskRepository
async createIfNotExists(input: CreateTaskInput): Promise<Task | null> {
  const dedupKey = generateDeduplicationKey(input.type, input.payload);
  
  const existing = await this.findOne({
    deduplicationKey: dedupKey,
    status: { $in: ['pending', 'assigned', 'processing'] }
  });
  
  if (existing) {
    logger.debug('Task deduplicated', { type: input.type, existingId: existing.id });
    return null;
  }
  
  return this.create({ ...input, deduplicationKey: dedupKey });
}

6. Sliding Window für CLAUDE.md

const claudeMdCooldowns = new Map<string, number>();
const COOLDOWN_MS = 60000;  // 1 Minute

async maybeQueueClaudeMd(sessionId: string, paths: string[]): Promise<void> {
  const lastQueued = claudeMdCooldowns.get(sessionId) || 0;
  
  if (Date.now() - lastQueued < COOLDOWN_MS) {
    logger.debug('CLAUDE.md task skipped (cooldown)', { sessionId });
    return;
  }
  
  await this.queueClaudeMdTasks(sessionId, paths);
  claudeMdCooldowns.set(sessionId, Date.now());
}

Auswirkung

Metrik Vorher Nachher
claude-md Tasks/Session 10-50 1-5
Queue Depth 100+ 10-20
Worker Utilization Verschwendet Effizient

Akzeptanzkriterien

  • Task Entity mit deduplicationKey Feld erweitert
  • Migration mit npx mikro-orm migration:create generiert
  • createIfNotExists Methode im Repository
  • Index für effiziente Dedup-Lookups
  • Sliding Window für CLAUDE.md
  • Konfigurierbare Cooldown-Zeit
  • Logging für deduplizierte Tasks
## Problem CLAUDE.md Generation queued zu viele Tasks: - Jede N Observations queued Root + bis zu 5 Subdirectories - Keine Deduplizierung: gleiche Subdirectory mehrfach gequeued - Queue füllt sich mit Low-Priority Tasks, blockiert wichtigere Arbeit ```typescript // Aktuell: Jedes Mal neue Tasks await taskQueue.create({ type: 'claude-md', payload: { path: '/project' } }); await taskQueue.create({ type: 'claude-md', payload: { path: '/project/src' } }); // ... gleiche Tasks werden wiederholt erstellt ``` > **Hinweis:** Dieses Issue verwendet die neuen Tabellennamen aus #197 (Database Schema Redesign). ## Abhängigkeit Sollte nach #197 umgesetzt werden, da dort `task_queue` zu `tasks` umbenannt wird. ## Lösung ### 1. Task Entity mit Deduplication Key erweitern ```typescript // packages/database/src/entities/Task.ts @Entity({ tableName: 'tasks' }) export class Task { // ... existing fields @Property({ fieldName: 'deduplication_key', nullable: true }) @Index() deduplicationKey?: string; } ``` ### 2. Migration mit MikroORM CLI generieren ```bash cd packages/database # Migration aus Entity-Änderung generieren npx mikro-orm migration:create --name=add_task_deduplication # Output: # Migration20260126000001_add_task_deduplication.ts successfully created # Pending Migrations anzeigen npx mikro-orm migration:pending # ┌─────────────────────────────────────────────────────┐ # │ Name │ # ├─────────────────────────────────────────────────────┤ # │ Migration20260126000001_add_task_deduplication │ # └─────────────────────────────────────────────────────┘ ``` Die generierte Migration sollte etwa so aussehen: ```typescript import { Migration } from '@mikro-orm/migrations'; export class Migration20260126000001_add_task_deduplication extends Migration { override async up(): Promise<void> { this.addSql('ALTER TABLE `tasks` ADD COLUMN `deduplication_key` TEXT'); this.addSql('CREATE INDEX `idx_tasks_deduplication_key` ON `tasks` (`deduplication_key`)'); } override async down(): Promise<void> { this.addSql('DROP INDEX `idx_tasks_deduplication_key`'); // SQLite unterstützt kein DROP COLUMN direkt, ggf. Workaround nötig } } ``` ### 3. Migration ausführen ```bash # Migration anwenden npx mikro-orm migration:up # Output: # Processing 'Migration20260126000001_add_task_deduplication' # Applied 'Migration20260126000001_add_task_deduplication' # Successfully migrated up to the latest version ``` ### 4. Dedup Key Generierung ```typescript // packages/backend/src/utils/task-dedup.ts import { createHash } from 'crypto'; function generateDeduplicationKey(type: TaskType, payload: unknown): string { const normalized = JSON.stringify(sortKeys(payload)); return `${type}:${createHash('sha256').update(normalized).digest('hex').slice(0, 16)}`; } ``` ### 5. Create with Deduplication ```typescript // TaskRepository async createIfNotExists(input: CreateTaskInput): Promise<Task | null> { const dedupKey = generateDeduplicationKey(input.type, input.payload); const existing = await this.findOne({ deduplicationKey: dedupKey, status: { $in: ['pending', 'assigned', 'processing'] } }); if (existing) { logger.debug('Task deduplicated', { type: input.type, existingId: existing.id }); return null; } return this.create({ ...input, deduplicationKey: dedupKey }); } ``` ### 6. Sliding Window für CLAUDE.md ```typescript const claudeMdCooldowns = new Map<string, number>(); const COOLDOWN_MS = 60000; // 1 Minute async maybeQueueClaudeMd(sessionId: string, paths: string[]): Promise<void> { const lastQueued = claudeMdCooldowns.get(sessionId) || 0; if (Date.now() - lastQueued < COOLDOWN_MS) { logger.debug('CLAUDE.md task skipped (cooldown)', { sessionId }); return; } await this.queueClaudeMdTasks(sessionId, paths); claudeMdCooldowns.set(sessionId, Date.now()); } ``` ## Auswirkung | Metrik | Vorher | Nachher | |--------|--------|---------| | claude-md Tasks/Session | 10-50 | 1-5 | | Queue Depth | 100+ | 10-20 | | Worker Utilization | Verschwendet | Effizient | ## Akzeptanzkriterien - [ ] Task Entity mit `deduplicationKey` Feld erweitert - [ ] Migration mit `npx mikro-orm migration:create` generiert - [ ] `createIfNotExists` Methode im Repository - [ ] Index für effiziente Dedup-Lookups - [ ] Sliding Window für CLAUDE.md - [ ] Konfigurierbare Cooldown-Zeit - [ ] Logging für deduplizierte Tasks
jack closed this issue 2026-01-25 10:51:25 +00:00
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Reference
customable/claude-mem#207
No description provided.