/*
 * Decompiled with CFR 0.152.
 */
package org.jumpmind.symmetric.io.stage;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.Reader;
import java.util.Date;
import java.util.HashMap;
import org.apache.commons.lang3.StringUtils;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.AbstractSymmetricEngine;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.csv.CsvReader;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.ProtocolException;
import org.jumpmind.symmetric.io.data.writer.IProtocolDataWriterListener;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.util.Statistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleStagingDataWriter {
    /** Same value as the chunk size process() uses when appending an over-long line to the staging writer. */
    protected static final int MAX_WRITE_LENGTH = 32768;
    /** Logger named after the runtime class of this instance. */
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    /** CSV reader wrapped around the incoming stream; process() consumes its raw records. */
    protected CsvReader reader;
    /** Engine supplied by the caller; source of the staging manager and, in getStagedResource(), the configuration service. */
    protected ISymmetricEngine engine;
    /** Staging manager obtained from the engine in the constructor; used to create and find staged resources. */
    protected IStagingManager stagingManager;
    /** Optional listeners whose start/end callbacks are invoked per batch while no exception has been recorded. */
    protected IProtocolDataWriterListener[] listeners;
    /** Value handed to IStagedResource.getWriter(...) when a batch's writer is opened. */
    protected long memoryThresholdInBytes;
    /** First element of the path used when creating or finding staged resources through stagingManager. */
    protected String category;
    /** Batch type given to every Batch constructed in process(). */
    protected Batch.BatchType batchType;
    /** Node id used in log and error messages to identify where the data came from. */
    protected String sourceNodeId;
    /** Target node id passed to every Batch constructed in process(). */
    protected String targetNodeId;
    /** Context passed to the listeners; also read for "sourceNode" when a failure is logged. */
    protected DataContext context;
    /** Receives batch id, counters and OK/ERROR status updates as the stream is processed. */
    protected ProcessInfo processInfo;
    /** Writer of the staged resource currently being filled; null while no batch is open. */
    protected BufferedWriter writer;
    /** Most recent batch announced by a "batch" or "retry" record. */
    protected Batch batch;
    /** Number of records that arrived while no writer was open and that matched no known header keyword. */
    protected long invalidLineCount;
    /** Failure recorded during processing, exposed through getException(); null when none occurred. */
    protected Exception exception;

    /**
     * Creates a writer that copies the protocol stream read from {@code reader} into staging.
     *
     * @param processInfo            receives progress and status updates
     * @param reader                 incoming stream; wrapped in a {@link CsvReader}
     * @param engine                 engine that supplies the staging manager
     * @param category               first path element for staged resources
     * @param memoryThresholdInBytes value passed on when a staged resource's writer is opened
     * @param batchType              type assigned to each batch read from the stream
     * @param sourceNodeId           node id used in log and error messages
     * @param targetNodeId           target node id assigned to each batch
     * @param context                context handed to the listeners
     * @param listeners              optional listeners notified at batch start and end
     */
    public SimpleStagingDataWriter(ProcessInfo processInfo, BufferedReader reader, ISymmetricEngine engine, String category, long memoryThresholdInBytes, Batch.BatchType batchType, String sourceNodeId, String targetNodeId, DataContext context, IProtocolDataWriterListener ... listeners) {
        CsvReader csvReader = new CsvReader(reader);
        // NOTE(review): 2 is presumably CsvReader's backslash escape mode - confirm against its constants.
        csvReader.setEscapeMode(2);
        csvReader.setSafetySwitch(false);
        this.reader = csvReader;

        this.engine = engine;
        this.stagingManager = engine.getStagingManager();

        this.processInfo = processInfo;
        this.context = context;
        this.listeners = listeners;
        this.category = category;
        this.batchType = batchType;
        this.memoryThresholdInBytes = memoryThresholdInBytes;
        this.sourceNodeId = sourceNodeId;
        this.targetNodeId = targetNodeId;
    }

    /**
     * Reads the incoming protocol stream record by record and copies each batch into its own
     * staged resource.
     * <p>
     * The "nodeid", "binary", "channel" and "stats" records are remembered and applied to the next
     * "batch"/"retry"/"commit" record. Table records are tracked in two maps: one for everything seen
     * in this stream and one for what has already been written into the current batch, so that keys
     * and columns announced earlier can be re-emitted into a later batch.
     * <p>
     * An exception raised while reading or staging is not propagated: it is stored (see
     * {@link #getException()}), the current staged resource is deleted, the process status is set to
     * ERROR and the failure is logged.
     *
     * @throws IOException       declared by the signature; failures inside the read loop are caught
     *                           and recorded rather than propagated
     * @throws ProtocolException if, once processing has ended (normally or not), one or more records
     *                           were received outside of a batch
     */
    public void process() throws IOException {
        IStagedResource resource = null;
        try {
            String catalogLine = null;
            String schemaLine = null;
            String nodeLine = null;
            String binaryLine = null;
            String channelLine = null;
            String batchStatsColumnsLine = null;
            Statistics batchStats = null;
            TableLine tableLine = null;
            // Tables seen anywhere in this stream vs. tables already written into the current batch.
            HashMap<TableLine, TableLine> syncTableLines = new HashMap<TableLine, TableLine>();
            HashMap<TableLine, TableLine> batchTableLines = new HashMap<TableLine, TableLine>();
            long startTime = System.currentTimeMillis();
            long ts = startTime;
            long lineCount = 0L;

            while (this.reader.readRecord()) {
                String line = this.reader.getRawRecord();
                if (line.startsWith("catalog")) {
                    catalogLine = line;
                    this.writeLine(line);
                } else if (line.startsWith("schema")) {
                    schemaLine = line;
                    this.writeLine(line);
                } else if (line.startsWith("table")) {
                    tableLine = new TableLine(catalogLine, schemaLine, line);
                    TableLine batchTableLine = batchTableLines.get(tableLine);
                    if (batchTableLine != null) {
                        // Already described in this batch: the table record alone is enough.
                        tableLine = batchTableLine;
                        this.writeLine(line);
                    } else {
                        TableLine syncTableLine = syncTableLines.get(tableLine);
                        if (syncTableLine != null) {
                            // Seen earlier in the stream but not in this batch: replay its metadata.
                            tableLine = syncTableLine;
                            this.writeLine(tableLine.catalogLine);
                            this.writeLine(tableLine.schemaLine);
                            this.writeLine(line);
                            this.writeLine(tableLine.keysLine);
                            this.writeLine(tableLine.columnsLine);
                        } else {
                            syncTableLines.put(tableLine, tableLine);
                            batchTableLines.put(tableLine, tableLine);
                            this.writeLine(line);
                        }
                    }
                } else if (line.startsWith("keys")) {
                    // NOTE(review): a keys/columns record with no preceding table record dereferences
                    // null here; the resulting exception is handled by the catch block below.
                    tableLine.keysLine = line;
                    this.writeLine(line);
                } else if (line.startsWith("columns")) {
                    tableLine.columnsLine = line;
                    this.writeLine(line);
                } else if (line.startsWith("batch")) {
                    this.beginBatch(line, nodeLine, binaryLine, channelLine);
                    String location = this.batch.getStagedLocation();
                    if (resource != null) {
                        this.finishResource(resource);
                    }
                    resource = this.stagingManager.create(new Object[]{this.category, location, this.batch.getBatchId()});
                    this.writer = resource.getWriter(this.memoryThresholdInBytes);
                    this.writeLine(nodeLine);
                    this.writeLine(binaryLine);
                    this.writeLine(channelLine);
                    this.writeLine(line);
                    this.notifyBatchStart();
                } else if (line.startsWith("commit")) {
                    if (this.writer != null) {
                        this.writeLine(line);
                        this.finishResource(resource);
                        this.writer = null;
                    }
                    batchTableLines.clear();
                    if (this.batch != null) {
                        this.batch.setStatistics(batchStats);
                        this.notifyBatchEnd(resource);
                    }
                    batchStats = null;
                    resource = null;
                } else if (line.startsWith("retry")) {
                    this.beginBatch(line, nodeLine, binaryLine, channelLine);
                    if (resource != null) {
                        this.finishResource(resource);
                    }
                    resource = this.getStagedResource();
                    if (resource == null || resource.getState() == IStagedResource.State.CREATE) {
                        // Nothing usable is staged for the retried batch: discard any partial resource.
                        if (resource != null) {
                            resource.delete();
                        }
                        resource = null;
                        this.writer = null;
                    }
                    if (this.log.isDebugEnabled()) {
                        this.debugLine(nodeLine);
                        this.debugLine(binaryLine);
                        this.debugLine(channelLine);
                        this.debugLine(line);
                    }
                    this.notifyBatchStart();
                } else if (line.startsWith("nodeid")) {
                    nodeLine = line;
                } else if (line.startsWith("binary")) {
                    binaryLine = line;
                } else if (line.startsWith("channel")) {
                    channelLine = line;
                } else if (line.startsWith("stats_columns")) {
                    // Must be tested before "stats", which is a prefix of this keyword.
                    batchStatsColumnsLine = line;
                } else if (line.startsWith("stats")) {
                    batchStats = new Statistics();
                    this.putStats(batchStats, batchStatsColumnsLine, line);
                    this.processInfo.setTotalDataCount(batchStats.get("DATA_ROW_COUNT"));
                } else if (this.writer == null) {
                    ++this.invalidLineCount;
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Invalid line received outside of a batch: {}", (Object)line);
                    }
                } else {
                    TableLine batchLine = batchTableLines.get(tableLine);
                    if (batchLine == null || batchLine.columnsLine == null) {
                        TableLine syncLine = syncTableLines.get(tableLine);
                        if (syncLine != null) {
                            this.log.debug("Injecting keys and columns to be backwards compatible");
                            if (batchLine == null) {
                                batchLine = syncLine;
                                batchTableLines.put(batchLine, batchLine);
                                this.writeLine(batchLine.tableLine);
                            }
                            batchLine.keysLine = syncLine.keysLine;
                            this.writeLine(syncLine.keysLine);
                            batchLine.columnsLine = syncLine.columnsLine;
                            this.writeLine(syncLine.columnsLine);
                        }
                    }
                    if (line.startsWith("insert") || line.startsWith("delete") || line.startsWith("update") || line.startsWith("create") || line.startsWith("sql") || line.startsWith("bsh")) {
                        this.processInfo.incrementCurrentDataCount();
                    }
                    this.writeDataLine(line);
                }

                ++lineCount;
                // Emit a progress message at most once a minute.
                if (System.currentTimeMillis() - ts > 60000L) {
                    this.log.info("Batch '{}', from node '{}', for process 'transfer to stage' has been processing for {} seconds.  The following stats have been gathered: {}", new Object[]{this.batch != null ? Long.valueOf(this.batch.getBatchId()) : "?", this.sourceNodeId, (System.currentTimeMillis() - startTime) / 1000L, "LINES=" + lineCount + ", BYTES=" + (resource == null ? 0L : resource.getSize())});
                    ts = System.currentTimeMillis();
                }
            }

            if (resource != null) {
                this.finishResource(resource);
            }
            this.processInfo.setStatus(ProcessInfo.ProcessStatus.OK);
        } catch (Exception ex) {
            if (this.exception == null) {
                this.exception = ex;
            }
            if (resource != null) {
                resource.delete();
            }
            this.processInfo.setStatus(ProcessInfo.ProcessStatus.ERROR);
            this.log.error("Failed to write batch into staging from {}.  {}: {}", new Object[]{this.context.getContext().get("sourceNode").toString(), ex.getClass().getName(), ex.getMessage()});
        } finally {
            // Reported on every exit path, taking precedence over any failure raised above.
            if (this.invalidLineCount > 0L) {
                throw new ProtocolException("Received {} invalid lines from node {} that were outside of a batch", new Object[]{this.invalidLineCount, this.sourceNodeId});
            }
        }
    }

    /** Builds the current batch from a "batch"/"retry" record and resets the per-batch progress counters. */
    private void beginBatch(String batchLine, String nodeLine, String binaryLine, String channelLine) throws IOException {
        this.batch = new Batch(this.batchType, Long.parseLong(this.getArgLine(batchLine)), this.getArgLine(channelLine), this.getBinaryEncoding(binaryLine), this.getArgLine(nodeLine), this.targetNodeId, false);
        this.processInfo.setCurrentBatchId(this.batch.getBatchId());
        this.processInfo.setCurrentBatchStartTime(new Date());
        this.processInfo.incrementBatchCount();
        this.processInfo.setCurrentDataCount(0L);
        this.processInfo.setTotalDataCount(0L);
    }

    /** Closes the given staged resource and marks it DONE. */
    private void finishResource(IStagedResource resource) {
        resource.close();
        resource.setState(IStagedResource.State.DONE);
    }

    /** Calls start(...) on every listener unless an exception has already been recorded. */
    private void notifyBatchStart() {
        if (this.listeners != null && this.exception == null) {
            for (IProtocolDataWriterListener listener : this.listeners) {
                listener.start(this.context, this.batch);
            }
        }
    }

    /** Calls end(...) on every listener unless an exception has already been recorded. */
    private void notifyBatchEnd(IStagedResource resource) {
        if (this.listeners != null && this.exception == null) {
            for (IProtocolDataWriterListener listener : this.listeners) {
                listener.end(this.context, this.batch, resource);
            }
        }
    }

    /** Writes one data record, appending it in MAX_WRITE_LENGTH-sized chunks when it is longer than that. */
    private void writeDataLine(String line) throws IOException {
        int size = line.length();
        if (size > MAX_WRITE_LENGTH) {
            this.log.debug("Exceeded max line length with {}", (Object)Integer.valueOf(size));
            for (int start = 0; start < size; start += MAX_WRITE_LENGTH) {
                this.writer.append(line, start, Math.min(start + MAX_WRITE_LENGTH, size));
            }
            this.writer.append("\n");
        } else {
            this.writeLine(line);
        }
    }

    /**
     * Returns the failure recorded while staging.
     *
     * @return the stored exception, or {@code null} when none has been recorded
     */
    public Exception getException() {
        return exception;
    }

    /**
     * Extracts the argument part of a {@code keyword,argument} record.
     *
     * @param line the raw record, may be {@code null}
     * @return everything after the first comma, trimmed; {@code null} when {@code line} is {@code null}
     * @throws IOException if the record contains no comma
     */
    protected String getArgLine(String line) throws IOException {
        if (line == null) {
            return null;
        }
        int comma = line.indexOf(',');
        if (comma < 0) {
            throw new IOException("Invalid token line in CSV: " + line);
        }
        String argument = line.substring(comma + 1);
        return argument.trim();
    }

    /**
     * Resolves the binary encoding named by the argument of a "binary" record.
     *
     * @param line the raw record, may be {@code null}
     * @return the encoding whose name matches the record's argument, or {@code null} when the
     *         record yields no argument
     * @throws IOException if the record contains no comma
     */
    protected BinaryEncoding getBinaryEncoding(String line) throws IOException {
        String encodingName = this.getArgLine(line);
        return encodingName == null ? null : BinaryEncoding.valueOf(encodingName);
    }

    /**
     * Writes one record followed by a newline to the current staging writer.
     * <p>
     * A {@code null} record is ignored. When no writer is open the record is not written; instead a
     * {@link ProtocolException} is recorded and the process status is set to ERROR.
     *
     * @param line the record to write, may be {@code null}
     * @throws IOException if the underlying writer fails
     */
    protected void writeLine(String line) throws IOException {
        if (line == null) {
            return;
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Writing staging data: {}", (Object)line);
        }
        if (this.writer == null) {
            // No batch has opened a writer yet, so this record has nowhere to go.
            this.exception = new ProtocolException("Batch data is corrupt from node " + this.sourceNodeId + " because no batch ID was present", new Object[0]);
            this.processInfo.setStatus(ProcessInfo.ProcessStatus.ERROR);
            return;
        }
        this.writer.write(line);
        this.writer.write("\n");
    }

    /**
     * Copies the values of a "stats" record into {@code stats}, keyed by the names from the matching
     * "stats_columns" record.
     * <p>
     * Both records are split on commas and paired by position. Position 0 is skipped (in
     * process() these records begin with their keyword), and pairing stops at the end of the
     * shorter record.
     *
     * @param stats         target that receives one entry per paired column
     * @param columnsString comma-separated column names, may be {@code null}
     * @param statsString   comma-separated numeric values, may be {@code null}
     */
    protected void putStats(Statistics stats, String columnsString, String statsString) {
        String[] names = StringUtils.split(columnsString, ',');
        String[] values = StringUtils.split(statsString, ',');
        if (names == null || values == null) {
            return;
        }
        int limit = Math.min(names.length, values.length);
        for (int idx = 1; idx < limit; idx++) {
            long stat = Long.parseLong(values[idx]);
            stats.set(names[idx], stat);
        }
    }

    /**
     * Logs a received record at debug level; a {@code null} record is ignored.
     *
     * @param line the record to log, may be {@code null}
     */
    protected void debugLine(String line) {
        if (line == null) {
            return;
        }
        this.log.debug("Received: {}", (Object)line);
    }

    /**
     * Looks up an already staged resource for the current batch.
     * <p>
     * When source staging is enabled for the batch's source node and an engine for that node can be
     * found, that engine's "outgoing" staging area is searched first: once with the batch as built,
     * then again with the batch flagged as common. If that yields nothing, this writer's own staging
     * manager is searched under its category.
     *
     * @return the staged resource, or {@code null} when none is found
     */
    protected IStagedResource getStagedResource() {
        IStagedResource found = null;

        if (this.engine.getConfigurationService().isUseSourceStagingEnabled(this.batch.getSourceNodeId())) {
            ISymmetricEngine sourceEngine = AbstractSymmetricEngine.findEngineByNodeId(this.batch.getSourceNodeId());
            if (sourceEngine != null) {
                Batch outgoingBatch = new Batch(Batch.BatchType.EXTRACT, this.batch.getBatchId(), this.batch.getChannelId(), this.batch.getBinaryEncoding(), this.batch.getSourceNodeId(), this.batch.getTargetNodeId(), this.batch.isCommon());
                found = sourceEngine.getStagingManager().find(new Object[]{"outgoing", outgoingBatch.getStagedLocation(), this.batch.getBatchId()});
                if (found == null) {
                    // Second attempt: the staged location is recomputed after flagging the batch as common.
                    outgoingBatch.setCommon(true);
                    found = sourceEngine.getStagingManager().find(new Object[]{"outgoing", outgoingBatch.getStagedLocation(), this.batch.getBatchId()});
                }
            }
        }

        if (found == null) {
            found = this.stagingManager.find(new Object[]{this.category, this.batch.getStagedLocation(), this.batch.getBatchId()});
        }
        return found;
    }

    /**
     * Holds the raw catalog, schema and table records that identify a table in the stream, together
     * with the keys and columns records later attached to it. Equality and hashing use only the
     * catalog, schema and table records, so instances can serve as map keys while their keys and
     * columns records are still being filled in.
     */
    static class TableLine {
        String catalogLine;
        String schemaLine;
        String tableLine;
        /** Raw "keys" record for this table; not part of equality. */
        String keysLine;
        /** Raw "columns" record for this table; not part of equality. */
        String columnsLine;

        public TableLine(String catalogLine, String schemaLine, String tableLine) {
            this.catalogLine = catalogLine;
            this.schemaLine = schemaLine;
            this.tableLine = tableLine;
        }

        /** Two instances are equal when their catalog, schema and table records all match (nulls match nulls). */
        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TableLine)) {
                return false;
            }
            TableLine other = (TableLine)o;
            if (!StringUtils.equals(this.catalogLine, other.catalogLine)) {
                return false;
            }
            if (!StringUtils.equals(this.schemaLine, other.schemaLine)) {
                return false;
            }
            return StringUtils.equals(this.tableLine, other.tableLine);
        }

        /** Hash of the three identifying records joined with dots. */
        @Override
        public int hashCode() {
            String identity = this.catalogLine + "." + this.schemaLine + "." + this.tableLine;
            return identity.hashCode();
        }
    }
}

