import {
  distinctUntilChanged,
  EMPTY,
  filter,
  firstValueFrom,
  map,
  Observable,
  ObservableInput,
  ObservedValueOf,
  of,
  OperatorFunction,
  reduce,
  switchMap,
  take,
  takeWhile,
  tap,
} from 'rxjs'
import { collect } from '../rxjs'
import { isNotNullOrUndefined } from '../util'
import { ApiResult } from './api-result'
import { CombinedError } from './combined-error'
import { RemoteData } from './remote-data'

/** Any class (constructor) producing instances of `T`; `recoverType` uses it to match errors via `instanceof`. */
export type Constructor<T> = new (...args: any[]) => T

/** Extracts the success-data type `D` from a `RemoteData<D, E>`; resolves to `never` for anything else. */
type RemoteDataValueOf<T> = T extends RemoteData<infer D, unknown> ? D : never
/** Extracts the error type `E` from a `RemoteData<D, E>`; resolves to `never` for anything else. */
type RemoteDataErrorOf<T> = T extends RemoteData<unknown, infer D> ? D : never

/**
 * Runs a side effect on the data of RemoteData.success; every state is passed on untouched
 *
 * @example
 * ApiResult<CourtSystem>
 *   .pipe(tapData(courtSystem => (this.courtSystemId = courtSystem.uuid)))
 * returns
 * ApiResult<CourtSystem>
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = Emission (onSuccess is called with the data)
 * - Remote Data Failures = Emission
 * - Remote Data Loading = Emission
 * - Remote Data Not Started = Emission
 *
 * @param onSuccess Side effect handed to `RemoteData.tap` for each emission
 */
export function tapData<TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>>(
  onSuccess: (data: RemoteDataValueOf<TRemoteData>) => void,
): OperatorFunction<TRemoteData, TRemoteData> {
  return source$ =>
    source$.pipe(
      tap(emission => {
        emission.tap(onSuccess)
      }),
    )
}

/**
 * The tapData variant whose side effect runs for the first success only
 *
 * Note: the "already ran" flag is created together with the operator, so it is shared
 * by every subscription that goes through the same operator instance.
 *
 * @param once Side effect to run with the data of the first success
 */
export function tapDataOnce<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
>(once: (data: RemoteDataValueOf<TRemoteData>) => void): OperatorFunction<TRemoteData, TRemoteData> {
  let hasRun = false
  return source$ =>
    source$.pipe(
      tapData(data => {
        if (hasRun) {
          return
        }
        once(data)
        // Flagged only after `once` returned, so a throwing callback does not consume the single run
        hasRun = true
      }),
    )
}

/**
 * The tapDataOnce variant that waits for the first success whose data satisfies `when`
 *
 * Note: the "already ran" flag is created together with the operator, so it is shared
 * by every subscription that goes through the same operator instance.
 *
 * @param when Condition on the success data; it is no longer evaluated once `once` has run
 * @param once Side effect to run with the first data that satisfies `when`
 */
export function tapDataOnceWhen<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
>(
  when: (data: RemoteDataValueOf<TRemoteData>) => boolean,
  once: (data: RemoteDataValueOf<TRemoteData>) => void,
): OperatorFunction<TRemoteData, TRemoteData> {
  let hasRun = false
  return source$ =>
    source$.pipe(
      tapData(data => {
        if (hasRun || !when(data)) {
          return
        }
        once(data)
        hasRun = true
      }),
    )
}

/**
 * Like tapData, except the side effect also needs the data of a second ApiResult
 * that should not become part of the resulting stream.
 *
 * The side effect runs when both ApiResults are RemoteData.success,
 * otherwise all other states are passed on as expected.
 *
 * @example
 * ApiResult<CourtSystem>
 *   .pipe(tapDataWithObservable(cachedCourtSystemId$, (courtSystem, cachedCourtSystemId) => {
 *     if (courtSystem.id === cachedCourtSystemId) {
 *       this.router.navigate()
 *     }
 *   })
 * returns
 * ApiResult<CourtSystem>
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = Emission (taps into the success if other is also success)
 * - Remote Data Failures = Emission
 * - Remote Data Loading = Emission
 * - Remote Data Not Started = Emission
 *
 * @param other The second ApiResult whose data is handed to `onSuccess`
 * @param onSuccess Side effect receiving the source data and the data of `other`
 */
export function tapDataWithObservable<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, TError>,
  TData2,
  TError extends Error,
>(
  other: ApiResult<TData2, TError>,
  onSuccess: (data: RemoteDataValueOf<TRemoteData>, data2: TData2) => void,
): OperatorFunction<TRemoteData, RemoteData<RemoteDataValueOf<TRemoteData>, CombinedError<TError> | TError>> {
  return source$ =>
    source$.pipe(
      // Pair every source success with `other` through ApiResult.combine
      switchMapData(data => {
        return ApiResult.combine<[RemoteDataValueOf<TRemoteData>, TData2], TError>([ApiResult.success(data), other])
      }),
      tapData(([primary, secondary]) => {
        onSuccess(primary, secondary)
      }),
      // Drop the second value again so only the source data remains in the stream
      mapData(([primary]) => primary),
    )
}

/**
 * Runs a side effect on the error of RemoteData.failure; every state is passed on untouched
 *
 * @example
 * ApiResult<CourtSystem>
 *   .pipe(tapFailure(error => alert(error)))
 * returns
 * ApiResult<CourtSystem>
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = Emission
 * - Remote Data Failures = Emission (onFailure is called with the error)
 * - Remote Data Loading = Emission
 * - Remote Data Not Started = Emission
 *
 * @param onFailure Side effect handed to `RemoteData.tapFailure` for each emission
 */
export function tapFailure<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
>(onFailure: (err: RemoteDataErrorOf<TRemoteData>) => void): OperatorFunction<TRemoteData, TRemoteData> {
  return source$ =>
    source$.pipe(
      tap(emission => {
        emission.tapFailure(onFailure)
      }),
    )
}

/**
 * Transforms the data of RemoteData.success with the mapper; all other states are passed on
 *
 * @example
 * ApiResult<CourtSystem>
 *   .pipe(mapData(courtSystem => courtSystem.uuid))
 * returns
 * ApiResult<Uuid>
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = Emission (with onSuccess applied)
 * - Remote Data Failures = Emission
 * - Remote Data Loading = Emission
 * - Remote Data Not Started = Emission
 *
 * @param onSuccess Mapper handed to `RemoteData.map` for each emission
 */
export function mapData<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
  TMappedData,
>(
  onSuccess: (data: RemoteDataValueOf<TRemoteData>) => TMappedData,
): OperatorFunction<TRemoteData, RemoteData<TMappedData, RemoteDataErrorOf<TRemoteData>>> {
  return source$ =>
    source$.pipe(
      map(emission => {
        return emission.map(onSuccess)
      }),
    )
}

/**
 * The Rxjs reduce operator, adapted to ApiResults
 *
 * The seed is lifted into a RemoteData.success and every emission is folded into the
 * running result with `RemoteData.map2`. As with Rxjs reduce, the single folded
 * RemoteData is emitted when the source completes.
 *
 * @example
 * Array<People>
 *   .pipe(
 *     mergeMap(person => this.throwAxe(person))
 *     reduceData((acc, score) => acc + score, 0)))
 * returns
 * ApiResult<number> (The team score, and yes, lachlan gets a bonus)
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = Folded into the result with the accumulator
 * - Remote Data Failure = Emits the first failure
 * - Remote Data Loading / Not Started = Combined as decided by `RemoteData.map2`
 *
 * @param accumulator Combines the running value with the data of the next success
 * @param seed Initial running value
 */
export function reduceData<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
  TReducedData,
>(
  accumulator: (data1: TReducedData, data2: RemoteDataValueOf<TRemoteData>) => TReducedData,
  seed: TReducedData,
): OperatorFunction<TRemoteData, RemoteData<TReducedData, RemoteDataErrorOf<TRemoteData>>> {
  // The seed has to be a RemoteData itself so that map2 can combine it with the emissions
  const initial = RemoteData.success(seed) as RemoteData<TReducedData, RemoteDataErrorOf<TRemoteData>>
  return source$ =>
    source$.pipe(
      reduce((folded, incoming) => {
        return RemoteData.map2(folded, incoming, accumulator)
      }, initial),
    )
}

/**
 * Replaces null/undefined success data with the provided fallback
 *
 * Most useful when, for example, your api returns a 204 (No Content)
 *
 * @param fallback Value to use as the data when the success data is null or undefined
 */
export function recoverUndefinedData<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
  TFallback,
>(
  fallback: TFallback,
): OperatorFunction<
  TRemoteData,
  RemoteData<NonNullable<RemoteDataValueOf<TRemoteData>> | TFallback, RemoteDataErrorOf<TRemoteData>>
> {
  return source$ =>
    source$.pipe(
      map(emission => {
        return emission.map(data => (isNotNullOrUndefined(data) ? data : fallback))
      }),
    )
}

/**
 * Tries to recover from a failure by switching to another ApiResult
 *
 * @example
 * Attempting to retrieve from cache, then making an api call if it fails
 *
 * courtRoomsLocalCache
 *   .pipe(attemptRecover(err => someApi.getCourtrooms()))
 * returns
 * ApiResult<Courtroom[]>
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = Emission
 * - Remote Data Failures = Emits the result of onFailure
 * - Remote Data Loading = Emission
 * - Remote Data Not Started = Emission
 *
 * @param onFailure Produces the ApiResult to continue with, given the error of the failure
 */
export function attemptRecover<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
  TRecoverData,
  TRecoverError = Error,
>(
  onFailure: (err: RemoteDataErrorOf<TRemoteData>) => ApiResult<TRecoverData, TRecoverError>,
): OperatorFunction<
  TRemoteData,
  RemoteData<RemoteDataValueOf<TRemoteData> | TRecoverData, RemoteDataErrorOf<TRemoteData> | TRecoverError>
> {
  return source$ =>
    source$.pipe(
      // switchMap: a newer emission replaces a recovery stream that is still running
      switchMap(emission => {
        return emission.attemptRecover(onFailure)
      }),
    )
}

/**
 * Recovers from any failure by turning it into a success carrying the fallback
 *
 * @example
 * Showing an empty list instead of an error
 *
 * ApiResult<Courtroom[]>
 *   .pipe(recover([]))
 * returns
 * ApiResult<Courtroom[]>
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = Emission
 * - Remote Data Failures = Emits a success with the fallback
 * - Remote Data Loading = Emission
 * - Remote Data Not Started = Emission
 *
 * @param fallback Data of the success that replaces a failure
 */
export function recover<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
  TRecoverData,
>(
  fallback: TRecoverData,
): OperatorFunction<
  TRemoteData,
  RemoteData<RemoteDataValueOf<TRemoteData> | TRecoverData, RemoteDataErrorOf<TRemoteData>>
> {
  return source$ =>
    source$.pipe(
      // The error itself is irrelevant here: every failure is replaced
      attemptRecover(_error => {
        return ApiResult.success(fallback)
      }),
    )
}

/**
 * Recovers only from failures whose error is an instance of the given class,
 * by turning them into a success carrying the fallback
 *
 * @example
 * ApiResult<Courtroom[]>
 *   .pipe(recoverType(NoCourtroomsFoundError, []))
 * returns
 * ApiResult<Courtroom[]>
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = Emission
 * - Remote Data Failure (of Type) = Emits a success with the fallback
 * - Remote Data Failures (Not of Type) = Emission
 * - Remote Data Loading = Emission
 * - Remote Data Not Started = Emission
 *
 * @param ctor Error class to recover from (matched with `instanceof`)
 * @param fallback Data of the success that replaces a matching failure
 */
export function recoverType<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
  TRecoverData,
  TErrorType extends Error,
>(
  ctor: Constructor<TErrorType>,
  fallback: TRecoverData,
): OperatorFunction<
  TRemoteData,
  RemoteData<RemoteDataValueOf<TRemoteData> | TRecoverData, RemoteDataErrorOf<TRemoteData>>
> {
  return source$ =>
    source$.pipe(
      attemptRecover(error => {
        // Errors of any other type are re-emitted as the same failure
        return error instanceof ctor ? ApiResult.success(fallback) : ApiResult.failure(error)
      }),
    )
}

/**
 * Transforms the error of RemoteData.failure with the mapper; all other states are passed on
 *
 * @example
 * ApiResult<CourtSystem, Error>
 *   .pipe(mapFailure(err => err.message))
 * returns
 * ApiResult<CourtSystem, string>
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = Emission
 * - Remote Data Failures = Emission (with function applied)
 * - Remote Data Loading = Emission
 * - Remote Data Not Started = Emission
 *
 * @param f Mapper handed to `RemoteData.mapFailure` for each emission
 */
export function mapFailure<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
  TMappedError,
>(
  f: (error: RemoteDataErrorOf<TRemoteData>) => TMappedError,
): OperatorFunction<TRemoteData, RemoteData<RemoteDataValueOf<TRemoteData>, TMappedError>> {
  return source$ =>
    source$.pipe(
      map(emission => {
        return emission.mapFailure(f)
      }),
    )
}

/**
 * Conditionally turns a Success into a Failure.
 *
 * Loading/Failure/NotAsked are passed along without modification.
 *
 * @example
 * // Turns all even numbers into RemoteData.failure
 * ApiResult<number, Error>.toFailure(n => isEven(n), n => new Error(`Even numbers are not allowed ${n}`))
 * returns
 * ApiResult<number, Error>
 *
 * @param predicate Decides, from the success data, whether the success becomes a failure
 * @param errorCreator Builds the error of the resulting failure from the success data
 */
export function toFailure<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
>(
  predicate: (data: RemoteDataValueOf<TRemoteData>) => boolean,
  errorCreator: (data: RemoteDataValueOf<TRemoteData>) => RemoteDataErrorOf<TRemoteData>,
): OperatorFunction<TRemoteData, TRemoteData> {
  return source$ =>
    source$.pipe(
      map(emission => {
        // Cast back to the caller's concrete RemoteData type, which is kept as the output type
        return emission.toFailure(predicate, errorCreator) as TRemoteData
      }),
    )
}

/**
 * Turns ApiResult.success(null) and ApiResult.success(undefined) into an ApiResult.failure(SomeError)
 *
 * @example
 * // Turning API response 204's into actual errors that the user sees
 * ApiResult<Remarks[] | null, Error>.failUndefinedData(() => new Error(`No remarks for this time period`))
 * returns
 * ApiResult<Remarks[], Error>
 *
 * @param errorCreator Builds the error of the failure; it receives the null/undefined data
 */
export function failUndefinedData<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
>(
  errorCreator: (data: RemoteDataValueOf<TRemoteData>) => RemoteDataErrorOf<TRemoteData>,
): OperatorFunction<
  TRemoteData,
  RemoteData<NonNullable<RemoteDataValueOf<TRemoteData>>, RemoteDataErrorOf<TRemoteData>>
> {
  return source$ =>
    source$.pipe(
      toFailure(data => !isNotNullOrUndefined(data), errorCreator),
      // Runtime no-op: this mapData exists purely to narrow the data type to NonNullable
      mapData(defined => defined as NonNullable<RemoteDataValueOf<TRemoteData>>),
    )
}

/**
 * Strips the "Remote Data" wrapper, leaving a plain stream of the successful data
 *
 * @example
 * ApiResult<CourtSystem>
 *   .pipe(unwrapData())
 * returns
 * Observable<CourtSystem>
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = Emission
 * - Remote Data Success (when undefined) = No Emission
 * - Remote Data Failures = Emits as an exception
 * - Remote Data Loading = No Emission
 * - Remote Data Not Started = No Emission
 */
export function unwrapData<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
>(): OperatorFunction<TRemoteData, RemoteDataValueOf<TRemoteData>> {
  return source$ =>
    source$.pipe(
      // The per-state behaviour listed above comes from RemoteData.unwrap
      switchMap(emission => {
        return emission.unwrap()
      }),
    )
}

/**
 * Essentially mapData, except the result is unwrapped into a plain observable
 *
 * @example
 *
 * `ApiResult<Courtroom>.pipe(
 *     collectData(courtroom => courtroom.courthouse)
 *   ).subscribe(courthouse => console.log(courthouse.id))`
 * becomes
 * Observable<Courthouse>
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = Emission
 * - Other states = Whatever unwrapData does with them (the data is unwrapped before mapping)
 *
 * @param onSuccess Mapper applied to the unwrapped data
 */
export function collectData<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
  TReturnData,
>(onSuccess: (TData: RemoteDataValueOf<TRemoteData>) => TReturnData): OperatorFunction<TRemoteData, TReturnData> {
  return source$ => {
    const data$ = source$.pipe(unwrapData())
    return data$.pipe(map(onSuccess))
  }
}

/**
 * Essentially collectData, except it will not emit when the result of the function is undefined
 *
 * @example
 *
 * `ApiResult<Courtroom>.pipe(
 *     collectDefinedData(courtroom => courtroom.courthouse) // courthouse is an optional property
 *   ).subscribe(courthouse => console.log(courthouse.id))`
 *
 * Note: collectDefinedData returns an Observable<Courthouse> and can be
 * subscribed without using Required or the ! operator
 *
 * the alternative is something roughly along the lines of
 *
 * `ApiResult<Courtroom>.pipe(
 *      mapData(courtroom => courtroom.courthouse)
 *      filter(({data}) => data),
 *    )
 *    .subscribe((courthouse: Required<Courthouse, 'id'>) => console.log(courthouse.id))`
 *
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = Emission
 * - Remote Data Success (undefined function return) = No Emission
 * - Other states = Whatever unwrapData does with them (the data is unwrapped before collecting)
 *
 * TODO (FPD-226): There might be use case having 2 variants here, 1 that unwraps, and 1 that doesnt.
 * We may want to filter undefined, but still have an ApiResult return type
 *
 * @param onSuccess Mapper applied to the unwrapped data; an undefined result is not emitted
 */
export function collectDefinedData<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
  TReturnData,
>(
  onSuccess: (TData: RemoteDataValueOf<TRemoteData>) => TReturnData | undefined,
): OperatorFunction<TRemoteData, TReturnData> {
  return source$ => {
    const data$ = source$.pipe(unwrapData())
    return data$.pipe(collect(onSuccess))
  }
}

/**
 * Turns the stream into a stream of the errors of its failures.
 * Mostly used for tests, implementation detail of firstFailureFrom()
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = No Emission
 * - Remote Data Failures = Emits the error
 * - Remote Data Loading = No Emission
 * - Remote Data Not Started = No Emission
 */
export function collectFailures<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
>(): OperatorFunction<TRemoteData, RemoteDataErrorOf<TRemoteData>> {
  return source$ =>
    source$.pipe(
      // Failures contribute their error, everything else contributes nothing
      switchMap(emission => (emission.isFailure() ? of(emission._error) : EMPTY)),
    )
}

/**
 * Turns the stream into a stream of the data of its successes.
 *
 * Mostly used for tests, implementation detail of firstSuccessFrom()
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = Emits the data
 * - Remote Data Failures = No Emission
 * - Remote Data Loading = No Emission
 * - Remote Data Not Started = No Emission
 */
export function collectSuccesses<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
>(): OperatorFunction<TRemoteData, RemoteDataValueOf<TRemoteData>> {
  return source$ =>
    source$.pipe(
      // Successes contribute their data, everything else contributes nothing
      switchMap(emission => (emission.isSuccess() ? of(emission._data) : EMPTY)),
    )
}

/**
 * Emits while the success data satisfies the predicate; non-successes never end the stream
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = Emits if predicate is met, Completes the stream if not met
 * - Remote Data Failures = Emits
 * - Remote Data Loading = Emits
 * - Remote Data Not Started = Emits
 *
 * @param predicate Decides, from the success data, whether to keep emitting. The index is the
 * position of the emission in the source stream, so non-success emissions are counted as well
 * @param inclusive Whether the success that failed the predicate is still emitted before completing
 */
export function takeDataWhile<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
>(
  predicate: (value: RemoteDataValueOf<TRemoteData>, index: number) => boolean,
  inclusive = false,
): OperatorFunction<TRemoteData, TRemoteData> {
  return source$ =>
    source$.pipe(
      takeWhile(
        // Only successes are put to the test; any other state always keeps the stream going
        (emission, index) => (emission.isSuccess() ? predicate(emission._data, index) : true),
        inclusive,
      ),
    )
}

/**
 * Chains api calls, where the second api call depends on the successful data of the first.
 *
 * @example
 * OrderIdQueryParam.pipe(
 *     switchMap(orderId => someApi.getOrderById(orderId))
 *     switchMapData(order => someApi.getCourtSystemById(order.courtSystemId))
 *   )
 *   returns
 *   ApiResult<CourtSystem>
 *
 * @param onSuccess Produces the follow-up ApiResult; handed to `RemoteData.traverse` for each emission
 */
export function switchMapData<TRemoteData extends RemoteData<any, any>, TData, TError>(
  onSuccess: (data: RemoteDataValueOf<TRemoteData>) => Observable<RemoteData<TData, TError>>,
): OperatorFunction<TRemoteData, RemoteData<TData, TError>> {
  return source$ =>
    source$.pipe(
      // switchMap: a newer emission replaces a follow-up stream that is still running
      switchMap(emission => {
        return emission.traverse(onSuccess)
      }),
    )
}

/**
 * Emits everything up to and including the first success, then completes
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = Emits the first success then completes
 * - Remote Data Failures = Emits
 * - Remote Data Loading = Emits
 * - Remote Data Not Started = Emits
 */
export function completeAfterFirstSuccess<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
>(): OperatorFunction<TRemoteData, TRemoteData> {
  return source$ =>
    source$.pipe(
      // No success ever passes the predicate; `inclusive` still lets that first success through
      takeDataWhile(_data => false, true),
    )
}

/**
 * Emits all loading/failure/notAsked until we get our first success,
 * then filters the rest out, only emitting successes
 *
 * The "first success seen" state is tracked per subscription, so every subscriber
 * (or re-subscription, e.g. through retry/repeat) starts out unfiltered.
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = Emits
 * - Remote Data Failures / Loading / Not Started = Emits only before the first success
 */
export function skipFutureNonSuccess<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
>(): OperatorFunction<TRemoteData, TRemoteData> {
  return obs =>
    new Observable<TRemoteData>(subscriber => {
      // Declared inside the subscribe function so the flag is never shared between subscriptions
      let hasSentSuccess = false
      return obs
        .pipe(
          filter(remoteData => {
            if (remoteData.isSuccess()) {
              hasSentSuccess = true
              return true
            }
            // Non-successes only pass while no success has been seen yet
            return !hasSentSuccess
          }),
        )
        .subscribe(subscriber)
    })
}

/**
 * Emits all events until we get our first success or failure,
 * then filters out loading/notAsked, only emitting success/failure
 *
 * The "first result seen" state is tracked per subscription, so every subscriber
 * (or re-subscription, e.g. through retry/repeat) starts out unfiltered.
 *
 * Notes:
 * - Exceptions = Thrown down the chain
 * - Remote Data Success = Emits
 * - Remote Data Failures = Emits
 * - Remote Data Loading / Not Started = Emits only before the first success or failure
 */
export function skipFutureLoading<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
>(): OperatorFunction<TRemoteData, TRemoteData> {
  return obs =>
    new Observable<TRemoteData>(subscriber => {
      // Declared inside the subscribe function so the flag is never shared between subscriptions
      let hasSentResult = false
      return obs
        .pipe(
          filter(remoteData => {
            if (remoteData.isSuccess() || remoteData.isFailure()) {
              hasSentResult = true
              return true
            }
            // Loading/notAsked only pass while no success or failure has been seen yet
            return !hasSentResult
          }),
        )
        .subscribe(subscriber)
    })
}

/**
 * `RemoteData` implementation of `switchCombine`
 *
 * For every success, `onSuccess` produces a record of ApiResults. They are combined with
 * `ApiResult.combine`, and the resolved values are spread over the original data, so the
 * result carries both the incoming properties and the newly resolved ones (a resolved
 * value replaces an incoming property of the same name).
 *
 * @param onSuccess Produces the record of ApiResults to resolve, given the data of a success
 * @see switchCombine
 */
export function switchCombineData<
  TDataIn extends Record<string, any>,
  TDataOut extends Record<string, Observable<RemoteData<unknown>>>,
  TError extends Error = Error,
>(
  onSuccess: (data: TDataIn) => TDataOut,
): OperatorFunction<
  RemoteData<TDataIn, TError>,
  RemoteData<
    TDataIn & {
      [Name in keyof TDataOut]: RemoteDataValueOf<ObservedValueOf<TDataOut[Name]>>
    },
    CombinedError<TError> | TError
  >
> {
  return source$ =>
    source$.pipe(
      switchMapData(incoming => {
        return ApiResult.combine(onSuccess(incoming)).pipe(
          // Merge the resolved values onto the data that triggered them
          switchMapData(resolved => {
            return ApiResult.success({ ...incoming, ...resolved })
          }),
        )
      }),
    )
}

/**
 * A cross between switchCombine and switchCombineData
 *
 * Given an Observable<A> and a function A => Record<T, ApiResult<???>>
 * returns an object with all the resolved record values as a single ApiResult
 *
 * @example
 *
 * of(courthouseId) // Observable<Uuid>
 *  .pipe(switchCombineApiResults(courthouseId => ({
 *    courthouse: this.courthouseService.get(courthouseId), // ApiResult<Courthouse>
 *    courtrooms: this.courthouseService.get(courthouseId), // ApiResult<Courtroom[]>
 *  })))
 *
 *  returns
 *
 *  ApiResult<{
 *    courthouse: Courthouse,
 *    courtrooms: Courtroom[]
 *  }>
 *
 * @param project Produces the record of ApiResults to combine, given a value of the source stream
 */
export function switchCombineApiResults<
  T,
  TDataOut extends Record<string, ObservableInput<RemoteData<unknown, TError>>>,
  TError extends Error = Error,
>(
  project: (data: T) => TDataOut,
): OperatorFunction<
  T,
  RemoteData<
    {
      [name in keyof TDataOut]: RemoteDataValueOf<ObservedValueOf<TDataOut[name]>>
    },
    CombinedError<TError>
  >
> {
  return source$ =>
    source$.pipe(
      // switchMap: a newer source value replaces a combination that is still running
      switchMap(input => {
        const combined = ApiResult.combine(project(input))
        // Typescript infers ApiResult<Record<string, TDataOut[name] | unknown>> here, but we know better:
        // unknown cannot actually occur, it is merely the default type of ApiResult
        return combined as ApiResult<
          {
            [name in keyof TDataOut]: RemoteDataValueOf<ObservedValueOf<TDataOut[name]>>
          },
          CombinedError<TError>
        >
      }),
    )
}

/**
 * Distinct until changed, but for remote data
 *
 * Two consecutive emissions are considered equal when they are in the same state and
 * - both successes: the data comparator says so
 * - both failures: the error comparator says so
 * - both loading, or both notAsked: always
 * Emissions in different states are never equal.
 *
 * @param dataComparator how to compare data (defaults to `===`)
 * @param errorComparator how to compare errors (defaults to `===`)
 */
export function distinctUntilDataChanged<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
>(
  dataComparator: (previous: RemoteDataValueOf<TRemoteData>, current: RemoteDataValueOf<TRemoteData>) => boolean = (
    a,
    b,
  ) => a === b,
  errorComparator: (previous: RemoteDataErrorOf<TRemoteData>, current: RemoteDataErrorOf<TRemoteData>) => boolean = (
    a,
    b,
  ) => a === b,
): OperatorFunction<TRemoteData, TRemoteData> {
  return source$ =>
    source$.pipe(
      distinctUntilChanged((previous, current) => {
        if (previous.isSuccess() && current.isSuccess()) {
          return dataComparator(previous._data, current._data)
        }
        if (previous.isFailure() && current.isFailure()) {
          return errorComparator(previous._error, current._error)
        }
        if (previous.isLoading() && current.isLoading()) {
          return true
        }
        // Only two notAsked in a row are left as a possible match
        return previous.isNotAsked() && current.isNotAsked()
      }),
    )
}

/**
 * Resolves with the underlying data of the first ApiResult.success.
 * Failures and other non-success states are skipped.
 *
 * @throws If the stream completes without emitting a success
 */
export function firstSuccessFrom<TData, TError>(source: ApiResult<TData, TError>): Promise<TData> {
  const successes$ = source.pipe(collectSuccesses())
  return firstValueFrom(successes$)
}

/**
 * Resolves with the underlying data of the first ApiResult.success.
 * If the stream results in a failure first, the promise rejects with that error
 */
export function firstResultFrom<TData, TError>(source: ApiResult<TData, TError>): Promise<TData> {
  const data$ = source.pipe(unwrapData())
  return firstValueFrom(data$)
}

/**
 * Resolves with the underlying data of the first n ApiResult.success's, in emission order.
 *
 * If the stream completes before n successes were emitted, the promise resolves with the
 * successes collected so far (an empty array when there were none). It does not reject in
 * that case, because reduce emits its seed on completion. A stream that neither completes
 * nor reaches n successes never resolves.
 *
 * @param source The ApiResult to collect successes from
 * @param n How many successes to collect; 0 resolves with an empty array
 * @throws If the stream errors
 */
export function firstNSuccessFrom<TData, TError>(source: ApiResult<TData, TError>, n: number): Promise<TData[]> {
  return firstValueFrom(
    source.pipe(collectSuccesses()).pipe(
      take(n),
      // Appending in place avoids copying the array per emission. This is safe because
      // the seed is created per call and the pipeline is subscribed exactly once
      reduce((acc, v) => {
        acc.push(v)
        return acc
      }, [] as TData[]),
    ),
  )
}

/**
 * Resolves with the underlying error of the first ApiResult.failure.
 * Successes and other non-failure states are skipped.
 *
 * @throws If the stream completes without emitting a failure
 */
export function firstFailureFrom<TData, TError>(source: ApiResult<TData, TError>): Promise<TError> {
  const failures$ = source.pipe(collectFailures())
  return firstValueFrom(failures$)
}

/**
 * Filters emissions with a type guard on the data, narrowing the data type of the result
 *
 * The predicate is applied through `RemoteData.map`, and `unwrapOr(true)` supplies the verdict
 * when there is no mapped value. Presumably this means successes are kept only if their data
 * passes the predicate while all other states are always kept (confirm against RemoteData).
 *
 * @param predicate Type guard deciding which data is kept
 */
export function filterData<
  TRemoteData extends RemoteData<RemoteDataValueOf<TRemoteData>, RemoteDataErrorOf<TRemoteData>>,
  TResultData extends RemoteDataValueOf<TRemoteData>,
>(
  predicate: (data: RemoteDataValueOf<TRemoteData>) => data is TResultData,
): OperatorFunction<TRemoteData, RemoteData<TResultData, RemoteDataErrorOf<TRemoteData>>> {
  return source$ =>
    source$.pipe(
      filter((emission): boolean => {
        return emission.map(predicate).unwrapOr(true)
      }),
      // Runtime no-op: only re-types the emissions that passed to the narrowed data type
      map(emission => {
        return emission as unknown as RemoteData<TResultData, RemoteDataErrorOf<TRemoteData>>
      }),
    )
}
