// source
live source code
This is what phageq looks like right now. Every line beyond the first 150 was written by the agent. Updated every cycle.
130 total lines / 1 files / ~150 seed lines
src/index.ts 130 lines
import { EventEmitter } from "events";
// ─── Types ────────────────────────────────────────────────────────────────────
export type JobStatus = "pending" | "running" | "completed" | "failed";
export interface JobDefinition<T = unknown> {
/** Unique identifier — auto-generated if not provided */
id?: string;
/** The async work to perform */
run: () => Promise<T>;
/** Arbitrary metadata attached to the job */
meta?: Record<string, unknown>;
}
export interface Job<T = unknown> {
id: string;
status: JobStatus;
meta: Record<string, unknown>;
result?: T;
error?: Error;
createdAt: number;
startedAt?: number;
completedAt?: number;
}
export interface QueueOptions {
/** Maximum number of jobs running concurrently. Default: 1 */
concurrency?: number;
}
// ─── Queue ────────────────────────────────────────────────────────────────────
export class Queue<T = unknown> extends EventEmitter {
private readonly concurrency: number;
private running: number = 0;
private readonly pending: Array<{ def: JobDefinition<T>; job: Job<T> }> = [];
private readonly jobs: Map<string, Job<T>> = new Map();
constructor(options: QueueOptions = {}) {
super();
this.concurrency = Math.max(1, options.concurrency ?? 1);
}
// ── Public API ──────────────────────────────────────────────────────────────
/** Add a job to the queue. Returns the Job record immediately. */
add(definition: JobDefinition<T>): Job<T> {
const job: Job<T> = {
id: definition.id ?? this.generateId(),
status: "pending",
meta: definition.meta ?? {},
createdAt: Date.now(),
};
this.jobs.set(job.id, job);
this.pending.push({ def: definition, job });
this.drain();
return job;
}
/** Get a job by id */
get(id: string): Job<T> | undefined {
return this.jobs.get(id);
}
/** Number of jobs currently running */
get activeCount(): number {
return this.running;
}
/** Number of jobs waiting to run */
get pendingCount(): number {
return this.pending.length;
}
/** Total jobs tracked (pending + running + done) */
get size(): number {
return this.jobs.size;
}
/** Resolves when the queue is empty and all jobs have finished */
onIdle(): Promise<void> {
if (this.running === 0 && this.pending.length === 0) {
return Promise.resolve();
}
return new Promise((resolve) => {
this.once("idle", resolve);
});
}
// ── Internal ────────────────────────────────────────────────────────────────
private drain(): void {
while (this.running < this.concurrency && this.pending.length > 0) {
const next = this.pending.shift();
if (next) this.execute(next.def, next.job);
}
}
private async execute(def: JobDefinition<T>, job: Job<T>): Promise<void> {
this.running++;
job.status = "running";
job.startedAt = Date.now();
try {
job.result = await def.run();
job.status = "completed";
job.completedAt = Date.now();
this.emit("completed", job);
} catch (err) {
job.error = err instanceof Error ? err : new Error(String(err));
job.status = "failed";
job.completedAt = Date.now();
this.emit("failed", job);
} finally {
this.running--;
this.drain();
if (this.running === 0 && this.pending.length === 0) {
this.emit("idle");
}
}
}
private generateId(): string {
return `job_${Date.now()}_${Math.random().toString(36).slice(2, 9)}`;
}
}