import {
  Observable,
  OperatorFunction,
  concat,
  interval,
  take,
  skip,
  catchError,
  switchMap,
  BehaviorSubject,
  pairwise,
  map,
  NEVER,
  Subscription,
  combineLatest,
  filter,
  finalize,
  share,
  defer,
  of,
  shareReplay,
} from "rxjs";
import { arrayChange } from "./array-utils";

/**
 * rxjs has a built-in `startWith` operator however the
 * built in operator is seriously flawed. The built-in
 * operator accepts a static value _on observable creation_
 * and uses that static value when the observable is
 * subscribed to. This is surprising because, in practice,
 * you ~~generally~~ always
 * want to get the startWith value when the observable is
 * _subscribed to_, not when it is created.
 *
 * I actually opened an issue about this years ago and the rxjs
 * maintainers have acknowledged the problems with `startWith()`
 * but they indicated that they aren't interested changing
 * startWith because it would be a large-ish breaking change
 * which they don't feel is worthwhile. We should always use
 * this custom startwith operator in our app.
 *
 * @param fns a spread of functions which will be called, in
 *   order, when the observable is subscribed to to get the
 *   initial values for the subscription.
 * @returns rxjs operator
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function startWith<T extends any[]>(
  ...fns: ReadonlyArray<() => never[]>
): OperatorFunction<T, T>;
export function startWith<T, A = T>(
  ...fns: ReadonlyArray<() => A>
): OperatorFunction<T, T | A>;
export function startWith<T, A = T>(
  ...fns: ReadonlyArray<() => A>
): OperatorFunction<T, T | A> {
  return (source) => {
    return new Observable((observer) => {
      fns.forEach((fn) => {
        try {
          observer.next(fn());
        } catch (e) {
          observer.error(e);
        }
      });

      return source.subscribe(observer);
    });
  };
}

/**
 * @param msDelay e.g. 1000
 * @returns observable that will complete without emitting after
 *   the specified delay
 */
export function wait$(msDelay: number): Observable<never>;
/**
 * @param msDelay e.g. 1000
 * @param observable observable that will be subscribed
 *   to after the specified delay
 * @returns observable that will be subscribed
 *   to after the specified delay
 */
export function wait$<T>(
  msDelay: number,
  observable: Observable<T>,
): Observable<T>;
export function wait$<T>(msDelay: number, observable?: Observable<T>) {
  const wait = interval(msDelay).pipe(take(1), skip(1)) as Observable<never>;

  return observable ? concat(wait, observable) : wait;
}

export class FailedPredicateError<T> extends Error {
  constructor(
    public source$: Observable<T>,
    public predicate: (value: T) => boolean,
    public label?: string,
  ) {
    super(`Failed ${label} assertPredicate check`);
  }
}

export function assertPredicate<T, P extends T>(
  name: string,
  predicate: (value: T) => value is P,
): OperatorFunction<T, P>;
export function assertPredicate<T>(
  name: string,
  predicate: (value: T) => boolean,
): OperatorFunction<T, T>;
export function assertPredicate<T>(
  name: string,
  predicate: (value: T) => boolean,
): OperatorFunction<T, T> {
  return (source) =>
    new Observable((observer) => {
      return source.subscribe({
        next(value) {
          if (!predicate(value)) {
            observer.error(new FailedPredicateError(source, predicate, name));
          } else {
            observer.next(value);
          }
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        },
      });
    });
}

export function catchFailedPredicateError<I, T>(
  name: string,
  defaultValue: () => T,
): (
  source: Observable<I>,
) => I extends unknown[]
  ? T extends never[]
    ? Observable<I>
    : Observable<I | T>
  : Observable<I | T>;
export function catchFailedPredicateError<I>(
  name: string,
  defaultValue: () => unknown,
): (source: Observable<I>) => Observable<I>;
export function catchFailedPredicateError<I, T>(
  name: string,
  defaultValue: () => T,
): (source: Observable<I>) => Observable<I | T> {
  return (source$: Observable<I>) => {
    function handleError(err: unknown): Observable<I | T> {
      if (err instanceof FailedPredicateError && name === err.label) {
        return defaultValue
          ? err.source$.pipe(
              switchMap((v) =>
                err.predicate(v)
                  ? source$
                  : NEVER.pipe(startWith(defaultValue)),
              ),
              catchError(handleError),
            )
          : err.source$.pipe(
              switchMap((v) => (err.predicate(v) ? source$ : NEVER)),
              catchError(handleError),
            );
      }

      throw err;
    }

    return source$.pipe(catchError(handleError));
  };
}

/**
 * Applies a mapper function to each entry of an input observable.
 * The input observable must return an array of objects containing
 * an ID property. This function aims to be more performent than
 * other methods by ensuring that documents are only mapped when
 * they have been changed. Note that the mapper function accepts
 * an observable argument which emits a single entry. You should
 * apply distinctUntilChanged to the retun value of the mapper
 * function you provide.
 */
// eslint-disable-next-line @typescript-eslint/no-unused-vars
function mapIndividualEntries<T extends { id: string }, R>(
  input: Observable<T[]>,
  mapper: (entry$: Observable<T>) => Observable<R>,
): Observable<R[]> {
  const store = new Map<
    string,
    { input: BehaviorSubject<T>; result: Observable<R> }
  >();

  const allowEmissions$ = new BehaviorSubject(true);

  const inputWithChanges = input.pipe(
    startWith(() => []),
    pairwise(),
    map(([prev, curr]) => {
      const { added, removed } = arrayChange(
        curr,
        prev,
        (a, b) => a.id === b.id,
      );

      return {
        currentValue: curr,
        added,
        removed,
      };
    }),
  );

  const result = new BehaviorSubject<Observable<R>[]>([]);

  let subscription: Subscription | undefined;

  return defer(() => {
    subscription = inputWithChanges.subscribe(
      ({ currentValue, added, removed }) => {
        allowEmissions$.next(false);

        for (const entry of removed) {
          store.delete(entry.id);
        }

        for (const [entryId, entry] of Array.from(store.entries())) {
          const value = currentValue.find((v) => v.id === entryId);

          if (!value) {
            console.error("mapIndividualEntries() expected entry missing");
            continue;
          }

          entry.input.next(value);
        }

        for (const entry of added) {
          const input = new BehaviorSubject(entry);

          store.set(entry.id, {
            input,
            result: mapper(input),
          });
        }

        if (added.length > 0 || removed.length > 0) {
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          result.next(currentValue.map((value) => store.get(value.id)!.result));
        }

        allowEmissions$.next(true);
      },
    );

    return result;
  }).pipe(
    switchMap((values) => {
      if (values.length === 0) return of([true] as const);
      return combineLatest([allowEmissions$, ...values]);
    }),
    filter(([allowEmissions]) => allowEmissions),
    map(([, ...value]) => value),
    finalize(() => {
      subscription?.unsubscribe();
      subscription = undefined;
    }),
    share(),
  );
}

/**
 * This operator is like shareReplay except, after all subscribers have
 * unsubscribed, it will maintain the replay buffer for a specified
 * amount of time before clearing it. This is useful for cases where
 * you want to maintain a replay buffer for a period of time
 * after the last subscriber has unsubscribed.
 */
export function cacheReplayForTime(options: {
  timeMs: number;
  bufferSize?: number;
  onInit?: () => void;
  onCleanup?: () => void;
}) {
  return <T>(source: Observable<T>) => {
    let cachedSubscription: Subscription | undefined;
    let timeout: ReturnType<typeof setTimeout> | undefined;

    const shared = source.pipe(
      shareReplay({
        refCount: true,
        bufferSize: options.bufferSize || 1,
      }),
    );

    let subscriberCount = 0;

    return new Observable<T>((observer) => {
      if (!cachedSubscription) {
        options.onInit?.();
        cachedSubscription = shared.subscribe();
      }

      if (timeout) {
        clearTimeout(timeout);
        timeout = undefined;
      }

      subscriberCount++;
      const sub = shared.subscribe(observer);
      let teardownCalled = false;

      return () => {
        if (teardownCalled) return;
        teardownCalled = true;
        sub.unsubscribe();
        subscriberCount--;

        if (subscriberCount <= 0) {
          timeout = setTimeout(() => {
            cachedSubscription?.unsubscribe();
            cachedSubscription = undefined;
            options.onCleanup?.();
          }, options.timeMs);
        }
      };
    });
  };
}
