import equal from "fast-deep-equal/es6";
import { VideoPipelineName, VideoPipelineStats } from "../../../../shared/Models/VideoPipeline.js";
import { dumpMediaStream } from "../../../../shared/helpers/dumpMediaStream.js";
import { logger } from "../../../../shared/infra/logger.js";
import { PostProcSyncObjectCache } from "../../PostProcObjectCache.js";
import { IVideoPipelineExecutor } from "../interfaces/IVideoPipelineExecutor.js";
import { PostProcVideoSpec } from "../interfaces/IVideoPipelineLauncher.js";
import { PipelineError, PipelineErrorHandler } from "../interfaces/VideoPipelineError.js";
import { zeroVideoPipelineStats } from "../pipelineStats.js";
import { VideoWorkerProxy } from "../worker/VideoWorkerProxy.js";

/*
 * VideoWorkerPipelineExecutorProxy runs in the main JavaScript thread and arranges for a video
 * stream to be processed by a pipeline running in a worker. The worker itself (a web worker with a
 * VideoWorker instantiated in it) is created and communicated with through a VideoWorkerProxy,
 * which must be retrievable from the workerCache passed to the constructor.
 *
 * When update() is called, VideoWorkerPipelineExecutorProxy arranges for the supplied input stream to
 * be sent to the worker and returns an output stream that will have the results of processing.
 * Subsequent calls to update() cause new settings to be sent to the worker and, if necessary, the
 * streams to be recreated.
 *
 * This is a lightweight wrapper object. If video is muted or postprocessing is not needed it
 * should be destroyed by calling close() and a new one created when video is unmuted or
 * postprocessing is again needed.
 *
 */
export class VideoWorkerPipelineExecutorProxy implements IVideoPipelineExecutor {
  /** Spec most recently handed to the worker; undefined until an update() has succeeded. */
  private spec?: PostProcVideoSpec;
  /** Raw stream whose first video track currently feeds the worker. */
  private inputStream?: MediaStream;
  /** Stream wrapping the generator track the worker writes processed frames to. */
  private outputStream?: MediaStream;
  /** Worker proxy borrowed from workerCache for the lifetime of this object. */
  private readonly worker: VideoWorkerProxy;

  /** Latest stats reported through the worker's stats callback (zeroed until the first report). */
  private stats: VideoPipelineStats;
  private closed = false;

  /**
   * @param streamId identifier forwarded to the worker along with the streams
   * @param pipelineName used to initialise the zeroed stats
   * @param workerCache cache the worker proxy is taken from and, on close(), returned to
   * @param onPipelineError invoked with errors reported by the worker
   * @throws Error if insertable streams are unavailable or no worker can be obtained
   */
  constructor(
    private readonly streamId: string,
    private readonly pipelineName: VideoPipelineName,
    private readonly workerCache: PostProcSyncObjectCache<VideoWorkerProxy>,
    private readonly onPipelineError: PipelineErrorHandler
  ) {
    // Check platform support BEFORE borrowing a worker: throwing after workerCache.get() would
    // strand the worker outside the cache, since a failed constructor can never be close()d.
    if (
      typeof MediaStreamTrackProcessor === "undefined" ||
      typeof MediaStreamTrackGenerator === "undefined" ||
      typeof TransformStream === "undefined"
    ) {
      throw new Error("WorkerVideoPipeline requires insertable streams");
    }

    this.stats = zeroVideoPipelineStats(pipelineName, streamId);
    this.worker = workerCache.get();
    if (!this.worker) {
      throw new Error("unable to get worker from cache");
    }
    this.worker.setCallbacks(
      (err) => this.onPipelineError(err),
      (stats) => {
        this.stats = stats;
      }
    );
  }

  /**
   * Sends the given spec (and, if the input stream changed, a fresh pair of streams) to the
   * worker and returns the stream that carries the processed output.
   *
   * Internal state (spec / inputStream / outputStream) is only committed once the worker has
   * been handed the new values, so a call that throws leaves the object exactly as it was and
   * a retry is never short-circuited by the "nothing changed" fast path.
   *
   * @param spec post-processing settings to apply
   * @param rawStream stream whose first video track is processed
   * @returns the output stream (the same object as before if only the spec changed)
   * @throws Error if this object is closed (before or during the call) or rawStream has no
   *   video track
   * @throws PipelineError if the input stream is unchanged but no output stream exists
   */
  public async update(spec: PostProcVideoSpec, rawStream: MediaStream): Promise<MediaStream> {
    if (this.closed) {
      throw new Error("attempt to update a closed VideoWorkerPipelineExecutorProxy");
    }

    if (equal(spec, this.spec) && rawStream === this.inputStream && this.outputStream) {
      // No modifications to make; just return the output stream
      return this.outputStream;
    }

    const inputTrack = rawStream?.getVideoTracks()[0];
    if (!inputTrack) {
      throw new Error("no video track found on rawStream");
    }

    // `||` (not `??`) is deliberate: a reported dimension of 0 is as unusable as a missing one.
    const settings = inputTrack.getSettings();
    const height = settings.height || 480;
    const width = settings.width || 640;

    await this.worker.waitForWorker();

    // close() may have run while we were waiting; by now the worker is being stopped and handed
    // back to the cache, so it must not be sent anything on behalf of this object.
    if (this.closed) {
      throw new Error("VideoWorkerPipelineExecutorProxy was closed while waiting for its worker");
    }

    if (this.inputStream !== rawStream) {
      // The input changed: send it and a new output to the worker, which will tear down and
      // replace its insertable-streams pipeline. Build everything that can throw first.
      const { readable: inputReadable } = new MediaStreamTrackProcessor({ track: inputTrack });
      const generator = new MediaStreamTrackGenerator({ kind: "video" });
      const nextOutput = new MediaStream([generator]);

      this.worker.setStreamsAndSpec(
        spec,
        { width, height },
        inputReadable,
        generator.writable,
        this.streamId
      );

      // The worker should close the old output when it tears down the pipeline, but just for
      // safety's sake, stop the tracks here too (only now that the replacement is in flight).
      this.outputStream?.getTracks().forEach((t) => t.stop());

      this.inputStream = rawStream;
      this.outputStream = nextOutput;
      this.spec = spec;
      return nextOutput;
    }

    if (!this.outputStream) {
      throw new PipelineError("update should always have an outputStream");
    }
    this.worker.setSpec(spec);
    this.spec = spec;
    return this.outputStream;
  }

  /**
   * Stops the worker and the output tracks. Idempotent: calls after the first return
   * immediately.
   *
   * The returned promise settles only after the worker has been stopped and put back into
   * workerCache (or cleaned up if stopping failed), so a caller that awaits close() can rely on
   * the worker having been released.
   */
  public async close(): Promise<void> {
    if (this.closed) return;
    this.closed = true;

    const stopping = this.worker.stop();

    // Destroy the captured stream right away; no need to wait for the worker to finish stopping.
    this.outputStream?.getTracks().forEach((t) => t.stop());

    try {
      await stopping;
      this.workerCache.put(this.worker);
    } catch (err) {
      logger.error({ err }, "error stopping worker. destroying.");
      this.worker.cleanup();
    }
  }

  /** Returns the most recent stats received from the worker. */
  public getStats() {
    return this.stats;
  }

  /** Returns a snapshot of the current streams and spec for diagnostics. */
  public dump(): any {
    return {
      inputStream: this.inputStream ? dumpMediaStream(this.inputStream) : undefined,
      outputStream: this.outputStream ? dumpMediaStream(this.outputStream) : undefined,
      spec: this.spec,
    };
  }
}
