package com.inet.persistence.azure.cosmos;

import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.ExcludedPath;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.IndexingPolicy;
import com.inet.annotations.JsonData;
import com.inet.id.GUID;
import com.inet.lib.json.Json;
import com.inet.logging.LogID;
import com.inet.persistence.PersistenceListener;
import com.inet.persistence.spi.PersistenceLogger;
import com.inet.persistence.spi.events.PersistenceListenerContainer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;
import javax.annotation.Nonnull;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/inet/persistence/azure/cosmos/AzureCosmosPublishSubscribe.class */
public class AzureCosmosPublishSubscribe {
    private static final String CHANNEL = "inet-events";
    private final PersistenceListenerContainer listeners;
    private CosmosContainer collection;
    private boolean stopped;

    /* JADX INFO: Access modifiers changed from: private */
    @JsonData
    /* loaded from: input_file:com/inet/persistence/azure/cosmos/AzureCosmosPublishSubscribe$EventPOJO.class */
    public static class EventPOJO {
        public String id = GUID.generateNew().toString();
        public String client;
        public String type;
        public String message;

        private EventPOJO() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AzureCosmosPublishSubscribe(PersistenceListenerContainer persistenceListenerContainer) {
        this.listeners = persistenceListenerContainer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start(@Nonnull String str) {
        CosmosContainerProperties cosmosContainerProperties = new CosmosContainerProperties(CHANNEL, "/type");
        cosmosContainerProperties.setDefaultTimeToLiveInSeconds(60);
        IndexingPolicy indexingPolicy = new IndexingPolicy();
        indexingPolicy.setExcludedPaths(Arrays.asList(new ExcludedPath("/*")));
        cosmosContainerProperties.setIndexingPolicy(indexingPolicy);
        this.collection = AzureCosmosPersistence.getOrCreateContainer(cosmosContainerProperties);
        Thread thread = new Thread(() -> {
            LogID.setID("pubsub");
            String[] strArr = new String[1];
            while (!this.stopped) {
                try {
                    Iterator it = this.collection.queryChangeFeed(strArr[0] == null ? CosmosChangeFeedRequestOptions.createForProcessingFromNow(FeedRange.forFullRange()) : CosmosChangeFeedRequestOptions.createForProcessingFromContinuation(strArr[0]), EventPOJO.class).handle(feedResponse -> {
                        strArr[0] = feedResponse.getContinuationToken();
                    }).iterator();
                    while (it.hasNext()) {
                        EventPOJO eventPOJO = (EventPOJO) it.next();
                        if (!Objects.equals(eventPOJO.client, str)) {
                            onMessage(eventPOJO);
                        }
                    }
                    Thread.sleep(1000L);
                } catch (Throwable th) {
                    if (this.stopped) {
                        return;
                    }
                    PersistenceLogger.LOGGER.error(th);
                    return;
                }
            }
        }, CHANNEL);
        thread.setDaemon(true);
        thread.start();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        this.stopped = true;
    }

    private void onMessage(EventPOJO eventPOJO) {
        PersistenceListenerContainer.ListenerDescription listenerDescription;
        String str;
        String str2 = eventPOJO.type;
        if (str2 == null || (listenerDescription = this.listeners.get(str2)) == null || (str = eventPOJO.message) == null) {
            return;
        }
        Object fromJson = new Json().fromJson(str, listenerDescription.getType());
        Iterator<PersistenceListener<?>> it = listenerDescription.iterator();
        while (it.hasNext()) {
            it.next().eventReceived(fromJson);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T> void send(T t, String str) {
        EventPOJO eventPOJO = new EventPOJO();
        eventPOJO.client = str;
        eventPOJO.type = t.getClass().getName();
        eventPOJO.message = new Json().toJson(t);
        this.collection.createItem(eventPOJO);
    }
}
