package com.borland.bms.framework.async;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/borland/bms/framework/async/AbstractPipelineProcessor.class */
/**
 * Base class for processors that buffer incoming JMS messages in a
 * {@link PipelineQueue} and hand them, in batches, to
 * {@link #processMessages(PipelineTextMessage[])} on a fixed-rate schedule.
 *
 * <p>On construction the processor creates its own scheduled thread pool,
 * schedules the periodic batch task, registers the queue as the message
 * listener of the supplied consumer and installs a JVM shutdown hook that
 * calls {@link #shutdown()}.
 */
public abstract class AbstractPipelineProcessor {
    /** Delay before the first scheduled batch run. */
    private static final int INITIAL_DELAY_IN_MILLIS = 1000;
    /** Maximum time {@link #shutdown()} waits for an in-flight batch to finish. */
    private static final int TIMEOUT_FOR_SHUTDOWN_IN_MILLIS = 2000;
    private static final Logger logger = LoggerFactory.getLogger(AbstractPipelineProcessor.class);
    private final ScheduledExecutorService scheduler;
    private final PipelineQueue queue;
    private final MessageConsumer consumer;
    private final ScheduledFuture<?> processorTask;

    /**
     * Creates the processor and starts it.
     *
     * @param pipelineProcessorConfig supplies the thread pool size, the queue size and the
     *                                polling frequency in milliseconds
     * @param messageConsumer         JMS consumer whose messages are buffered in the queue
     * @throws AsyncPipelineProcessorInitializationException if the queue cannot be registered
     *                                                       as the consumer's message listener
     */
    public AbstractPipelineProcessor(PipelineProcessorConfig pipelineProcessorConfig, MessageConsumer messageConsumer) {
        this.scheduler = Executors.newScheduledThreadPool(pipelineProcessorConfig.getThreadPoolSize());
        this.queue = new PipelineQueue(pipelineProcessorConfig.getQueueSize());
        this.processorTask = initializeProcessorTask(pipelineProcessorConfig);
        this.consumer = messageConsumer;
        registerMessageListener();
        // Install the hook only once construction can no longer fail, so a failed
        // instance is not kept reachable by the runtime until JVM exit.
        addShutdownHook();
    }

    /**
     * Registers the internal queue as the consumer's message listener. If registration fails
     * the already running scheduler is stopped before the failure is propagated.
     */
    private final void registerMessageListener() {
        try {
            this.consumer.setMessageListener(this.queue);
        } catch (JMSException e) {
            logger.error("Unable to register the message listener.", e);
            // The periodic task was already scheduled; stop the pool so its threads do not leak.
            this.scheduler.shutdownNow();
            throw new AsyncPipelineProcessorInitializationException("Unable to register the message listener.", e);
        }
    }

    /**
     * Schedules the periodic task that drains the queue.
     *
     * @param pipelineProcessorConfig supplies the period between runs, in milliseconds
     * @return the handle of the scheduled periodic task
     */
    private final ScheduledFuture<?> initializeProcessorTask(PipelineProcessorConfig pipelineProcessorConfig) {
        return this.scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                AbstractPipelineProcessor.this.processMessages();
            }
        }, INITIAL_DELAY_IN_MILLIS, pipelineProcessorConfig.getFrequencyInMillis(), TimeUnit.MILLISECONDS);
    }

    /** Installs a JVM shutdown hook that invokes {@link #shutdown()}. */
    private final void addShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread("[ThreadPool-Shutdown]") {
            @Override
            public void run() {
                AbstractPipelineProcessor.this.shutdown();
            }
        });
    }

    /**
     * Drains the queue and passes the drained batch to
     * {@link #processMessages(PipelineTextMessage[])}.
     *
     * @return the number of messages taken from the queue, or {@code 0} if it was empty
     */
    public final int processMessages() {
        if (this.queue.isEmpty()) {
            return 0;
        }
        PipelineTextMessage[] allMessagesAndEmptyQueue = this.queue.getAllMessagesAndEmptyQueue();
        try {
            processMessages(allMessagesAndEmptyQueue);
        } catch (Throwable th) {
            // Deliberately broad: an exception escaping a scheduleAtFixedRate task
            // suppresses all of its subsequent executions.
            logger.error("ProcessMessages implementation errors..", th);
        }
        return allMessagesAndEmptyQueue.length;
    }

    /**
     * Stops the processor: unregisters the message listener, stops the scheduler, waits up to
     * {@value #TIMEOUT_FOR_SHUTDOWN_IN_MILLIS} ms for an in-flight batch to finish and then
     * drains whatever is still queued. Calling it again after the scheduler has been shut
     * down only repeats the listener unregistration.
     */
    final void shutdown() {
        try {
            if (this.consumer != null) {
                this.consumer.setMessageListener((MessageListener) null);
            }
        } catch (JMSException e) {
            logger.error("Unable to unregister the Pipeline Queue from the Topic.", e);
        }
        if (this.scheduler == null || this.scheduler.isShutdown()) {
            return;
        }
        if (this.processorTask != null) {
            // Stop future runs; a batch that is currently executing is allowed to finish.
            this.processorTask.cancel(false);
        }
        // awaitTermination can only ever succeed after shutdown() has been requested.
        this.scheduler.shutdown();
        try {
            if (this.scheduler.awaitTermination(TIMEOUT_FOR_SHUTDOWN_IN_MILLIS, TimeUnit.MILLISECONDS)) {
                // No worker is running any more: hand over messages that were queued
                // after the last scheduled run so they are not silently dropped.
                processMessages();
            } else {
                this.scheduler.shutdownNow();
            }
        } catch (InterruptedException e2) {
            logger.error("Interrupted while trying to await terminiation of Scheduler for Processor threads.", e2);
            this.scheduler.shutdownNow();
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the internal queue, exposed as the {@link MessageListener} registered with the consumer
     */
    final MessageListener getMessageListener() {
        return this.queue;
    }

    /**
     * Processes one batch of messages drained from the queue. Anything thrown from here is
     * logged by the caller and does not stop later scheduled runs.
     *
     * @param pipelineTextMessageArr the messages drained from the queue in this run
     */
    protected abstract void processMessages(PipelineTextMessage[] pipelineTextMessageArr);
}
