package com.amazon.athena.client.results;

import com.amazon.athena.logging.AthenaLogger;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import lombok.NonNull;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.services.athena.model.QueryExecution;
import software.amazon.awssdk.services.athena.model.ResultSetMetadata;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/amazon/athena/client/results/PaginatingAsyncQueryResultsBase.class */
public abstract class PaginatingAsyncQueryResultsBase implements AsyncQueryResults {
    // Logger shared by this class and the nested RowSubscription.
    private static final AthenaLogger logger = AthenaLogger.of(PaginatingAsyncQueryResultsBase.class);
    // Executor on which RowSubscription schedules itself to deliver rows (see RowSubscription.scheduleRun).
    private final Executor executor;
    // Rows offered to every new RowSubscription at construction, before any page is loaded.
    // RowSubscription.addRow may drop the first of them.
    private final List<String[]> initialRows;
    // Returned by queryExecution(); its queryExecutionId() is used in log messages.
    private final QueryExecution queryExecution;
    // Returned unchanged by resultSetMetadata().
    private final ResultSetMetadata resultSetMetadata;
    // Returned by updateCount(); the constructor stores 0 when it is given null.
    private final long updateCount;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/amazon/athena/client/results/PaginatingAsyncQueryResultsBase$PaginationController.class */
    /**
     * Source of additional result pages for one subscription. An instance is obtained from
     * {@code startPagination()} for every subscriber and driven by the nested RowSubscription.
     */
    public interface PaginationController {
        /**
         * Tells whether another page can be requested.
         *
         * <p>RowSubscription consults this before every load and, once it returns {@code false}
         * with an empty buffer and outstanding demand, completes the subscriber.
         *
         * @return {@code true} if {@link #loadNextPage} may be called for a further page
         */
        boolean hasNextPage();

        /**
         * Loads the next page.
         *
         * <p>As used by RowSubscription: {@code consumer} appends a row to the subscription's
         * buffer, {@code consumer2} records a failure that is later reported through
         * {@code onError}, and {@code runnable} marks the load as finished and triggers delivery.
         * RowSubscription never starts a second load before {@code runnable} has run.
         *
         * <p>NOTE(review): RowSubscription clears its "loading" flag only inside {@code runnable},
         * so implementations presumably have to invoke it after a failed load as well - confirm
         * against the implementations, which are not visible here.
         *
         * @param consumer receives each row of the loaded page
         * @param consumer2 receives the failure if the page could not be loaded
         * @param runnable invoked when the load has finished
         */
        void loadNextPage(Consumer<String[]> consumer, Consumer<Throwable> consumer2, Runnable runnable);
    }

    /* loaded from: input_file:com/amazon/athena/client/results/PaginatingAsyncQueryResultsBase$RowSubscription.class */
    private class RowSubscription implements Subscription, Runnable {
        private final Subscriber<? super String[]> subscriber;
        private final PaginationController paginationController;
        private final AtomicLong requestCount;
        private final AtomicReference<SubscriptionState> subscriptionState;
        private final AtomicBoolean delivering;
        private final AtomicBoolean loading;
        private final AtomicBoolean headersSkipped;
        private final AtomicReference<Throwable> error;
        private final Queue<String[]> rowBuffer;

        private RowSubscription(Subscriber<? super String[]> subscriber, PaginationController paginationController) {
            this.subscriber = subscriber;
            this.paginationController = paginationController;
            this.rowBuffer = new ConcurrentLinkedQueue();
            this.requestCount = new AtomicLong(0L);
            this.subscriptionState = new AtomicReference<>(SubscriptionState.NEW);
            this.delivering = new AtomicBoolean(false);
            this.loading = new AtomicBoolean(false);
            this.headersSkipped = new AtomicBoolean(ResultFormatHelper.isPlainTextResult(PaginatingAsyncQueryResultsBase.this.queryExecution));
            this.error = new AtomicReference<>(null);
            PaginatingAsyncQueryResultsBase.this.initialRows.forEach(this::addRow);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void start() {
            if (this.subscriptionState.compareAndSet(SubscriptionState.NEW, SubscriptionState.SUBSCRIBED)) {
                scheduleRun();
            }
        }

        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            if (j < 0) {
                PaginatingAsyncQueryResultsBase.logger.warn("Invalid item count requested for query execution {} (got {})", PaginatingAsyncQueryResultsBase.this.queryExecution.queryExecutionId(), Long.valueOf(j));
                this.error.set(new IllegalArgumentException(String.format("Requested item count was negative (got %d)", Long.valueOf(j))));
                scheduleRun();
                return;
            }
            if (j == 0) {
                PaginatingAsyncQueryResultsBase.logger.warn("Invalid item count requested for query execution {} (got {})", PaginatingAsyncQueryResultsBase.this.queryExecution.queryExecutionId(), Long.valueOf(j));
                this.error.set(new IllegalArgumentException("Requested item count was zero"));
                scheduleRun();
            } else {
                if (this.subscriptionState.get() == SubscriptionState.CANCELLED) {
                    PaginatingAsyncQueryResultsBase.logger.warn("Items requested for query execution {}, but subscription is cancelled", PaginatingAsyncQueryResultsBase.this.queryExecution.queryExecutionId());
                    return;
                }
                PaginatingAsyncQueryResultsBase.logger.trace("Items requested for query execution {} (requested {}, request count is {}, buffer size is {})", PaginatingAsyncQueryResultsBase.this.queryExecution.queryExecutionId(), Long.valueOf(j), Long.valueOf(this.requestCount.addAndGet(j)), Integer.valueOf(this.rowBuffer.size()));
                if (this.requestCount.get() < 0) {
                    PaginatingAsyncQueryResultsBase.logger.trace("Request count wrapped, all items will be delivered", new Object[0]);
                    this.requestCount.set(Long.MAX_VALUE);
                }
                if (this.requestCount.get() > this.rowBuffer.size() && this.paginationController.hasNextPage()) {
                    loadNextPage();
                }
                scheduleRun();
            }
        }

        @Override // org.reactivestreams.Subscription
        public void cancel() {
            PaginatingAsyncQueryResultsBase.logger.trace("Subscription for query execution {} cancelled or completed", PaginatingAsyncQueryResultsBase.this.queryExecution.queryExecutionId());
            this.subscriptionState.set(SubscriptionState.CANCELLED);
        }

        private void scheduleRun() {
            if (this.subscriptionState.get() == SubscriptionState.SUBSCRIBED && this.delivering.compareAndSet(false, true)) {
                PaginatingAsyncQueryResultsBase.this.executor.execute(this);
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            String[] poll;
            PaginatingAsyncQueryResultsBase.logger.trace("Deliver items for query execution {} (request count is {}, buffer size is {})", PaginatingAsyncQueryResultsBase.this.queryExecution.queryExecutionId(), Long.valueOf(this.requestCount.get()), Integer.valueOf(this.rowBuffer.size()));
            while (this.requestCount.get() > 0 && (poll = this.rowBuffer.poll()) != null) {
                try {
                    this.requestCount.decrementAndGet();
                    this.subscriber.onNext(poll);
                } catch (Throwable th) {
                    PaginatingAsyncQueryResultsBase.logger.trace("Error caught during item delivery for query execution {}", PaginatingAsyncQueryResultsBase.this.queryExecution.queryExecutionId());
                    this.error.set(th);
                }
            }
            if (this.error.get() != null) {
                PaginatingAsyncQueryResultsBase.logger.trace("Delivering error for query execution {}", PaginatingAsyncQueryResultsBase.this.queryExecution.queryExecutionId());
                cancel();
                this.subscriber.onError(this.error.get());
            } else if (this.requestCount.get() > 0 && !this.loading.get()) {
                PaginatingAsyncQueryResultsBase.logger.trace("Item buffer empty for query execution {} (request count is {})", PaginatingAsyncQueryResultsBase.this.queryExecution.queryExecutionId(), Long.valueOf(this.requestCount.get()));
                if (this.paginationController.hasNextPage()) {
                    loadNextPage();
                } else if (this.rowBuffer.isEmpty()) {
                    PaginatingAsyncQueryResultsBase.logger.trace("No more pages to load for query execution {}, completing subscription", PaginatingAsyncQueryResultsBase.this.queryExecution.queryExecutionId());
                    cancel();
                    this.subscriber.onComplete();
                }
            }
            this.delivering.set(false);
        }

        private void addRow(String[] strArr) {
            if (this.headersSkipped.get()) {
                this.rowBuffer.add(strArr);
            } else {
                this.headersSkipped.set(true);
            }
        }

        private void loadNextPage() {
            if (this.subscriptionState.get() == SubscriptionState.SUBSCRIBED && this.loading.compareAndSet(false, true)) {
                PaginatingAsyncQueryResultsBase.logger.trace("Loading next page for query execution {} (request count is {}, buffer size is {})", PaginatingAsyncQueryResultsBase.this.queryExecution.queryExecutionId(), Long.valueOf(this.requestCount.get()), Integer.valueOf(this.rowBuffer.size()));
                PaginationController paginationController = this.paginationController;
                Consumer<String[]> consumer = this::addRow;
                AtomicReference<Throwable> atomicReference = this.error;
                atomicReference.getClass();
                paginationController.loadNextPage(consumer, (v1) -> {
                    r2.set(v1);
                }, () -> {
                    this.loading.set(false);
                    PaginatingAsyncQueryResultsBase.logger.trace("Loaded page for query execution {} (request count is {}, buffer size is {}, has next page {})", PaginatingAsyncQueryResultsBase.this.queryExecution.queryExecutionId(), Long.valueOf(this.requestCount.get()), Integer.valueOf(this.rowBuffer.size()), Boolean.valueOf(this.paginationController.hasNextPage()));
                    if (this.error.get() == null && this.requestCount.get() > this.rowBuffer.size() && this.paginationController.hasNextPage()) {
                        PaginatingAsyncQueryResultsBase.logger.trace("Page did not fulfill request count for query execution {}, load another page", PaginatingAsyncQueryResultsBase.this.queryExecution.queryExecutionId());
                        loadNextPage();
                    } else if (this.error.get() != null) {
                        PaginatingAsyncQueryResultsBase.logger.trace("Page loading failed for query execution {}", PaginatingAsyncQueryResultsBase.this.queryExecution.queryExecutionId());
                    }
                    scheduleRun();
                });
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/amazon/athena/client/results/PaginatingAsyncQueryResultsBase$SubscriptionState.class */
    /** Lifecycle of a RowSubscription. */
    public enum SubscriptionState {
        /** Initial state; RowSubscription.start() moves the subscription on to SUBSCRIBED. */
        NEW,
        /** Active state; delivery runs are scheduled and pages are loaded only in this state. */
        SUBSCRIBED,
        /** Set by RowSubscription.cancel(), which is also called right before onError/onComplete. */
        CANCELLED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the collaborators and the already available result data.
     *
     * @param executor executor on which subscriptions schedule their row delivery
     * @param queryExecution query execution these results belong to
     * @param resultSetMetadata metadata returned by {@link #resultSetMetadata()}
     * @param l update count to report, or {@code null} to report {@code 0}
     * @param list rows offered to every subscription before any further page is loaded
     */
    public PaginatingAsyncQueryResultsBase(Executor executor, QueryExecution queryExecution, ResultSetMetadata resultSetMetadata, Long l, List<String[]> list) {
        this.initialRows = list;
        this.resultSetMetadata = resultSetMetadata;
        this.queryExecution = queryExecution;
        this.executor = executor;
        // A missing update count is reported as zero.
        if (l != null) {
            this.updateCount = l.longValue();
        } else {
            this.updateCount = 0L;
        }
    }

    /**
     * Attaches a subscriber. Every call creates a new RowSubscription with its own pagination
     * controller obtained from {@link #startPagination()}.
     *
     * <p>{@code onSubscribe} is signalled before the subscription is started, so demand requested
     * from inside {@code onSubscribe} is recorded but nothing is scheduled until the subscription
     * has been started.
     *
     * @param subscriber receiver of the rows
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override // org.reactivestreams.Publisher
    public void subscribe(@NonNull Subscriber<? super String[]> subscriber) {
        java.util.Objects.requireNonNull(subscriber, "subscriber is marked non-null but is null");
        final RowSubscription subscription = new RowSubscription(subscriber, startPagination());
        logger.trace("Got subscriber for query execution {}, starting subscription", queryExecution().queryExecutionId());
        subscriber.onSubscribe(subscription);
        subscription.start();
    }

    /**
     * Creates the pagination controller for one subscription.
     *
     * <p>{@code subscribe} calls this once per subscriber, before {@code onSubscribe} is
     * signalled, and hands the result to that subscriber's RowSubscription.
     *
     * @return controller used to load the pages that follow the initial rows
     */
    protected abstract PaginationController startPagination();

    /**
     * @return the query execution that was passed to the constructor
     */
    @Override // com.amazon.athena.client.results.AsyncQueryResults
    public QueryExecution queryExecution() {
        return queryExecution;
    }

    /**
     * @return the result set metadata that was passed to the constructor
     */
    @Override // com.amazon.athena.client.results.AsyncQueryResults
    public ResultSetMetadata resultSetMetadata() {
        return resultSetMetadata;
    }

    /**
     * @return the update count passed to the constructor, or {@code 0} if that was {@code null}
     */
    @Override // com.amazon.athena.client.results.AsyncQueryResults
    public long updateCount() {
        return updateCount;
    }
}
