#!/usr/local/bin/php
<?php
/**
 * Pherb — NATS JetStream Consumer Daemon
 *
 * Long-running process that consumes transcription jobs from NATS JetStream,
 * orchestrates whisper/pyannote/wav2vec2 services, and stores results.
 *
 * @author Daniel Morante
 * @copyright 2026 The Daniel Morante Company, Inc.
 * @license BSD-2-Clause
 */

declare(ticks=1);

require_once __DIR__ . '/system/bootstrap.inc.php';

use Enchilada\Config\IniConfig;
use Basis\Nats\Client as NatsClient;
use Basis\Nats\Configuration as NatsConfiguration;
use Pherb\JobStore;
use Pherb\PipelineDispatcher;
use Pherb\TranscriptMerger;
use Pherb\OutputFormatter;

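/**
 * Pherb Job Consumer
 *
 * Pulls transcription jobs and stage-completion events from the PHERB
 * JetStream stream, drives each job through the convert, whisper, pyannote
 * and alignment stages via PipelineDispatcher, and writes the merged
 * transcript to the output directory.
 */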
class PherbConsumer
{
    use \Enchilada\Daemon\DaemonBehavior;

    private JobStore $jobStore;
    private PipelineDispatcher $dispatcher;
    private TranscriptMerger $merger;
    private OutputFormatter $formatter;
    private string $audioBasePath;
    private string $outputPath;
    private string $natsHost;
    private int $natsPort;
    private int $cleanupTtlDays;
    private ?IniConfig $settings;
    private ?EnchiladaOTLP $otlp = null;

    /** @var array<string, object> JetStream messages held until pipeline completes */
    private array $activeMessages = [];
    private int $lastCleanupTime = 0;

    /** @var array<string, float> Stage start timestamps for duration tracking */
    private array $stageStartTimes = [];

    /**
     * Build the consumer from environment variables and optional INI settings.
     *
     * Every option is resolved in the same order: a non-empty environment
     * variable wins, then the INI file (when one was supplied), then the
     * built-in default.
     *
     * @param IniConfig|null $settings Parsed INI configuration, or null to rely
     *                                 on environment variables and defaults only.
     */
    public function __construct(?IniConfig $settings = null)
    {
        $this->settings = $settings;

        // Shared lookup helpers so each option below is a single line.
        $stringOption = static function (string $envName, string $section, string $key, string $default) use ($settings) {
            $fromEnv = getenv($envName);
            if ($fromEnv) {
                return $fromEnv;
            }
            return $settings ? $settings->getString($section, $key, $default) : $default;
        };
        $intOption = static function (string $envName, string $section, string $key, int $default) use ($settings) {
            $fromEnv = getenv($envName);
            if ($fromEnv) {
                return (int)$fromEnv;
            }
            return (int)($settings ? $settings->getInt($section, $key, $default) : $default);
        };

        // Storage locations
        $this->audioBasePath = $stringOption('PHERB_AUDIO_PATH', 'storage', 'audio_path', '/data/audio');
        $this->outputPath = $stringOption('PHERB_OUTPUT_PATH', 'storage', 'output_path', '/data/audio/outputs');

        // NATS endpoint
        $this->natsHost = $stringOption('NATS_HOST', 'nats', 'host', '127.0.0.1');
        $this->natsPort = $intOption('NATS_PORT', 'nats', 'port', 4222);

        // Retention for finished jobs
        $this->cleanupTtlDays = $intOption('CLEANUP_TTL_DAYS', 'cleanup', 'ttl_days', 7);

        // Collaborators
        $this->dispatcher = new PipelineDispatcher($this->natsHost, $this->natsPort);
        $this->merger = new TranscriptMerger();
        $this->formatter = new OutputFormatter();

        // Daemon plumbing (health file + memory ceiling) comes from the trait
        $healthFile = getenv('HEALTH_FILE') ?: '/var/run/pherb-consumer.health';
        $maxMemoryMb = (int)(getenv('MAX_MEMORY_MB') ?: 256);
        $this->initDaemon([
            'health_file' => $healthFile,
            'max_memory_mb' => $maxMemoryMb,
        ]);

        // Telemetry is optional: without an endpoint $this->otlp stays null
        $otlpEndpoint = $stringOption('OTLP_ENDPOINT', 'otlp', 'endpoint', '');
        $otlpEnv = $stringOption('OTLP_ENVIRONMENT', 'otlp', 'environment', 'production');
        if (empty($otlpEndpoint)) {
            return;
        }

        $resource = [
            'service.name' => 'pherb-consumer',
            'service.namespace' => 'pherb',
            'deployment.environment' => $otlpEnv,
            'host.name' => gethostname() ?: 'unknown',
        ];
        $this->otlp = new EnchiladaOTLP(
            endpoint: $otlpEndpoint,
            resource: $resource,
            scopeName: 'pherb.pipeline',
            scopeVersion: '1.0.0',
            batchSize: 1,
            timeout: 3,
            suppressErrors: true
        );
    }

    /**
     * Dispatch a new job — starts or resumes the pipeline.
     *
     * Looks for intermediate stage outputs in the output directory. When one is
     * found, the pipeline resumes after the latest completed stage instead of
     * restarting from convert. This avoids re-running expensive stages
     * (e.g., 40+ minute whisper inference) after a failure in a later stage.
     *
     * @param string $jobId     Job identifier; also the basename of every stage output file.
     * @param string $audioPath Audio path relative to the configured audio base path.
     * @param array  $options   Job options from the queue message; only `align` is read here.
     *
     * @throws \RuntimeException When the audio file is missing, or when a resumed
     *                           stage cannot find the job record. Exceptions from
     *                           the dispatcher or job store propagate unchanged.
     */
    private function dispatchJob(string $jobId, string $audioPath, array $options): void
    {
        // NOTE(review): $audioPath is not checked for ".." segments here —
        // presumably the publisher validates it; confirm.
        $fullPath = $this->audioBasePath . '/' . ltrim($audioPath, '/');

        // is_file() rather than file_exists(): an empty $audioPath resolves to the
        // base directory itself, which exists but is not a usable audio input.
        if (!is_file($fullPath)) {
            throw new \RuntimeException("Audio file not found: {$fullPath}");
        }

        // Intermediate outputs that enable resume
        $convertOutput = $this->outputPath . "/{$jobId}.convert.wav";
        $whisperOutput = $this->outputPath . "/{$jobId}.whisper.json";
        $pyannoteOutput = $this->outputPath . "/{$jobId}.pyannote.json";
        $alignOutput = $this->outputPath . "/{$jobId}.align.json";

        $align = (bool)($options['align'] ?? false);

        // A stage counts as done only when its output exists and is non-empty
        $hasOutput = static fn (string $path): bool => file_exists($path) && filesize($path) > 0;

        // Track pipeline start time
        $pipelineKey = "{$jobId}:pipeline";
        $this->stageStartTimes[$pipelineKey] = microtime(true);

        // Last completed stage we resumed after; null means a fresh start
        $resumedFrom = null;

        try {
            if ($hasOutput($pyannoteOutput)) {
                // Pyannote done — resume at alignment or finalize
                $resumedFrom = 'pyannote';
                if ($align && !file_exists($alignOutput)) {
                    $this->logDaemon("[{$jobId}] Resuming from pyannote (alignment pending)");
                    $this->advancePipeline($jobId, 'pyannote', $pyannoteOutput);
                } else {
                    $this->logDaemon("[{$jobId}] All stages present, finalizing");
                    $this->finalizePipeline($jobId);
                }
            } elseif ($hasOutput($whisperOutput)) {
                // Whisper done — resume at pyannote (or alignment/finalize)
                $resumedFrom = 'whisper';
                $this->logDaemon("[{$jobId}] Resuming from whisper (skipping convert+whisper)");
                $this->advancePipeline($jobId, 'whisper', $whisperOutput);
            } elseif ($hasOutput($convertOutput)) {
                // Convert done — resume at whisper
                $resumedFrom = 'convert';
                $this->logDaemon("[{$jobId}] Resuming from convert (skipping convert)");
                $this->advancePipeline($jobId, 'convert', $convertOutput);
            } else {
                // No intermediate outputs — start from scratch
                $this->logDaemon("[{$jobId}] Dispatching convert stage: {$audioPath}");
                $this->dispatcher->dispatchConvert($jobId, $fullPath, $convertOutput);
                $this->jobStore->updateStage($jobId, 'convert');
                $this->stageStartTimes["{$jobId}:convert"] = microtime(true);
            }
        } catch (\Throwable $e) {
            // The job never got going: drop its timers so a long-running daemon
            // does not accumulate entries for failed dispatches.
            unset($this->stageStartTimes[$pipelineKey], $this->stageStartTimes["{$jobId}:convert"]);
            throw $e;
        }

        $this->otlpEmit('Pipeline started', EnchiladaOTLP::SEVERITY_INFO, [
            'job.id' => $jobId,
            'job.audio_path' => $audioPath,
            'job.stage' => 'convert',
            // Empty for a fresh run, otherwise the last stage found on disk
            'job.resumed_from' => $resumedFrom ?? '',
            'job.event' => 'pipeline.started',
        ]);
    }

    /**
     * Handle pipeline stage completion events from JetStream.
     *
     * Routes a completion event to the next stage or finalizes the job, then
     * ACKs the completion message. On terminal failures (a failed stage, or an
     * exception while advancing) the job is marked failed and the held job
     * message in $activeMessages is ACKed and released as well.
     *
     * @param array  $event             Decoded event; reads `job_id`, `status`,
     *                                  `stage`, `output_path` and `error`.
     * @param object $completionMessage JetStream message carrying the event; must expose ack().
     */
    private function handlePipelineCompleted(array $event, object $completionMessage): void
    {
        // Events arrive from outside this process: coerce each field to a string
        // so a malformed payload cannot raise a TypeError in string-typed callees.
        $field = static fn (string $key): string => is_scalar($event[$key] ?? null) ? (string)$event[$key] : '';

        $jobId = $field('job_id');
        $status = $field('status');
        $stage = $field('stage');
        $outputPath = $field('output_path');

        if (empty($jobId)) {
            $this->logDaemon("Ignoring completion event with missing job_id");
            $completionMessage->ack();
            return;
        }

        // Handle stage failure — terminal event
        if ($status === 'failed') {
            $error = $field('error');
            if ($error === '') {
                $error = 'unknown error';
            }
            $this->logDaemon("[{$jobId}] Stage '{$stage}' failed: {$error}");
            $this->jobStore->markFailed($jobId, ucfirst($stage) . ": {$error}");
            $this->incrementMetric('errors');

            $stageDuration = $this->popStageDuration($jobId, $stage);
            $pipelineDuration = $this->popStageDuration($jobId, 'pipeline');
            $this->otlpEmit('Stage failed', EnchiladaOTLP::SEVERITY_ERROR, [
                'job.id' => $jobId,
                'job.stage' => $stage,
                'job.event' => 'stage.failed',
                'job.error' => $error,
                'job.stage_duration_ms' => $stageDuration,
                'job.pipeline_duration_ms' => $pipelineDuration,
            ]);

            $this->ackAndRelease($jobId, $completionMessage);
            return;
        }

        if ($status !== 'completed') {
            $this->logDaemon("[{$jobId}] Unexpected status from stage '{$stage}': {$status}");
            $completionMessage->ack();
            return;
        }

        if (empty($outputPath)) {
            $this->logDaemon("[{$jobId}] Stage '{$stage}' reported completed without an output_path; ignoring");
            $completionMessage->ack();
            return;
        }

        $this->logDaemon("[{$jobId}] Stage '{$stage}' completed, output at {$outputPath}");

        $stageDuration = $this->popStageDuration($jobId, $stage);
        $this->otlpEmit('Stage completed', EnchiladaOTLP::SEVERITY_INFO, [
            'job.id' => $jobId,
            'job.stage' => $stage,
            'job.event' => 'stage.completed',
            'job.stage_duration_ms' => $stageDuration,
        ]);

        try {
            $this->advancePipeline($jobId, $stage, $outputPath);
        } catch (\Throwable $e) {
            $this->logDaemon("[{$jobId}] Pipeline advance FAILED: " . $e->getMessage());
            $this->jobStore->markFailed($jobId, $e->getMessage());
            $this->incrementMetric('errors');

            // Terminal for this job: release its pipeline timer and report the
            // failure, mirroring the stage-failed path above.
            $pipelineDuration = $this->popStageDuration($jobId, 'pipeline');
            $this->otlpEmit('Pipeline advance failed', EnchiladaOTLP::SEVERITY_ERROR, [
                'job.id' => $jobId,
                'job.stage' => $stage,
                'job.event' => 'pipeline.failed',
                'job.error' => $e->getMessage(),
                'job.pipeline_duration_ms' => $pipelineDuration,
            ]);

            $this->ackAndRelease($jobId, $completionMessage);
            return;
        }

        // ACK the completion event (stage processed successfully)
        $completionMessage->ack();
        $this->recordActivity();
    }

    /**
     * ACK a completion event and let go of the job message held for that job.
     *
     * Used on terminal outcomes: the completion message is always ACKed, and if
     * a job message is still held under $jobId it is ACKed and dropped from
     * $activeMessages too.
     *
     * @param string $jobId             Job whose held message should be released.
     * @param object $completionMessage Completion message to ACK; must expose ack().
     */
    private function ackAndRelease(string $jobId, object $completionMessage): void
    {
        $completionMessage->ack();

        $heldMessage = $this->activeMessages[$jobId] ?? null;
        if ($heldMessage === null) {
            return;
        }

        $heldMessage->ack();
        unset($this->activeMessages[$jobId]);
    }

    /**
     * Advance the pipeline after a stage completes.
     *
     * State machine:
     *   convert completed   → dispatch whisper
     *   whisper completed   → dispatch pyannote (if diarize=true),
     *                         else dispatch alignment (if align=true), else finalize
     *   pyannote completed  → dispatch alignment (if align=true) OR finalize
     *   alignment completed → finalize
     *
     * An unrecognised stage name is logged and otherwise ignored.
     *
     * @param string $jobId          Job identifier.
     * @param string $completedStage Name of the stage that just finished.
     * @param string $outputPath     Output path reported for that stage (not read here;
     *                               stage files are derived from $jobId).
     *
     * @throws \RuntimeException When the job record cannot be found.
     */
    private function advancePipeline(string $jobId, string $completedStage, string $outputPath): void
    {
        // Retrieve job record for options
        $job = $this->jobStore->get($jobId);
        if (!$job) {
            throw new \RuntimeException("Job record not found: {$jobId}");
        }

        // Options may be stored as an array or as a JSON string; a missing key
        // or a JSON document that is not an object falls back to defaults.
        $rawOptions = $job['options'] ?? null;
        if (is_string($rawOptions)) {
            $rawOptions = json_decode($rawOptions, true);
        }
        $opts = is_array($rawOptions) ? $rawOptions : [];

        $diarize = (bool)($opts['diarize'] ?? true);
        $align = (bool)($opts['align'] ?? false);
        $model = $opts['model'] ?? 'medium.en';
        $threads = (int)($opts['threads'] ?? 8);
        $modelsDir = $opts['models_dir'] ?? '/models/whisper';

        $nextStage = match ($completedStage) {
            'convert' => 'whisper',
            'whisper' => $diarize ? 'pyannote' : ($align ? 'alignment' : 'finalize'),
            'pyannote' => $align ? 'alignment' : 'finalize',
            'alignment' => 'finalize',
            default => null,
        };

        if ($nextStage === null) {
            $this->logDaemon("[{$jobId}] Unknown completed stage: {$completedStage}");
            return;
        }

        if ($nextStage === 'finalize') {
            $this->finalizePipeline($jobId);
            return;
        }

        // The converted WAV is the audio input for all subsequent stages
        $wavPath = $this->outputPath . "/{$jobId}.convert.wav";

        switch ($nextStage) {
            case 'whisper':
                $whisperOutput = $this->outputPath . "/{$jobId}.whisper.json";
                $this->logDaemon("[{$jobId}] Dispatching whisper stage (model={$model}, threads={$threads})");
                $this->dispatcher->dispatchWhisper($jobId, $wavPath, $whisperOutput, $model, $modelsDir, $threads);
                break;

            case 'pyannote':
                $pyannoteOutput = $this->outputPath . "/{$jobId}.pyannote.json";
                $this->logDaemon("[{$jobId}] Dispatching pyannote stage");
                $this->dispatcher->dispatchPyannote($jobId, $wavPath, $pyannoteOutput);
                break;

            case 'alignment':
                $alignOutput = $this->outputPath . "/{$jobId}.align.json";
                $this->logDaemon("[{$jobId}] Dispatching alignment stage");
                $this->dispatcher->dispatchAlignment($jobId, $wavPath, $alignOutput);
                break;
        }

        // Record the new stage only after the dispatch call returned
        $this->jobStore->updateStage($jobId, $nextStage);
        $this->stageStartTimes["{$jobId}:{$nextStage}"] = microtime(true);
    }

    /**
     * Finalize the pipeline — merge all stage outputs, format, and write result.
     *
     * Reads the whisper output (required) plus pyannote and alignment outputs
     * (when present) from the output directory, merges them, writes the final
     * file in the requested format, marks the job completed, removes the
     * intermediate files, releases the held job message and fires the webhook.
     *
     * Intermediate files are removed only after the final output was written
     * and the job was marked completed, so a failure before that point leaves
     * them on disk for a later resume.
     *
     * @param string $jobId Job identifier.
     *
     * @throws \RuntimeException When the job record or whisper output is missing,
     *                           the whisper output is not valid JSON, or the final
     *                           output cannot be written.
     */
    private function finalizePipeline(string $jobId): void
    {
        $this->logDaemon("[{$jobId}] Finalizing pipeline");

        // Retrieve job record for options
        $job = $this->jobStore->get($jobId);
        if (!$job) {
            throw new \RuntimeException("Job record not found: {$jobId}");
        }

        // Options may be stored as an array or as a JSON string
        $rawOptions = $job['options'] ?? null;
        if (is_string($rawOptions)) {
            $rawOptions = json_decode($rawOptions, true);
        }
        $opts = is_array($rawOptions) ? $rawOptions : [];

        $audioPath = $job['audio_path'] ?? '';
        $diarize = (bool)($opts['diarize'] ?? true);
        $format = $opts['format'] ?? 'json';
        $model = $opts['model'] ?? 'medium.en';

        // Read whisper output (always present)
        $whisperFile = $this->outputPath . "/{$jobId}.whisper.json";
        if (!file_exists($whisperFile)) {
            throw new \RuntimeException("Whisper output not found: {$whisperFile}");
        }

        $rawData = json_decode((string)file_get_contents($whisperFile), true);
        if (!is_array($rawData)) {
            throw new \RuntimeException("Invalid whisper output JSON");
        }

        $whisperResult = $this->normalizeWhisperOutput($rawData);
        $this->logDaemon("[{$jobId}] Normalized: " . count($whisperResult['segments'] ?? []) . " segments");

        // Read pyannote output (if exists). Unusable content is treated as
        // "no speaker data" rather than failing the job.
        $speakerSegments = [];
        $pyannoteFile = $this->outputPath . "/{$jobId}.pyannote.json";
        if (file_exists($pyannoteFile)) {
            $pyannoteData = json_decode((string)file_get_contents($pyannoteFile), true);
            if (is_array($pyannoteData)) {
                $candidate = $pyannoteData['segments'] ?? $pyannoteData;
                $speakerSegments = is_array($candidate) ? $candidate : [];
            }
            $this->logDaemon("[{$jobId}] Pyannote: " . count($speakerSegments) . " speaker segments");
        }

        // Read alignment output (if exists)
        $alignFile = $this->outputPath . "/{$jobId}.align.json";
        if (file_exists($alignFile)) {
            $alignData = json_decode((string)file_get_contents($alignFile), true);
            $alignedWords = is_array($alignData) ? ($alignData['words'] ?? $alignData) : null;
            if (is_array($alignedWords)) {
                $whisperResult = $this->applyAlignment($whisperResult, $alignedWords);
                $this->logDaemon("[{$jobId}] Applied alignment data");
            } else {
                $this->logDaemon("[{$jobId}] Alignment output unusable, keeping whisper timings");
            }
        }

        // Merge transcript with speaker info
        $segments = ($diarize && !empty($speakerSegments))
            ? $this->merger->merge($whisperResult, $speakerSegments)
            : $this->segmentsWithoutDiarization($whisperResult);

        $durationSeconds = $this->estimateDuration($segments);

        // Format output
        $outputContent = match ($format) {
            'srt' => $this->formatter->toSrt($segments),
            'vtt' => $this->formatter->toVtt($segments),
            default => $this->formatter->toJson($segments, [
                'job_id' => $jobId,
                'audio' => $audioPath,
                'model' => $model,
                'duration_seconds' => $durationSeconds,
            ]),
        };

        // Unknown formats fall back to JSON, matching the formatter choice above
        $ext = match ($format) {
            'srt' => 'srt',
            'vtt' => 'vtt',
            default => 'json',
        };

        // Re-check after mkdir so a concurrent creator does not count as failure
        if (!is_dir($this->outputPath) && !@mkdir($this->outputPath, 0755, true) && !is_dir($this->outputPath)) {
            throw new \RuntimeException("Cannot create output directory: {$this->outputPath}");
        }

        // Write final output file; bail out before touching job state or
        // intermediates if the write did not succeed.
        $outputFile = $this->outputPath . "/{$jobId}.{$ext}";
        if (file_put_contents($outputFile, $outputContent) === false) {
            throw new \RuntimeException("Failed to write final output: {$outputFile}");
        }

        $this->logDaemon("[{$jobId}] Final output written: {$outputFile}");

        // Update job status
        $this->jobStore->markCompleted($jobId, $outputFile);

        // Clean up intermediate stage outputs (best effort; some may not exist)
        $convertFile = $this->outputPath . "/{$jobId}.convert.wav";
        @unlink($convertFile);
        @unlink($whisperFile);
        @unlink($pyannoteFile);
        @unlink($alignFile);

        $this->incrementMetric('processed');
        echo "[" . date('H:i:s') . "] Job {$jobId} completed\n";

        $pipelineDuration = $this->popStageDuration($jobId, 'pipeline');
        $this->otlpEmit('Pipeline completed', EnchiladaOTLP::SEVERITY_INFO, [
            'job.id' => $jobId,
            'job.event' => 'pipeline.completed',
            'job.pipeline_duration_ms' => $pipelineDuration,
            'job.format' => $format,
            'job.model' => $model,
            'job.diarize' => $diarize,
            'job.segments' => count($segments),
            'job.duration_seconds' => $durationSeconds,
        ]);

        // ACK and release the held JetStream job message — pipeline complete
        if (isset($this->activeMessages[$jobId])) {
            $this->activeMessages[$jobId]->ack();
            unset($this->activeMessages[$jobId]);
        }

        // Webhook callback. The job is already completed at this point, so a
        // callback problem is logged and must not propagate to callers that
        // would mark the job failed.
        try {
            $completedJob = $this->jobStore->get($jobId);
            if (is_array($completedJob) && !empty($completedJob['callback_url'])) {
                $this->sendCallback($completedJob['callback_url'], $completedJob);
            }
        } catch (\Throwable $e) {
            $this->logDaemon("[{$jobId}] Callback error: " . $e->getMessage());
        }
    }

    /**
     * Convert a `transcription`-keyed whisper result into the `segments` shape.
     *
     * Input that already has `segments`, or that has no `transcription` key, is
     * returned untouched. Otherwise each transcription entry becomes a segment
     * whose millisecond offsets are converted to seconds; tokens that are blank
     * after trimming or equal to `[BLANK_AUDIO]` are dropped from `words`.
     *
     * @param array $data Decoded whisper JSON.
     * @return array Either $data unchanged or `['segments' => [...]]`.
     */
    private function normalizeWhisperOutput(array $data): array
    {
        // Nothing to convert: already normalized, or an unrecognised shape
        if (isset($data['segments']) || !isset($data['transcription'])) {
            return $data;
        }

        $normalized = [];
        foreach ($data['transcription'] as $entry) {
            $tokenWords = [];
            foreach ($entry['tokens'] ?? [] as $tok) {
                $label = trim($tok['text'] ?? '');
                $skip = ($label === '' || $label === '[BLANK_AUDIO]');
                if (!$skip) {
                    $fromMs = $tok['offsets']['from'] ?? 0;
                    $toMs = $tok['offsets']['to'] ?? 0;
                    $tokenWords[] = [
                        'word' => $label,
                        'start' => $fromMs / 1000.0,
                        'end' => $toMs / 1000.0,
                        'probability' => (float)($tok['p'] ?? 0),
                    ];
                }
            }

            $segFromMs = $entry['offsets']['from'] ?? 0;
            $segToMs = $entry['offsets']['to'] ?? 0;
            $normalized[] = [
                'start' => $segFromMs / 1000.0,
                'end' => $segToMs / 1000.0,
                'text' => $entry['text'] ?? '',
                'words' => $tokenWords,
            ];
        }

        return ['segments' => $normalized];
    }

    /**
     * Build output segments when no speaker data is used.
     *
     * Every segment and every word is labelled `SPEAKER_00`. Word text is read
     * from `word` (falling back to `text`) and trimmed; word probability is read
     * from `probability` (falling back to `confidence`). Missing numbers become 0.0.
     *
     * @param array $whisperResult Whisper result with an optional `segments` list.
     * @return array List of segments with `start`, `end`, `speaker`, `text`, `words`.
     */
    private function segmentsWithoutDiarization(array $whisperResult): array
    {
        $soleSpeaker = 'SPEAKER_00';
        $result = [];

        foreach ($whisperResult['segments'] ?? [] as $source) {
            $labelledWords = [];
            foreach ($source['words'] ?? [] as $entry) {
                $token = $entry['word'] ?? $entry['text'] ?? '';
                $score = $entry['probability'] ?? $entry['confidence'] ?? 0;
                $labelledWords[] = [
                    'word' => trim($token),
                    'start' => (float)($entry['start'] ?? 0),
                    'end' => (float)($entry['end'] ?? 0),
                    'probability' => (float)$score,
                    'speaker' => $soleSpeaker,
                ];
            }

            $result[] = [
                'start' => (float)($source['start'] ?? 0),
                'end' => (float)($source['end'] ?? 0),
                'speaker' => $soleSpeaker,
                'text' => $source['text'] ?? '',
                'words' => $labelledWords,
            ];
        }

        return $result;
    }

    /**
     * Flatten all words of a whisper result into one list.
     *
     * Words are emitted in segment order. Each entry carries the trimmed word
     * text (from `word`, falling back to `text`) and its `start` / `end` as
     * floats, with 0.0 for missing values.
     *
     * @param array $whisperResult Whisper result with an optional `segments` list.
     * @return array List of `['word' => string, 'start' => float, 'end' => float]`.
     */
    private function extractWordsForAlignment(array $whisperResult): array
    {
        $flat = [];

        foreach ($whisperResult['segments'] ?? [] as $segment) {
            foreach ($segment['words'] ?? [] as $entry) {
                $token = $entry['word'] ?? $entry['text'] ?? '';
                $begin = (float)($entry['start'] ?? 0);
                $finish = (float)($entry['end'] ?? 0);

                $flat[] = ['word' => trim($token), 'start' => $begin, 'end' => $finish];
            }
        }

        return $flat;
    }

    /**
     * Apply alignment results back to whisper output.
     *
     * Aligned entries are matched to whisper words by position: the Nth word,
     * counted across all segments in order, takes `start` / `end` from
     * $alignedWords[N] when that entry provides them. Words without a matching
     * entry keep their original timings.
     *
     * Segments without a `words` list and results without a `segments` list are
     * returned as they came in; no keys are added to them.
     *
     * @param array $whisperResult Whisper result in the `segments` shape.
     * @param array $alignedWords  Zero-indexed list of `['start' => ..., 'end' => ...]` entries.
     * @return array The whisper result with updated word timings.
     */
    private function applyAlignment(array $whisperResult, array $alignedWords): array
    {
        if (!isset($whisperResult['segments']) || !is_array($whisperResult['segments'])) {
            return $whisperResult;
        }

        // Running position across all segments. Updates are written through
        // explicit keys instead of foreach-by-reference, which would create
        // null `words` entries on segments that have none.
        $position = 0;
        foreach ($whisperResult['segments'] as $segKey => $segment) {
            if (!is_array($segment) || !isset($segment['words']) || !is_array($segment['words'])) {
                continue;
            }

            foreach ($segment['words'] as $wordKey => $word) {
                $aligned = $alignedWords[$position] ?? null;
                $position++;

                if (!is_array($aligned) || !is_array($word)) {
                    continue;
                }
                if (isset($aligned['start'])) {
                    $whisperResult['segments'][$segKey]['words'][$wordKey]['start'] = $aligned['start'];
                }
                if (isset($aligned['end'])) {
                    $whisperResult['segments'][$segKey]['words'][$wordKey]['end'] = $aligned['end'];
                }
            }
        }

        return $whisperResult;
    }

    /**
     * Estimate total audio duration as the `end` time of the last segment.
     *
     * @param array $segments Segment list.
     * @return float The last segment's `end` cast to float; 0.0 when the list
     *               is empty or the last segment has no `end`.
     */
    private function estimateDuration(array $segments): float
    {
        if ($segments === []) {
            return 0.0;
        }

        $tail = $segments[array_key_last($segments)];

        return (float)($tail['end'] ?? 0);
    }

    /**
     * Send webhook callback.
     *
     * POSTs a JSON document with the job's `id`, `status`, `result_path` and
     * `completed_at` to $url and logs the outcome. This is best effort: every
     * failure is logged and none is raised to the caller.
     *
     * @param string $url Callback URL taken from the job record.
     * @param array  $job Job record; missing fields are sent as null.
     */
    private function sendCallback(string $url, array $job): void
    {
        $jobId = $job['id'] ?? 'unknown';

        $payload = json_encode([
            'job_id' => $job['id'] ?? null,
            'status' => $job['status'] ?? null,
            'result_path' => $job['result_path'] ?? null,
            'completed_at' => $job['completed_at'] ?? null,
        ]);
        if ($payload === false) {
            $this->logDaemon("[{$jobId}] Callback failed to {$url}: cannot encode payload (" . json_last_error_msg() . ")");
            return;
        }

        $ch = curl_init($url);
        if ($ch === false) {
            $this->logDaemon("[{$jobId}] Callback failed to {$url}: cannot initialise cURL");
            return;
        }

        curl_setopt_array($ch, [
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => $payload,
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_TIMEOUT => 10,
            CURLOPT_USERAGENT => APPLICATION_USERAGENT,
            // The URL comes from job data: refuse file://, gopher:// and friends
            CURLOPT_PROTOCOLS => CURLPROTO_HTTP | CURLPROTO_HTTPS,
        ]);

        $response = curl_exec($ch);
        if ($response === false) {
            // Transport-level failure: there is no HTTP status to report
            $this->logDaemon("[{$jobId}] Callback failed to {$url}: " . curl_error($ch));
            unset($ch);
            return;
        }

        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        // Dropping the handle releases it; curl_close() has no effect on PHP 8
        unset($ch);

        if ($httpCode >= 200 && $httpCode < 300) {
            $this->logDaemon("[{$jobId}] Callback sent to {$url}: HTTP {$httpCode}");
        } else {
            $this->logDaemon("[{$jobId}] Callback failed to {$url}: HTTP {$httpCode}");
        }
    }

    /**
     * Forward a log record to the OTLP client, if one was configured.
     *
     * When $this->otlp is null the call does nothing.
     *
     * @param string $body       Log message body.
     * @param int    $severity   Severity constant, defaults to SEVERITY_INFO.
     * @param array  $attributes Attribute map attached to the record.
     */
    private function otlpEmit(string $body, int $severity = EnchiladaOTLP::SEVERITY_INFO, array $attributes = []): void
    {
        $this->otlp?->emit($body, $severity, $attributes);
    }

    /**
     * Remove a recorded stage start time and report how long ago it was.
     *
     * @param string $jobId Job identifier.
     * @param string $stage Stage name (or `pipeline` for the whole run).
     * @return int Elapsed milliseconds, rounded; 0 when no start time was recorded.
     */
    private function popStageDuration(string $jobId, string $stage): int
    {
        $timerKey = "{$jobId}:{$stage}";

        $startedAt = $this->stageStartTimes[$timerKey] ?? null;
        if ($startedAt === null) {
            return 0;
        }

        $elapsedMs = (microtime(true) - $startedAt) * 1000;
        unset($this->stageStartTimes[$timerKey]);

        return (int)round($elapsedMs);
    }

    /**
     * Build a NATS client for the configured host and port and ping the server.
     *
     * The client is named `pherb-consumer` and told to skip invalid messages.
     * The ping happens before returning, so a connectivity problem surfaces
     * here rather than on first use.
     *
     * @return NatsClient Client that has answered a ping.
     */
    private function createNatsClient(): NatsClient
    {
        $nats = new NatsClient(
            new NatsConfiguration(
                host: $this->natsHost,
                port: $this->natsPort,
                timeout: 2.0,
            )
        );

        $nats->setName('pherb-consumer');
        $nats->skipInvalidMessages(true);

        // Fail fast when the server is unreachable
        $nats->ping();

        return $nats;
    }

    /**
     * Main run loop.
     *
     * Architecture: NATS-idiomatic with deferred ACK.
     *
     * - Job messages are held (not ACKed) until the entire pipeline completes.
     *   JetStream ack_wait + InProgress() heartbeats prevent redelivery while
     *   the pipeline is alive. If the consumer dies, ack_wait expires and
     *   JetStream redelivers automatically — no stale check needed.
     *
     * - Completion events are durable via JetStream (not ephemeral plain NATS).
     *   If the consumer disconnects mid-pipeline, completions wait in the stream
     *   and are delivered when the consumer reconnects.
     *
     * Each loop iteration: (re)connect if needed, process completion events,
     * heartbeat held job messages, pull at most one new job, and run the hourly
     * cleanup.
     *
     * @return int Process exit code; 0 once the daemon loop ends.
     */
    public function run(): int
    {
        echo "Pherb Transcription Consumer\n";
        echo "============================\n";
        echo "Pipeline: dispatch via NATS (PipelineDispatcher)\n";
        echo "NATS: {$this->natsHost}:{$this->natsPort}\n";
        echo "Audio: {$this->audioBasePath}\n";
        echo "Output: {$this->outputPath}\n";
        echo "PID: " . getmypid() . "\n\n";

        // Auto-migrate database
        $pdo = pherb_create_mariadb($this->settings);
        $schemaDir = APPLICATION_ROOT . 'schema';
        $runner = new \Enchilada\SQL\MigrationRunner($pdo, $schemaDir);
        $result = $runner->applyPending();
        if (!empty($result['applied'])) {
            foreach ($result['applied'] as $name) {
                echo "Migration applied: {$name}\n";
            }
        }

        $this->jobStore = new JobStore($pdo);

        // Connect to NATS
        $client = null;
        $jobQueue = null;
        $completionQueue = null;
        $reconnectDelay = 1;
        $maxReconnectDelay = 30;

        while ($this->daemonTick()) {
            // Ensure NATS connection + JetStream consumers
            if ($client === null || $jobQueue === null || $completionQueue === null) {
                try {
                    $client = $this->createNatsClient();

                    // Get or create stream capturing both jobs and pipeline events
                    $stream = $client->getApi()->getStream('PHERB');
                    $stream->getConfiguration()->setSubjects(['pherb.jobs.>', 'pherb.pipeline.>']);
                    $stream->createIfNotExists();
                    $stream->update();

                    // Durable consumer for new jobs — deferred ACK (ack_wait=120s)
                    $jobConsumer = $stream->getConsumer('pherb-jobs');
                    $jobConfig = $jobConsumer->getConfiguration();
                    $jobConfig->setAckWait(120_000_000_000); // 120s — InProgress extends this
                    $jobConfig->setMaxDeliver(3);
                    $jobConfig->setSubjectFilter('pherb.jobs.transcribe');
                    $jobConsumer->create();
                    $jobConsumer->setExpires(2.0);
                    $jobConsumer->setBatching(1);
                    $jobQueue = $jobConsumer->getQueue();

                    // Durable consumer for pipeline completion events
                    $completionConsumer = $stream->getConsumer('pherb-pipeline');
                    $completionConfig = $completionConsumer->getConfiguration();
                    $completionConfig->setAckWait(30_000_000_000); // 30s
                    $completionConfig->setMaxDeliver(5);
                    $completionConfig->setSubjectFilter('pherb.pipeline.completed');
                    $completionConsumer->create();
                    $completionConsumer->setExpires(2.0);
                    $completionConsumer->setBatching(5);
                    $completionQueue = $completionConsumer->getQueue();

                    $reconnectDelay = 1;
                    echo "Connected to NATS JetStream, waiting for jobs + completions...\n\n";
                } catch (\Throwable $e) {
                    $this->logDaemon("NATS connect failed: " . $e->getMessage() . " (retry in {$reconnectDelay}s)");
                    $client = null;
                    $jobQueue = null;
                    $completionQueue = null;
                    sleep($reconnectDelay);
                    // Exponential backoff, capped so the logged delay always
                    // matches the actual sleep and the counter stays bounded.
                    $reconnectDelay = min($reconnectDelay * 2, $maxReconnectDelay);
                    continue;
                }
            }

            // 1. Process pipeline completion events from JetStream (durable)
            try {
                $completions = $completionQueue->fetchAll(5);
            } catch (\Throwable $e) {
                $this->logDaemon("Completion fetch error: " . $e->getMessage() . " (reconnecting)");
                $client = null;
                $jobQueue = null;
                $completionQueue = null;
                continue;
            }

            foreach ($completions as $completionMsg) {
                $payload = $completionMsg->payload;
                if ($payload->isEmpty()) {
                    continue;
                }

                $event = json_decode($payload->body, true);
                if (is_array($event)) {
                    $this->handlePipelineCompleted($event, $completionMsg);
                } else {
                    $completionMsg->ack();
                }
            }

            // 2. Send InProgress heartbeats for all held job messages
            foreach ($this->activeMessages as $jobId => $heldMessage) {
                try {
                    $heldMessage->progress();
                } catch (\Throwable $e) {
                    $this->logDaemon("[{$jobId}] InProgress failed: " . $e->getMessage());
                }
            }

            // 3. Pull new jobs from JetStream and dispatch first pipeline stage
            try {
                $messages = $jobQueue->fetchAll(1);
            } catch (\Throwable $e) {
                $this->logDaemon("Job fetch error: " . $e->getMessage() . " (reconnecting)");
                $client = null;
                $jobQueue = null;
                $completionQueue = null;
                continue;
            }

            foreach ($messages as $message) {
                $payload = $message->payload;
                if ($payload->isEmpty()) {
                    continue;
                }

                $data = json_decode($payload->body, true);
                if (!$data || empty($data['job_id'])) {
                    $this->logDaemon("Invalid message received: " . substr($payload->body, 0, 200));
                    $message->ack();
                    continue;
                }

                $jobId = $data['job_id'];
                $audioPath = $data['audio_path'] ?? '';
                $options = $data['options'] ?? [];

                // True once this message has been placed in $activeMessages
                $registered = false;

                try {
                    $affected = $this->jobStore->markProcessing($jobId);
                    if ($affected === 0) {
                        $this->logDaemon("[{$jobId}] Skipping: not in queued state");
                        $message->ack();
                        continue;
                    }

                    // HOLD the message (deferred ACK) BEFORE dispatching: when
                    // dispatchJob() resumes a job whose stages are all on disk it
                    // finalizes synchronously, and finalizePipeline() can only ACK
                    // and release a message that is already registered here.
                    $this->activeMessages[$jobId] = $message;
                    $registered = true;

                    $this->dispatchJob($jobId, $audioPath, $options);

                    if (isset($this->activeMessages[$jobId])) {
                        $this->logDaemon("[{$jobId}] Dispatched pipeline, holding JetStream message");
                    } else {
                        $this->logDaemon("[{$jobId}] Pipeline finished during dispatch, message released");
                    }
                } catch (\Throwable $e) {
                    $this->logDaemon("[{$jobId}] Dispatch FAILED: " . $e->getMessage());
                    $this->jobStore->markFailed($jobId, "Dispatch: " . $e->getMessage());

                    // If finalizePipeline() already ACKed and released the message
                    // before the failure, do not ACK it a second time.
                    $alreadyReleased = $registered && !isset($this->activeMessages[$jobId]);
                    unset($this->activeMessages[$jobId]);
                    if (!$alreadyReleased) {
                        $message->ack();
                    }
                    $this->incrementMetric('errors');
                }

                $this->recordActivity();
            }

            // 4. Periodic housekeeping — purge old completed/failed jobs (DB + files)
            $now = time();
            if (($now - $this->lastCleanupTime) >= 3600) {
                $this->lastCleanupTime = $now;
                try {
                    $deleted = $this->jobStore->cleanup($this->cleanupTtlDays, $this->outputPath);
                    if (!empty($deleted)) {
                        $this->logDaemon("Cleanup: purged " . count($deleted) . " expired jobs (TTL={$this->cleanupTtlDays}d)");
                    }
                } catch (\Throwable $e) {
                    $this->logDaemon("Cleanup error: " . $e->getMessage());
                }
            }
        }

        if ($client) {
            // Best effort on shutdown: a failed disconnect changes nothing
            try { $client->disconnect(); } catch (\Throwable $e) {}
        }
        $this->cleanupDaemon();
        return 0;
    }
}

// --- Main ---

// $SETTINGS is not assigned in this file — presumably the bootstrap include
// sets it; confirm. The process exit status is whatever run() returns.
/** @var IniConfig|null $SETTINGS */
global $SETTINGS;

exit((new PherbConsumer($SETTINGS))->run());
