/*
 * Decompiled with CFR 0.152.
 */
package fs2.interop.flow;

import cats.effect.IO;
import cats.effect.IO$;
import cats.effect.kernel.Async;
import cats.effect.kernel.Resource;
import cats.effect.std.Dispatcher;
import cats.effect.unsafe.IORuntime;
import fs2.Compiler$;
import fs2.Compiler$Target$;
import fs2.Stream;
import fs2.interop.flow.StreamPublisher$;
import fs2.interop.flow.StreamPublisher$CanceledStreamPublisherException$;
import fs2.interop.flow.StreamSubscription;
import fs2.interop.flow.StreamSubscription$;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.RejectedExecutionException;
import scala.runtime.BoxedUnit;

/**
 * A {@link Flow.Publisher} backed by an fs2 {@link Stream}.
 *
 * <p>Every call to {@link #subscribe(Flow.Subscriber)} builds a new
 * {@link StreamSubscription} from the wrapped stream, hands it to the
 * subscriber, and then passes the subscription's drained run effect to
 * {@link #runSubscription(Object)}. How that effect is actually executed is
 * decided by the concrete subclass.
 *
 * @param <F> the effect type the stream is evaluated in
 * @param <A> the element type emitted to subscribers
 */
public abstract class StreamPublisher<F, A>
implements Flow.Publisher<A> {
    /** The stream each new subscription is created from. */
    private final Stream<F, A> stream;
    /** The {@code Async} instance used to build and compile subscriptions. */
    private final Async<F> F;

    /**
     * Creates a publisher for {@code stream}; delegates to the companion
     * object's {@code apply}.
     *
     * @param stream the stream to publish
     * @param async the {@code Async} instance for {@code F}
     * @return whatever resource the companion's {@code apply} produces
     */
    public static <F, A> Resource<F, StreamPublisher<F, A>> apply(Stream<F, A> stream, Async<F> async) {
        return StreamPublisher$.MODULE$.apply(stream, async);
    }

    /**
     * Creates an {@code IO}-based publisher; delegates to the companion
     * object's {@code unsafe}.
     *
     * @param stream the stream to publish
     * @param iORuntime the runtime handed to the companion's {@code unsafe}
     * @return whatever publisher the companion's {@code unsafe} produces
     */
    public static <A> StreamPublisher<IO, A> unsafe(Stream<IO, A> stream, IORuntime iORuntime) {
        return StreamPublisher$.MODULE$.unsafe(stream, iORuntime);
    }

    /**
     * Stores the stream and the {@code Async} instance used by later
     * subscriptions.
     *
     * @param stream the stream to publish
     * @param F the {@code Async} instance for the stream's effect type
     */
    public StreamPublisher(Stream<F, A> stream, Async<F> F) {
        this.stream = stream;
        this.F = F;
    }

    /**
     * Starts executing the effect that drives a single subscription.
     *
     * @param run the effect produced by draining the subscription's run stream
     */
    public abstract void runSubscription(F run);

    /**
     * Subscribes {@code subscriber} to the wrapped stream.
     *
     * <p>The subscriber first receives the new subscription via
     * {@code onSubscribe}; the subscription's run effect is then handed to
     * {@link #runSubscription(Object)}. If starting that effect throws an
     * {@link IllegalStateException} or a {@link RejectedExecutionException},
     * the subscriber is notified through {@code onError} with the
     * canceled-publisher exception and this method returns normally, so the
     * failure is reported exactly once. Any other throwable propagates to the
     * caller unchanged.
     *
     * @param subscriber the subscriber to attach; must not be {@code null}
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override
    public final void subscribe(Flow.Subscriber<? super A> subscriber) {
        Objects.requireNonNull(subscriber, "The subscriber provided to subscribe must not be null");
        StreamSubscription<F, A> subscription = StreamSubscription$.MODULE$.apply(this.stream, subscriber, this.F);
        subscriber.onSubscribe(subscription);
        try {
            this.runSubscription(subscription.run().compile(Compiler$.MODULE$.target(Compiler$Target$.MODULE$.forConcurrent(this.F))).drain());
        }
        catch (IllegalStateException | RejectedExecutionException ignored) {
            // NOTE(review): presumably raised when the dispatcher/runtime behind
            // runSubscription is no longer accepting work -- confirm.
            // The original exception is intentionally not rethrown: the subscriber
            // is told through onError, and rethrowing would report the same
            // failure a second time from subscribe itself.
            subscriber.onError(StreamPublisher$CanceledStreamPublisherException$.MODULE$);
        }
    }

    /**
     * A publisher that runs each subscription through a {@link Dispatcher}.
     *
     * @param <F> the effect type the stream is evaluated in
     * @param <A> the element type emitted to subscribers
     */
    public static final class DispatcherStreamPublisher<F, A>
    extends StreamPublisher<F, A> {
        /** The dispatcher every subscription effect is submitted to. */
        private final Dispatcher<F> dispatcher;

        /**
         * @param stream the stream to publish
         * @param dispatcher the dispatcher used to run subscription effects
         * @param F the {@code Async} instance for the stream's effect type
         */
        public DispatcherStreamPublisher(Stream<F, A> stream, Dispatcher<F> dispatcher, Async<F> F) {
            // The superclass constructor call has to be the first statement.
            super(stream, F);
            this.dispatcher = dispatcher;
        }

        /**
         * Submits {@code run} to the dispatcher without waiting for its result.
         *
         * @param run the subscription effect to execute
         */
        @Override
        public final void runSubscription(F run) {
            this.dispatcher.unsafeRunAndForget(run);
        }
    }

    /**
     * A publisher for {@code IO} streams that runs each subscription on an
     * {@link IORuntime}.
     *
     * @param <A> the element type emitted to subscribers
     */
    public static final class IORuntimeStreamPublisher<A>
    extends StreamPublisher<IO, A> {
        /** The runtime every subscription effect is started on. */
        private final IORuntime runtime;

        /**
         * @param stream the {@code IO} stream to publish
         * @param runtime the runtime used to run subscription effects
         */
        public IORuntimeStreamPublisher(Stream<IO, A> stream, IORuntime runtime) {
            // The superclass constructor call has to be the first statement.
            super(stream, IO$.MODULE$.asyncForIO());
            this.runtime = runtime;
        }

        /**
         * Starts {@code run} on the runtime without waiting for its result.
         *
         * @param run the subscription effect to execute
         */
        @Override
        public final void runSubscription(IO<BoxedUnit> run) {
            run.unsafeRunAndForget(this.runtime);
        }
    }
}

