/*
 * Decompiled with CFR 0.152.
 */
package org.jumpmind.symmetric.io.data;

import java.util.concurrent.CancellationException;
import org.jumpmind.db.model.Table;
import org.jumpmind.exception.InvalidRetryException;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.IDataProcessorListener;
import org.jumpmind.symmetric.io.data.IDataReader;
import org.jumpmind.symmetric.io.data.IDataResource;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.ProtocolException;
import org.jumpmind.symmetric.io.data.writer.IgnoreBatchException;
import org.jumpmind.util.Statistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataProcessor {
    private static final String STAT_WRITE_DATA = "statWriteData";
    private static final String STAT_READ_DATA = "statReadData";
    private static final Logger log = LoggerFactory.getLogger(DataProcessor.class);
    protected IDataReader dataReader;
    protected IDataWriter defaultDataWriter;
    protected IDataProcessorListener listener;
    protected Table currentTable;
    protected CsvData currentData;
    protected Batch currentBatch;
    protected String name;

    public DataProcessor() {
    }

    public DataProcessor(IDataReader dataReader, IDataWriter defaultDataWriter, String name) {
        this(dataReader, defaultDataWriter, null, name);
    }

    public DataProcessor(IDataReader dataReader, IDataWriter defaultDataWriter, IDataProcessorListener listener, String name) {
        this.dataReader = dataReader;
        this.defaultDataWriter = defaultDataWriter;
        this.listener = listener;
        this.name = name;
    }

    protected IDataWriter chooseDataWriter(Batch batch) {
        return this.defaultDataWriter;
    }

    public void process() {
        this.process(new DataContext());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void process(DataContext context) {
        try {
            context.setReader(this.dataReader);
            this.dataReader.open(context);
            do {
                this.currentBatch = this.dataReader.nextBatch();
                if (this.currentBatch == null) continue;
                context.setBatch(this.currentBatch);
                boolean endBatchCalled = false;
                IDataWriter dataWriter = null;
                try {
                    boolean processBatch;
                    boolean bl = processBatch = this.listener == null ? true : this.listener.beforeBatchStarted(context);
                    if (processBatch) {
                        dataWriter = this.chooseDataWriter(this.currentBatch);
                        processBatch &= dataWriter != null;
                    }
                    if (processBatch) {
                        context.setWriter(dataWriter);
                        dataWriter.open(context);
                        dataWriter.start(this.currentBatch);
                        if (this.listener != null) {
                            this.listener.afterBatchStarted(context);
                        }
                    }
                    if (this.currentBatch.isInvalidRetry()) {
                        throw new InvalidRetryException();
                    }
                    this.forEachDataInTable(context, processBatch, true, this.currentBatch);
                    this.forEachTableInBatch(context, processBatch, this.currentBatch);
                    if (this.currentBatch != null && !this.currentBatch.isComplete()) {
                        Object msg = "The batch %s was not complete";
                        if (this.currentBatch.getBatchType() == Batch.BatchType.EXTRACT) {
                            msg = (String)msg + ".  Note that this is the error you receive on Oracle when the total size of row_data in sym_data is greater than 4k.  You can work around this by changing the contains_big_lobs in sym_channel to 1";
                        }
                        throw new ProtocolException((String)msg, this.currentBatch.getNodeBatchId());
                    }
                    if (processBatch) {
                        if (this.listener != null) {
                            this.listener.beforeBatchEnd(context);
                        }
                        dataWriter.end(this.currentBatch, false);
                        endBatchCalled = true;
                        if (this.listener != null) {
                            this.listener.batchSuccessful(context);
                        }
                    }
                    this.close(dataWriter);
                }
                catch (Throwable ex) {
                    try {
                        try {
                            context.setLastError(ex);
                            if (dataWriter != null && !endBatchCalled) {
                                dataWriter.end(this.currentBatch, true);
                            }
                        }
                        finally {
                            if (this.listener != null) {
                                this.listener.batchInError(context, ex);
                            }
                        }
                        this.rethrow(ex);
                        this.close(dataWriter);
                    }
                    catch (Throwable throwable) {
                        this.close(dataWriter);
                        throw throwable;
                    }
                }
            } while (this.currentBatch != null);
        }
        finally {
            this.close(this.dataReader);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected int forEachTableInBatch(DataContext context, boolean processBatch, Batch batch) {
        int dataRow = 0;
        do {
            this.currentTable = this.dataReader.nextTable();
            context.setTable(this.currentTable);
            if (this.currentTable == null) continue;
            boolean processTable = false;
            try {
                try {
                    if (processBatch) {
                        processTable = context.getWriter().start(this.currentTable);
                    }
                    dataRow += this.forEachDataInTable(context, processBatch, processTable, batch);
                }
                catch (IgnoreBatchException ex) {
                    processBatch = false;
                }
            }
            finally {
                if (processTable) {
                    context.getWriter().end(this.currentTable);
                }
            }
        } while (this.currentTable != null);
        return dataRow;
    }

    protected int forEachDataInTable(DataContext context, boolean processBatch, boolean processTable, Batch batch) {
        int dataRow = 0;
        IgnoreBatchException ignore = null;
        long startTime = System.currentTimeMillis();
        long ts = System.currentTimeMillis();
        do {
            batch.startTimer(STAT_READ_DATA);
            this.currentData = this.dataReader.nextData();
            context.setData(this.currentData);
            batch.incrementDataReadMillis(batch.endTimer(STAT_READ_DATA));
            if (this.currentData != null) {
                ++dataRow;
                if (processTable || !this.currentData.requiresTable()) {
                    try {
                        batch.startTimer(STAT_WRITE_DATA);
                        batch.incrementLineCount();
                        if (context.getWriter() == null) {
                            context.setWriter(this.chooseDataWriter(batch));
                        }
                        if (processBatch) {
                            context.getWriter().write(this.currentData);
                        }
                        batch.incrementDataWriteMillis(batch.endTimer(STAT_WRITE_DATA));
                    }
                    catch (IgnoreBatchException ex) {
                        ignore = ex;
                        processTable = false;
                    }
                }
                if (this.listener != null) {
                    this.listener.dataRowProcessed();
                }
            }
            if (System.currentTimeMillis() - ts > 60000L && context.getWriter() != null) {
                Statistics stats = context.getWriter().getStatistics().get(batch);
                if (this.listener != null) {
                    this.listener.batchProgressUpdate(context);
                }
                if (stats != null) {
                    log.info("Batch '{}', for node '{}', for process '{}' has been processing for {} seconds.  The following stats have been gathered: {}", new Object[]{batch.getBatchId(), batch.getTargetNodeId(), this.name, (System.currentTimeMillis() - startTime) / 1000L, stats.toString()});
                }
                ts = System.currentTimeMillis();
            }
            if (!Thread.currentThread().isInterrupted()) continue;
            throw new CancellationException("This thread was interrupted");
        } while (this.currentData != null);
        if (ignore != null) {
            throw ignore;
        }
        return dataRow;
    }

    protected void rethrow(Throwable ex) {
        if (ex instanceof RuntimeException) {
            throw (RuntimeException)ex;
        }
        throw new RuntimeException(ex);
    }

    protected void close(IDataResource dataResource) {
        try {
            if (dataResource != null) {
                dataResource.close();
            }
        }
        catch (Exception ex) {
            log.error("Failed to close dataResource:" + dataResource, (Throwable)ex);
        }
    }

    public void setListener(IDataProcessorListener listener) {
        this.listener = listener;
    }

    public void setDataReader(IDataReader dataReader) {
        this.dataReader = dataReader;
    }

    public void setDefaultDataWriter(IDataWriter dataWriter) {
        this.defaultDataWriter = dataWriter;
    }
}

