import { isEqual, uniqWith } from "lodash";
import { ReactiveMap } from "libs/EventEmitter";
import {
  BrokenError,
  PermissionError,
  TransactionConflictError,
  ValidationError,
} from "libs/errors";
import { ClientApi, formatResponseError } from "./api";
import { Transaction } from "libs/transaction";
import { RecordPointer, RecordTable } from "libs/schema";
import { ResolvablePromise, resolvablePromise, wait } from "libs/promise-utils";
import { MS_IN_SECOND } from "libs/date-helpers";

/**
 * One entry of the transaction queue: a transaction waiting to be written,
 * paired with the promise that is resolved when the write is accepted and
 * rejected when it is refused.
 */
type Thunk = {
  /** The transaction to submit. */
  transaction: Transaction;
  /** Settled by the queue once the outcome of the write is known. */
  deferred: ResolvablePromise<void>;
};

/**
 * Builds the string key for a record pointer: its table and id joined by ":".
 *
 * @param pointer The record to build a key for.
 * @returns `"<table>:<id>"`.
 */
function pointerToKey<T extends RecordTable>(pointer: RecordPointer<T>) {
  const { table, id } = pointer;
  const parts = [table, id];
  return parts.join(":");
}

/**
 * Serial queue of transactions waiting to be written to the server.
 *
 * Transactions are submitted one at a time, in the order they were enqueued.
 * The queue is mirrored into `localStorage` under the "transactions" key and
 * re-enqueued from there when a new queue is constructed. While a transaction
 * is queued, every record it touches is counted as having a pending write.
 */
export class TransactionQueue {
  /**
   * @param args.environment Supplies the `api` used to submit transactions.
   * @param args.onRollback Awaited with a transaction the server refused,
   *   before the promise returned by `enqueue` is rejected.
   */
  constructor(
    private args: {
      environment: { api: ClientApi };
      onRollback: (transaction: Transaction) => Promise<void>;
    },
  ) {
    this.loadFromOffline();
  }

  /** Transactions waiting to be submitted, oldest first. */
  private thunks: Thunk[] = [];

  /** Count of queued transactions per record, keyed by `pointerToKey`. */
  private pendingWrites = new ReactiveMap<string, number>();

  /** Adds one pending write for each of the given records. */
  private incPendingWrites(pointers: RecordPointer[]) {
    const writes = pointers.map(pointerToKey).map((key) => {
      const n = this.pendingWrites.get(key) || 0;
      return { key, value: n + 1 };
    });
    this.pendingWrites.write(writes);
  }

  /**
   * Removes one pending write for each of the given records. A record whose
   * count would reach zero is written as `undefined` instead of `0`
   * (presumably clearing the entry — confirm against `ReactiveMap.write`).
   */
  private decPendingWrites(pointers: RecordPointer[]) {
    const writes = pointers.map(pointerToKey).map((key) => {
      const n = this.pendingWrites.get(key) || 0;
      if (n === 0) console.error("This should never be zero!");

      if (n > 1) return { key, value: n - 1 };
      else return { key, value: undefined };
    });
    this.pendingWrites.write(writes);
  }

  /** Whether at least one queued transaction touches the given record. */
  isPendingWrite<T extends RecordTable>(pointer: RecordPointer<T>) {
    const n = this.pendingWrites.get(pointerToKey(pointer)) || 0;
    return n > 0;
  }

  /**
   * Calls `fn` whenever the pending state of `pointer` flips between true and
   * false. Changes of the underlying count that leave the boolean unchanged
   * are not reported.
   *
   * @returns Whatever `ReactiveMap.subscribe` returns (presumably an
   *   unsubscribe function — confirm against `ReactiveMap`).
   */
  subscribeIsPendingWrite<T extends RecordTable>(
    pointer: RecordPointer<T>,
    fn: (pending: boolean) => void,
  ): () => void {
    let prev = this.isPendingWrite(pointer);
    return this.pendingWrites.subscribe(pointerToKey(pointer), () => {
      const next = this.isPendingWrite(pointer);
      if (prev !== next) {
        prev = next;
        fn(next);
      }
    });
  }

  /** Persists the transactions currently in the queue to `localStorage`. */
  private saveForOffline() {
    localStorage.setItem(
      "transactions",
      JSON.stringify(this.thunks.map((thunk) => thunk.transaction)),
    );
  }

  /**
   * Re-enqueues the transactions persisted by `saveForOffline`, if any.
   * Unparseable or non-array data is logged and ignored so that it cannot
   * make the constructor throw.
   */
  private loadFromOffline() {
    const result = localStorage.getItem("transactions");
    if (!result) return;

    let transactions: unknown;
    try {
      transactions = JSON.parse(result);
    } catch (error) {
      console.error("Ignoring unreadable offline transactions:", error);
      return;
    }
    if (!Array.isArray(transactions)) {
      console.error("Ignoring malformed offline transactions:", transactions);
      return;
    }

    for (const transaction of transactions as Transaction[]) {
      // Nobody awaits a restored transaction, so its rejection is handled
      // here instead of surfacing as an unhandled promise rejection.
      this.enqueue(transaction).catch((error: unknown) => {
        console.error("Restored transaction was rejected:", error);
      });
    }
  }

  /**
   * Use write(environment, transaction) instead of this function for optimistic updates.
   *
   * Appends the transaction to the queue, persists the queue and starts
   * draining it.
   *
   * @returns A promise that resolves once the server accepted the transaction
   *   and rejects when it was refused. The records touched by the transaction
   *   stop counting as pending when this promise settles.
   */
  enqueue(transaction: Transaction) {
    // Track which records have a pending write.
    const pointers = uniqWith(
      transaction.operations.map(
        ({ table, id }) => ({ table, id } as RecordPointer),
      ),
      isEqual,
    );

    this.incPendingWrites(pointers);

    // This promise will get resolved once it is submitted.
    const deferred = resolvablePromise<void>();
    this.thunks.push({ deferred, transaction });
    this.saveForOffline();
    // Draining happens in the background; the caller waits on `deferred`.
    void this.dequeue();

    return deferred.finally(() => {
      this.decPendingWrites(pointers);
    });
  }

  /** True while a `dequeue` loop is active, so that only one drains the queue. */
  private running = false;

  /**
   * Submits queued transactions in order until the queue is empty or the
   * client is offline. Does nothing if another call is already draining.
   */
  async dequeue(): Promise<void> {
    if (this.running) return;
    this.running = true;

    try {
      // Note: You can also use Websocket status to detect online too.
      while (this.thunks.length && navigator.onLine) {
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        const thunk = this.thunks[0]!;
        const result = await this.write(thunk);
        // Keep the transaction at the head of the queue for a later attempt.
        if (result === "offline") break;

        // "ok" and "rollback" both mean the thunk's promise has been settled,
        // so the transaction is done and leaves the queue.
        this.thunks.shift();
        this.saveForOffline();
      }
    } finally {
      // Always release the flag: if it stayed set after an exception, every
      // later dequeue() would return immediately and the queue would stall.
      this.running = false;
    }
  }

  /**
   * Submits one transaction, retrying on conflicts and server breakage.
   *
   * @returns "ok" when the write was accepted (promise resolved), "rollback"
   *   when it was refused (rolled back, promise rejected), or "offline" when
   *   the request could not reach the server (promise left pending).
   */
  private async write(thunk: Thunk): Promise<"ok" | "rollback" | "offline"> {
    const { deferred, transaction } = thunk;
    // Submit and retry.
    while (true) {
      const response = await this.args.environment.api.write(transaction);
      if (response.status === 200) {
        deferred.resolve();
        return "ok";
      }

      if (response.status === ValidationError.statusCode) {
        await this.args.onRollback(transaction);
        deferred.reject(new ValidationError(formatResponseError(response)));
        return "rollback";
      }

      if (response.status === PermissionError.statusCode) {
        await this.args.onRollback(transaction);
        deferred.reject(new PermissionError(formatResponseError(response)));
        return "rollback";
      }

      if (response.status === TransactionConflictError.statusCode) {
        // retry immediately
        continue;
      }

      if (response.status === BrokenError.statusCode) {
        // Wait before retrying.
        await wait(10 * MS_IN_SECOND);
        continue;
      }

      if (response.status === 0) {
        // Offline
        return "offline";
      }

      // Unknown error. The promise is rejected here, so the transaction must
      // also be rolled back and leave the queue like the other refusals;
      // otherwise dequeue() would resubmit the same transaction forever.
      await this.args.onRollback(transaction);
      deferred.reject(new Error(response.status + ": " + response.body));
      return "rollback";
    }
  }
}
