/*
 * Decompiled with CFR 0.152.
 */
package org.jumpmind.symmetric.service.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.jumpmind.db.sql.ISqlReadCursor;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.Row;
import org.jumpmind.db.sql.mapper.LongMapper;
import org.jumpmind.db.sql.mapper.StringMapper;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.cache.ICacheManager;
import org.jumpmind.symmetric.ext.IOutgoingBatchFilter;
import org.jumpmind.symmetric.model.AbstractBatch;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.NodeGroupChannelWindow;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.NodeHost;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatchSummary;
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.jumpmind.symmetric.model.ReadyChannels;
import org.jumpmind.symmetric.service.FilterCriterion;
import org.jumpmind.symmetric.service.IClusterService;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IExtensionService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.service.ISequenceService;
import org.jumpmind.symmetric.service.impl.AbstractService;
import org.jumpmind.symmetric.service.impl.OutgoingBatchServiceSqlMap;
import org.jumpmind.symmetric.util.QueueThread;
import org.jumpmind.util.AppUtils;
import org.jumpmind.util.FormatUtils;

public class OutgoingBatchService
extends AbstractService
implements IOutgoingBatchService {
    private INodeService nodeService;
    private IConfigurationService configurationService;
    private ISequenceService sequenceService;
    private IClusterService clusterService;
    private IExtensionService extensionService;
    private ICacheManager cacheManager;

    public OutgoingBatchService(ISymmetricEngine engine) {
        super(engine.getParameterService(), engine.getSymmetricDialect());
        this.nodeService = engine.getNodeService();
        this.configurationService = engine.getConfigurationService();
        this.sequenceService = engine.getSequenceService();
        this.clusterService = engine.getClusterService();
        this.extensionService = engine.getExtensionService();
        this.cacheManager = engine.getCacheManager();
        this.setSqlMap(new OutgoingBatchServiceSqlMap(this.symmetricDialect.getPlatform(), this.createSqlReplacementTokens()));
    }

    @Override
    public void updateOutgoingError(long batchId, String nodeId) {
        this.sqlTemplate.update(this.getSql("updateOutgoingError"), new Object[]{batchId, nodeId});
    }

    @Override
    public int cancelLoadBatches(long loadId) {
        return this.sqlTemplate.update(this.getSql("cancelLoadBatchesSql"), new Object[]{new Date(), loadId});
    }

    @Override
    public void markAllAsSentForNode(String nodeId, boolean includeConfigChannel) {
        int configCount;
        OutgoingBatches batches = null;
        do {
            configCount = 0;
            batches = this.getOutgoingBatches(nodeId, true);
            List<OutgoingBatch> list = batches.getBatches();
            Collections.sort(list, new Comparator<OutgoingBatch>(){

                @Override
                public int compare(OutgoingBatch o1, OutgoingBatch o2) {
                    return -Long.valueOf(o1.getBatchId()).compareTo(o2.getBatchId());
                }
            });
            for (OutgoingBatch outgoingBatch : list) {
                if (includeConfigChannel || !outgoingBatch.getChannelId().equals("config") && !outgoingBatch.getChannelId().equals("monitor") && !outgoingBatch.getChannelId().equals("system")) {
                    outgoingBatch.setStatus(AbstractBatch.Status.OK);
                    outgoingBatch.setErrorFlag(false);
                    this.updateOutgoingBatch(outgoingBatch);
                    continue;
                }
                ++configCount;
            }
        } while (batches.getBatches().size() > configCount);
    }

    @Override
    public void markAllConfigAsSentForNode(String nodeId) {
        int updateCount;
        do {
            updateCount = 0;
            OutgoingBatches batches = this.getOutgoingBatches(nodeId, false);
            List<OutgoingBatch> list = batches.getBatches();
            for (OutgoingBatch outgoingBatch : list) {
                if (!outgoingBatch.getChannelId().equals("config") && !outgoingBatch.getChannelId().equals("system")) continue;
                outgoingBatch.setStatus(AbstractBatch.Status.OK);
                outgoingBatch.setErrorFlag(false);
                outgoingBatch.setIgnoreCount(1L);
                this.updateOutgoingBatch(outgoingBatch);
                ++updateCount;
            }
        } while (updateCount > 0);
    }

    @Override
    public void copyOutgoingBatches(String channelId, long startBatchId, String fromNodeId, String toNodeId) {
        this.log.info("Copying outgoing batches for channel '{}' from node '{}' to node '{}' starting at {}", new Object[]{channelId, fromNodeId, toNodeId, startBatchId});
        this.sqlTemplate.update(this.getSql("deleteOutgoingBatchesForNodeSql"), new Object[]{toNodeId, channelId, fromNodeId, channelId});
        int count = this.sqlTemplate.update(this.getSql("copyOutgoingBatchesSql"), new Object[]{toNodeId, new Date(), fromNodeId, channelId, startBatchId});
        this.log.info("Copied {} outgoing batches for channel '{}' from node '{}' to node '{}'", new Object[]{count, channelId, fromNodeId, toNodeId});
    }

    @Override
    public void updateAbandonedRoutingBatches() {
        int count = this.sqlTemplate.queryForInt(this.getSql("countOutgoingBatchesWithStatusSql"), new Object[]{AbstractBatch.Status.RT.name()});
        if (count > 0) {
            this.log.info("Cleaning up {} batches that were abandoned by a failed or aborted attempt at routing", (Object)count);
            this.sqlTemplate.update(this.getSql("updateOutgoingBatchesStatusSql"), new Object[]{AbstractBatch.Status.OK.name(), AbstractBatch.Status.RT.name()});
        }
    }

    @Override
    public void updateOutgoingBatches(List<OutgoingBatch> outgoingBatches) {
        for (OutgoingBatch batch : outgoingBatches) {
            this.updateOutgoingBatch(batch);
        }
    }

    @Override
    public void updateOutgoingBatch(OutgoingBatch outgoingBatch) {
        ISqlTransaction transaction = null;
        try {
            transaction = this.sqlTemplate.startSqlTransaction();
            this.updateOutgoingBatch(transaction, outgoingBatch);
            transaction.commit();
        }
        catch (Error ex) {
            if (transaction != null) {
                transaction.rollback();
            }
            throw ex;
        }
        catch (RuntimeException ex) {
            if (transaction != null) {
                transaction.rollback();
            }
            throw ex;
        }
        finally {
            this.close(transaction);
        }
    }

    @Override
    public void updateCommonBatchExtractStatistics(OutgoingBatch outgoingBatch) {
        this.sqlTemplate.update(this.getSql("updateCommonBatchExtractStatsSql"), new Object[]{outgoingBatch.getByteCount(), outgoingBatch.getDataRowCount(), outgoingBatch.getDataInsertRowCount(), outgoingBatch.getDataUpdateRowCount(), outgoingBatch.getDataDeleteRowCount(), outgoingBatch.getOtherRowCount(), outgoingBatch.getExtractRowCount(), outgoingBatch.getExtractInsertRowCount(), outgoingBatch.getExtractUpdateRowCount(), outgoingBatch.getExtractDeleteRowCount(), outgoingBatch.getBatchId(), outgoingBatch.getNodeId()}, new int[]{2, 2, 2, 2, 2, 2, 2, 2, 2, 2, this.symmetricDialect.getSqlTypeForIds(), 12});
    }

    @Override
    public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgoingBatch) {
        outgoingBatch.setLastUpdatedTime(new Date());
        outgoingBatch.setLastUpdatedHostName(this.clusterService.getServerId());
        Object sql = this.getSql("updateOutgoingBatchSql");
        if (outgoingBatch.getStatus() != AbstractBatch.Status.OK) {
            sql = (String)sql + this.getSql("statusNotOk");
        }
        transaction.prepareAndExecute((String)sql, new Object[]{outgoingBatch.getStatus().name(), outgoingBatch.getLoadId(), outgoingBatch.isExtractJobFlag() ? 1 : 0, outgoingBatch.isLoadFlag() ? 1 : 0, outgoingBatch.isErrorFlag() ? 1 : 0, outgoingBatch.getByteCount(), outgoingBatch.getExtractCount(), outgoingBatch.getSentCount(), outgoingBatch.getLoadCount(), outgoingBatch.getDataRowCount(), outgoingBatch.getReloadRowCount(), outgoingBatch.getDataInsertRowCount(), outgoingBatch.getDataUpdateRowCount(), outgoingBatch.getDataDeleteRowCount(), outgoingBatch.getOtherRowCount(), outgoingBatch.getIgnoreCount(), outgoingBatch.getRouterMillis(), outgoingBatch.getNetworkMillis(), outgoingBatch.getFilterMillis(), outgoingBatch.getLoadMillis(), outgoingBatch.getExtractMillis(), outgoingBatch.getExtractStartTime(), outgoingBatch.getTransferStartTime(), outgoingBatch.getLoadStartTime(), outgoingBatch.getSqlState(), outgoingBatch.getSqlCode(), FormatUtils.abbreviateForLogging((String)outgoingBatch.getSqlMessage()), outgoingBatch.getFailedDataId(), outgoingBatch.getFailedLineNumber(), outgoingBatch.getLastUpdatedHostName(), new Date(), outgoingBatch.getSummary(), outgoingBatch.getLoadRowCount(), outgoingBatch.getLoadInsertRowCount(), outgoingBatch.getLoadUpdateRowCount(), outgoingBatch.getLoadDeleteRowCount(), outgoingBatch.getFallbackInsertCount(), outgoingBatch.getFallbackUpdateCount(), outgoingBatch.getConflictWinCount(), outgoingBatch.getConflictLoseCount(), outgoingBatch.getIgnoreRowCount(), outgoingBatch.getMissingDeleteCount(), outgoingBatch.getSkipCount(), outgoingBatch.getExtractRowCount(), outgoingBatch.getExtractInsertRowCount(), outgoingBatch.getExtractUpdateRowCount(), outgoingBatch.getExtractDeleteRowCount(), outgoingBatch.getTransformExtractMillis(), outgoingBatch.getTransformLoadMillis(), outgoingBatch.isBulkLoaderFlag() ? 1 : 0, outgoingBatch.getBatchId(), outgoingBatch.getNodeId()}, new int[]{1, this.symmetricDialect.getSqlTypeForIds(), 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 93, 93, 93, 12, 2, 12, this.symmetricDialect.getSqlTypeForIds(), 2, 12, 93, 12, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, this.symmetricDialect.getSqlTypeForIds(), 12});
    }

    @Override
    public void updateOutgoingBatches(ISqlTransaction transaction, List<OutgoingBatch> batches, int flushSize) {
        int[] types = new int[]{1, this.symmetricDialect.getSqlTypeForIds(), 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 93, 93, 93, 12, 2, 12, this.symmetricDialect.getSqlTypeForIds(), 2, 12, 93, 12, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, this.symmetricDialect.getSqlTypeForIds(), 12};
        int count = 0;
        transaction.prepare(this.getSql("updateOutgoingBatchSql"));
        for (OutgoingBatch outgoingBatch : batches) {
            outgoingBatch.setLastUpdatedTime(new Date());
            outgoingBatch.setLastUpdatedHostName(this.clusterService.getServerId());
            transaction.addRow((Object)this.getSql("updateOutgoingBatchSql"), new Object[]{outgoingBatch.getStatus().name(), outgoingBatch.getLoadId(), outgoingBatch.isExtractJobFlag() ? 1 : 0, outgoingBatch.isLoadFlag() ? 1 : 0, outgoingBatch.isErrorFlag() ? 1 : 0, outgoingBatch.getByteCount(), outgoingBatch.getExtractCount(), outgoingBatch.getSentCount(), outgoingBatch.getLoadCount(), outgoingBatch.getDataRowCount(), outgoingBatch.getReloadRowCount(), outgoingBatch.getDataInsertRowCount(), outgoingBatch.getDataUpdateRowCount(), outgoingBatch.getDataDeleteRowCount(), outgoingBatch.getOtherRowCount(), outgoingBatch.getIgnoreCount(), outgoingBatch.getRouterMillis(), outgoingBatch.getNetworkMillis(), outgoingBatch.getFilterMillis(), outgoingBatch.getLoadMillis(), outgoingBatch.getExtractMillis(), outgoingBatch.getExtractStartTime(), outgoingBatch.getTransferStartTime(), outgoingBatch.getLoadStartTime(), outgoingBatch.getSqlState(), outgoingBatch.getSqlCode(), FormatUtils.abbreviateForLogging((String)outgoingBatch.getSqlMessage()), outgoingBatch.getFailedDataId(), outgoingBatch.getFailedLineNumber(), outgoingBatch.getLastUpdatedHostName(), new Date(), outgoingBatch.getSummary(), outgoingBatch.getLoadRowCount(), outgoingBatch.getLoadInsertRowCount(), outgoingBatch.getLoadUpdateRowCount(), outgoingBatch.getLoadDeleteRowCount(), outgoingBatch.getFallbackInsertCount(), outgoingBatch.getFallbackUpdateCount(), outgoingBatch.getConflictWinCount(), outgoingBatch.getConflictLoseCount(), outgoingBatch.getIgnoreRowCount(), outgoingBatch.getMissingDeleteCount(), outgoingBatch.getSkipCount(), outgoingBatch.getExtractRowCount(), outgoingBatch.getExtractInsertRowCount(), outgoingBatch.getExtractUpdateRowCount(), outgoingBatch.getExtractDeleteRowCount(), outgoingBatch.getTransformExtractMillis(), outgoingBatch.getTransformLoadMillis(), outgoingBatch.isBulkLoaderFlag() ? 1 : 0, outgoingBatch.getBatchId(), outgoingBatch.getNodeId()}, types);
            if (++count < flushSize) continue;
            transaction.flush();
            count = 0;
        }
        transaction.flush();
    }

    @Override
    public void updateOutgoingBatchStatus(ISqlTransaction transaction, AbstractBatch.Status status, String nodeId, long startBatchId, long endBatchId) {
        transaction.prepareAndExecute(this.getSql("updateOutgoingBatchStatusSql"), new Object[]{status.name(), new Date(), this.clusterService.getServerId(), nodeId, startBatchId, endBatchId}, new int[]{1, 93, 12, 12, this.symmetricDialect.getSqlTypeForIds(), this.symmetricDialect.getSqlTypeForIds()});
    }

    @Override
    public void updateOutgoingSetupBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId, long maxBatchId, String fromStatus, String toStatus) {
        transaction.prepareAndExecute(this.getSql("updateOutgoingSetupBatchStatusByStatus"), new Object[]{toStatus, new Date(), this.clusterService.getServerId(), targetNodeId, loadId, fromStatus, maxBatchId}, new int[]{1, 93, 12, 12, 2, 1, 2});
    }

    @Override
    public void updateOutgoingLoadBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId, long startDataBatchId, long endDataBatchId, String fromStatus, String toStatus) {
        transaction.prepareAndExecute(this.getSql("updateOutgoingLoadBatchStatusByStatus"), new Object[]{toStatus, new Date(), this.clusterService.getServerId(), targetNodeId, loadId, fromStatus, startDataBatchId, endDataBatchId}, new int[]{1, 93, 12, 12, 2, 1, 2, 2});
    }

    @Override
    public void updateOutgoingFinalizeBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId, long minBatchId, String fromStatus, String toStatus) {
        transaction.prepareAndExecute(this.getSql("updateOutgoingFinalizeBatchStatusByStatus"), new Object[]{toStatus, new Date(), this.clusterService.getServerId(), targetNodeId, loadId, fromStatus, minBatchId}, new int[]{1, 93, 12, 12, 2, 1, 2});
    }

    @Override
    public void insertOutgoingBatch(OutgoingBatch outgoingBatch) {
        ISqlTransaction transaction = null;
        try {
            transaction = this.sqlTemplate.startSqlTransaction();
            this.insertOutgoingBatch(transaction, outgoingBatch);
            transaction.commit();
        }
        catch (Error ex) {
            if (transaction != null) {
                transaction.rollback();
            }
            throw ex;
        }
        catch (RuntimeException ex) {
            if (transaction != null) {
                transaction.rollback();
            }
            throw ex;
        }
        finally {
            this.close(transaction);
        }
    }

    @Override
    public void insertOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgoingBatch) {
        outgoingBatch.setLastUpdatedHostName(this.clusterService.getServerId());
        long batchId = outgoingBatch.getBatchId();
        if (batchId <= 0L) {
            batchId = this.platform.supportsMultiThreadedTransactions() ? this.sequenceService.nextVal("outgoing_batch") : this.sequenceService.nextVal(transaction, "outgoing_batch");
        }
        transaction.prepareAndExecute(this.getSql("insertOutgoingBatchSql"), new Object[]{batchId, outgoingBatch.getNodeId(), outgoingBatch.getChannelId(), outgoingBatch.getStatus().name(), outgoingBatch.getLoadId(), outgoingBatch.isExtractJobFlag() ? 1 : 0, outgoingBatch.isLoadFlag() ? 1 : 0, outgoingBatch.isCommonFlag() ? 1 : 0, outgoingBatch.getReloadRowCount(), outgoingBatch.getOtherRowCount(), outgoingBatch.getDataUpdateRowCount(), outgoingBatch.getDataInsertRowCount(), outgoingBatch.getDataDeleteRowCount(), outgoingBatch.getLastUpdatedHostName(), new Date(), new Date(), outgoingBatch.getCreateBy(), outgoingBatch.getSummary(), outgoingBatch.getDataRowCount()});
        outgoingBatch.setBatchId(batchId);
    }

    @Override
    public void insertOutgoingBatches(ISqlTransaction transaction, List<OutgoingBatch> batches, int flushSize, boolean isCommon) {
        long batchId = 0L;
        int count = 0;
        int size = isCommon ? 1 : batches.size();
        batchId = this.platform.supportsMultiThreadedTransactions() ? this.sequenceService.nextRange("outgoing_batch", size) : this.sequenceService.nextRange(transaction, "outgoing_batch", size);
        transaction.prepare(this.getSql("insertOutgoingBatchSql"));
        for (OutgoingBatch batch : batches) {
            batch.setLastUpdatedHostName(this.clusterService.getServerId());
            batch.setBatchId(batchId);
            transaction.addRow((Object)batch, new Object[]{batch.getBatchId(), batch.getNodeId(), batch.getChannelId(), batch.getStatus().name(), batch.getLoadId(), batch.isExtractJobFlag() ? 1 : 0, batch.isLoadFlag() ? 1 : 0, batch.isCommonFlag() ? 1 : 0, batch.getReloadRowCount(), batch.getOtherRowCount(), batch.getDataUpdateRowCount(), batch.getDataInsertRowCount(), batch.getDataDeleteRowCount(), batch.getLastUpdatedHostName(), new Date(), new Date(), batch.getCreateBy(), batch.getSummary(), batch.getDataRowCount()}, new int[]{this.symmetricDialect.getSqlTypeForIds(), 12, 12, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 12, 93, 93, 12, 12, 2});
            if (!isCommon) {
                ++batchId;
            }
            if (++count < flushSize) continue;
            transaction.flush();
            count = 0;
        }
        transaction.flush();
    }

    @Override
    public OutgoingBatch findOutgoingBatch(long batchId, String nodeId) {
        List list = null;
        list = StringUtils.isNotBlank((CharSequence)nodeId) ? this.sqlTemplateDirty.query(this.getSql("selectOutgoingBatchPrefixSql", "findOutgoingBatchSql"), (ISqlRowMapper)new OutgoingBatchMapper(true), new Object[]{batchId, nodeId}, new int[]{this.symmetricDialect.getSqlTypeForIds(), 12}) : this.sqlTemplateDirty.query(this.getSql("selectOutgoingBatchPrefixSql", "findOutgoingBatchByIdOnlySql"), (ISqlRowMapper)new OutgoingBatchMapper(true), new Object[]{batchId}, new int[]{this.symmetricDialect.getSqlTypeForIds()});
        if (list != null && list.size() > 0) {
            return (OutgoingBatch)list.get(0);
        }
        return null;
    }

    @Override
    public int countOutgoingBatchesInError() {
        return this.sqlTemplateDirty.queryForInt(this.getSql("countOutgoingBatchesErrorsSql"), new Object[0]);
    }

    @Override
    public int countOutgoingBatchesInError(String channelId) {
        return this.sqlTemplateDirty.queryForInt(this.getSql("countOutgoingBatchesErrorsOnChannelSql"), new Object[]{channelId});
    }

    @Override
    public Date getOutgoingBatchesLatestUpdateSql() {
        return (Date)this.sqlTemplateDirty.queryForObject(this.getSql("getOutgoingBatchesLatestUpdateSql"), Date.class, new Object[0]);
    }

    @Override
    public int countOutgoingBatchesUnsent() {
        return this.sqlTemplateDirty.queryForInt(this.getSql("countOutgoingBatchesUnsentSql"), new Object[0]);
    }

    @Override
    public int[] countOutgoingNonSystemBatchesRowsUnsent() {
        int[] batchesRows = new int[2];
        for (Row row : this.sqlTemplateDirty.query(this.getSql("countOutgoingNonSystemBatchesUnsentSql"))) {
            batchesRows[0] = row.getInt("batch_count");
            batchesRows[1] = row.getInt("row_count");
        }
        return batchesRows;
    }

    @Override
    public int countOutgoingBatchesUnsent(String channelId) {
        return this.sqlTemplateDirty.queryForInt(this.getSql("countOutgoingBatchesUnsentOnChannelSql"), new Object[]{channelId});
    }

    @Override
    public int countOutgoingBatchesUnsentHeartbeat() {
        return this.sqlTemplateDirty.queryForInt(this.getSql("countOutgoingBatchesUnsentHeartbeat"), new Object[0]);
    }

    @Override
    public Map<String, Integer> countOutgoingBatchesPendingByChannel(String nodeId) {
        List rows = this.sqlTemplateDirty.query(this.getSql("countOutgoingBatchesByChannelSql"), new Object[]{nodeId});
        HashMap<String, Integer> results = new HashMap<String, Integer>();
        if (rows != null && !rows.isEmpty()) {
            for (Row row : rows) {
                results.put(row.getString("channel_id"), row.getInt("batch_count"));
            }
        }
        Set<String> channelIds = this.configurationService.getChannels(false).keySet();
        for (String channelId : channelIds) {
            if (results.containsKey(channelId) || "heartbeat".equals(channelId)) continue;
            results.put(channelId, 0);
        }
        return results;
    }

    @Override
    public int countUnsentBatchesByTargetNode(String nodeId, boolean includeHeartbeats) {
        if (includeHeartbeats) {
            return this.sqlTemplateDirty.queryForInt(this.getSql("countOutgoingBatchesByTargetNodeSql"), new Object[]{nodeId});
        }
        return this.sqlTemplateDirty.queryForInt(this.getSql("countOutgoingBatchesByTargetNodeExcludingHeartbeatsSql"), new Object[]{nodeId});
    }

    @Override
    public long countUnsentRowsByTargetNode(String nodeId) {
        return this.sqlTemplateDirty.queryForLong(this.getSql("countOutgoingRowsByTargetNodeSql"), new Object[]{nodeId});
    }

    @Override
    public int countOutgoingBatches(List<String> nodeIds, List<String> channels, List<AbstractBatch.Status> statuses, List<Long> loads) {
        HashMap<String, List<String>> params = new HashMap<String, List<String>>();
        params.put("NODES", nodeIds);
        params.put("CHANNELS", channels);
        params.put("STATUSES", this.toStringList(statuses));
        return this.sqlTemplateDirty.queryForInt(this.getSql("selectCountBatchesPrefixSql", this.buildBatchWhere(nodeIds, channels, statuses, loads, null)), params);
    }

    @Override
    public List<OutgoingBatch> listOutgoingBatches(List<String> nodeIds, List<String> channels, List<AbstractBatch.Status> statuses, List<Long> loads, long startAtBatchId, Date startAtLastUpdateTime, int maxRowsToRetrieve, boolean ascending) {
        String where = this.buildBatchWhere(nodeIds, channels, statuses, loads, startAtLastUpdateTime);
        HashMap<String, Object> params = new HashMap<String, Object>();
        params.put("NODES", nodeIds);
        params.put("CHANNELS", channels);
        params.put("STATUSES", this.toStringList(statuses));
        params.put("LAST_UPDATE_TIME", startAtLastUpdateTime);
        params.put("LOADS", loads);
        String startAtBatchIdSql = null;
        if (startAtBatchId > 0L) {
            if (StringUtils.isBlank((CharSequence)where)) {
                where = " where 1=1 ";
            }
            params.put("BATCH_ID", startAtBatchId);
            startAtBatchIdSql = " and batch_id = :BATCH_ID ";
        }
        String sql = this.getSql("selectOutgoingBatchPrefixSql", where, startAtBatchIdSql, ascending ? " order by batch_id asc" : " order by batch_id desc");
        return this.sqlTemplateDirty.query(sql, maxRowsToRetrieve, (ISqlRowMapper)new OutgoingBatchMapper(true), params);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<OutgoingBatch> listOutgoingBatchesWithLimit(int offset, int limit, List<FilterCriterion> filter, String orderColumn, String orderDirection) {
        List batchList;
        String where = filter != null ? this.buildBatchWhereFromFilter(filter) : null;
        Map<Object, Object> params = filter != null ? this.buildBatchParams(filter) : new HashMap();
        String orderBy = this.buildBatchOrderBy(orderColumn, orderDirection);
        String sql = this.getSql("selectOutgoingBatchPrefixSql", where, orderBy);
        if (this.platform.supportsLimitOffset()) {
            sql = this.platform.massageForLimitOffset(sql, limit, offset);
            batchList = this.sqlTemplateDirty.query(sql, Integer.MAX_VALUE, (ISqlRowMapper)new OutgoingBatchMapper(true), params);
        } else {
            try (ISqlReadCursor cursor = this.sqlTemplateDirty.queryForCursor(sql, (ISqlRowMapper)new OutgoingBatchMapper(true), params);){
                OutgoingBatch next = null;
                batchList = new ArrayList();
                int rowCount = 0;
                do {
                    if ((next = (OutgoingBatch)cursor.next()) != null) {
                        if (offset <= rowCount && rowCount < limit + offset) {
                            batchList.add((OutgoingBatch)next);
                        }
                        ++rowCount;
                    }
                    if (rowCount < limit + offset) continue;
                    break;
                } while (next != null);
            }
        }
        int maxBatches = this.parameterService.getInt("batch.screen.max.to.select");
        int batchesToReturn = maxBatches - offset;
        if (maxBatches > 0 && limit + offset > maxBatches && batchesToReturn < batchList.size() - 1) {
            batchList = batchList.subList(0, batchesToReturn);
        }
        return batchList;
    }

    @Override
    public int countOutgoingBatchesWithLimit(List<FilterCriterion> filter) {
        String where = filter != null ? this.buildBatchWhereFromFilter(filter) : null;
        Map<Object, Object> params = filter != null ? this.buildBatchParams(filter) : new HashMap();
        String sql = this.getSql("selectCountBatchesPrefixSql", where);
        int count = this.sqlTemplateDirty.queryForInt(sql, params);
        int maxBatches = this.parameterService.getInt("batch.screen.max.to.select");
        return maxBatches > 0 ? Math.min(count, maxBatches) : count;
    }

    protected List<String> toStringList(List<AbstractBatch.Status> statuses) {
        ArrayList<String> statusStrings = new ArrayList<String>(statuses.size());
        for (AbstractBatch.Status status : statuses) {
            statusStrings.add(status.name());
        }
        return statusStrings;
    }

    protected boolean containsOnlyStatus(AbstractBatch.Status status, List<AbstractBatch.Status> statuses) {
        return statuses.size() == 1 && statuses.get(0) == status;
    }

    @Override
    public OutgoingBatches getOutgoingBatches(String nodeId, boolean includeDisabledChannels) {
        return this.getOutgoingBatches(nodeId, null, includeDisabledChannels);
    }

    @Override
    public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, boolean includeDisabledChannels) {
        return this.getOutgoingBatches(nodeId, channelThread, null, null, includeDisabledChannels);
    }

    @Override
    public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, NodeGroupLinkAction eventAction, NodeGroupLinkAction defaultEventAction, boolean includeDisabledChannels) {
        long ts = System.currentTimeMillis();
        int maxNumberOfBatchesToSelect = this.parameterService.getInt("outgoing.batches.max.to.select", 1000);
        String sql = null;
        Object[] params = null;
        int[] types = null;
        QueueThread queueThread = new QueueThread(channelThread);
        if ((channelThread = queueThread.getQueueName()) != null && channelThread.equals("reload") && queueThread.isUsingThreading()) {
            sql = this.getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchByThreadSql");
            params = new Object[]{nodeId, "reload", AbstractBatch.Status.RQ.name(), AbstractBatch.Status.NE.name(), AbstractBatch.Status.QY.name(), AbstractBatch.Status.SE.name(), AbstractBatch.Status.LD.name(), AbstractBatch.Status.ER.name(), AbstractBatch.Status.IG.name(), AbstractBatch.Status.RS.name(), queueThread.getThreadId()};
            types = new int[]{12, 12, 1, 1, 1, 1, 1, 1, 1, 1, 4};
            this.log.debug("Querying outgoing batches on reload for thread {}", (Object)queueThread.getThreadId());
        } else if (eventAction != null) {
            sql = eventAction.equals((Object)defaultEventAction) ? this.getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchChannelActionNullSql") : this.getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchChannelActionSql");
            params = new Object[]{eventAction.name(), nodeId, channelThread, AbstractBatch.Status.RQ.name(), AbstractBatch.Status.NE.name(), AbstractBatch.Status.QY.name(), AbstractBatch.Status.SE.name(), AbstractBatch.Status.LD.name(), AbstractBatch.Status.ER.name(), AbstractBatch.Status.IG.name(), AbstractBatch.Status.RS.name()};
            types = new int[]{1, 12, 12, 1, 1, 1, 1, 1, 1, 1, 1};
        } else if (channelThread != null) {
            sql = this.getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchChannelSql");
            params = new Object[]{nodeId, channelThread, AbstractBatch.Status.RQ.name(), AbstractBatch.Status.NE.name(), AbstractBatch.Status.QY.name(), AbstractBatch.Status.SE.name(), AbstractBatch.Status.LD.name(), AbstractBatch.Status.ER.name(), AbstractBatch.Status.IG.name(), AbstractBatch.Status.RS.name()};
            types = new int[]{12, 12, 1, 1, 1, 1, 1, 1, 1, 1};
        } else {
            sql = this.getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchSql");
            params = new Object[]{nodeId, AbstractBatch.Status.RQ.name(), AbstractBatch.Status.NE.name(), AbstractBatch.Status.QY.name(), AbstractBatch.Status.SE.name(), AbstractBatch.Status.LD.name(), AbstractBatch.Status.ER.name(), AbstractBatch.Status.IG.name(), AbstractBatch.Status.RS.name()};
            types = new int[]{12, 1, 1, 1, 1, 1, 1, 1, 1};
        }
        List list = this.sqlTemplateDirty.query(sql, maxNumberOfBatchesToSelect, (ISqlRowMapper)new OutgoingBatchMapper(includeDisabledChannels), params, types);
        OutgoingBatches batches = new OutgoingBatches(list);
        ArrayList<NodeChannel> channels = new ArrayList<NodeChannel>(this.configurationService.getNodeChannels(nodeId, true));
        batches.sortChannels(channels);
        List<IOutgoingBatchFilter> filters = this.extensionService.getExtensionPointList(IOutgoingBatchFilter.class);
        ArrayList<OutgoingBatch> keepers = new ArrayList<OutgoingBatch>();
        for (NodeChannel channel : channels) {
            List<OutgoingBatch> batchesForChannel = this.getBatchesForChannelWindows(batches, nodeId, channel, this.configurationService.getNodeGroupChannelWindows(this.parameterService.getNodeGroupId(), channel.getChannelId()));
            if (filters != null) {
                for (IOutgoingBatchFilter filter : filters) {
                    batchesForChannel = filter.filter(channel, batchesForChannel);
                }
            }
            if (!this.parameterService.is("dataextractor.enable") && !channel.getChannelId().equals("config") && !channel.getChannelId().equals("system")) continue;
            keepers.addAll(batchesForChannel);
        }
        batches.setBatches(keepers);
        long executeTimeInMs = System.currentTimeMillis() - ts;
        if (executeTimeInMs > 30000L) {
            this.log.info("Selecting {} outgoing batch rows for node {} on queue '{}' took {} ms", new Object[]{list.size(), nodeId, channelThread, executeTimeInMs});
        }
        return batches;
    }

    public List<OutgoingBatch> getBatchesForChannelWindows(OutgoingBatches batches, String targetNodeId, NodeChannel channel, List<NodeGroupChannelWindow> windows) {
        ArrayList<OutgoingBatch> keeping = new ArrayList<OutgoingBatch>();
        List<OutgoingBatch> current = batches.getBatches();
        if (current != null && current.size() > 0 && this.inTimeWindow(windows, targetNodeId)) {
            int maxBatchesToSend = channel.getMaxBatchToSend();
            for (OutgoingBatch outgoingBatch : current) {
                if (!channel.getChannelId().equals(outgoingBatch.getChannelId()) || maxBatchesToSend <= 0) continue;
                keeping.add(outgoingBatch);
                --maxBatchesToSend;
            }
        }
        return keeping;
    }

    public boolean inTimeWindow(List<NodeGroupChannelWindow> windows, String targetNodeId) {
        if (windows != null && windows.size() > 0) {
            for (NodeGroupChannelWindow window : windows) {
                String timezoneOffset = null;
                List<NodeHost> hosts = this.nodeService.findNodeHosts(targetNodeId);
                timezoneOffset = hosts.size() > 0 ? hosts.get(0).getTimezoneOffset() : AppUtils.getTimezoneOffset();
                if (!window.inTimeWindow(timezoneOffset)) continue;
                return true;
            }
            return false;
        }
        return true;
    }

    @Override
    public OutgoingBatches getOutgoingBatchRange(String nodeId, Date startDate, Date endDate, String ... channels) {
        OutgoingBatches batches = new OutgoingBatches();
        ArrayList<OutgoingBatch> batchList = new ArrayList<OutgoingBatch>();
        for (String channel : channels) {
            batchList.addAll(this.sqlTemplate.query(this.getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchTimeRangeSql"), (ISqlRowMapper)new OutgoingBatchMapper(true), new Object[]{nodeId, channel, startDate, endDate}));
        }
        batches.setBatches(batchList);
        return batches;
    }

    @Override
    public OutgoingBatches getOutgoingBatchRange(long startBatchId, long endBatchId) {
        OutgoingBatches batches = new OutgoingBatches();
        batches.setBatches(this.sqlTemplate.query(this.getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchRangeSql"), (ISqlRowMapper)new OutgoingBatchMapper(true), new Object[]{startBatchId, endBatchId}));
        return batches;
    }

    @Override
    public OutgoingBatches getOutgoingBatchByLoad(long loadId) {
        OutgoingBatches batches = new OutgoingBatches();
        batches.setBatches(this.sqlTemplate.query(this.getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchLoadSql"), (ISqlRowMapper)new OutgoingBatchMapper(true), new Object[]{loadId}));
        return batches;
    }

    @Override
    public OutgoingBatches getOutgoingBatchByLoadRangeAndTable(long loadId, long startBatchId, long endBatchId, String tableName) {
        OutgoingBatches batches = new OutgoingBatches();
        batches.setBatches(this.sqlTemplate.query(this.getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchLoadByBatchRangeByTableNameSql"), (ISqlRowMapper)new OutgoingBatchMapper(true), new Object[]{loadId, startBatchId, endBatchId, tableName}));
        return batches;
    }

    @Override
    public OutgoingBatches getOutgoingBatchErrors(int maxRows) {
        OutgoingBatches batches = new OutgoingBatches();
        batches.setBatches(this.sqlTemplateDirty.query(this.getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchErrorsSql"), maxRows, (ISqlRowMapper)new OutgoingBatchMapper(true), null, null));
        return batches;
    }

    @Override
    public List<String> getNodesInError() {
        return this.sqlTemplate.query(this.getSql("selectNodesInErrorSql"), (ISqlRowMapper)new StringMapper(), new Object[0]);
    }

    public List<OutgoingBatch> getNextOutgoingBatchForEachNode() {
        return this.sqlTemplateDirty.query(this.getSql("getNextOutgoingBatchForEachNodeSql"), (ISqlRowMapper)new OutgoingBatchMapper(true, true), new Object[0]);
    }

    @Override
    public boolean isInitialLoadComplete(String nodeId) {
        return this.areAllLoadBatchesComplete(nodeId) && !this.isUnsentDataOnChannelForNode("system", nodeId);
    }

    @Override
    public boolean areAllLoadBatchesComplete(String nodeId) {
        NodeSecurity security = this.nodeService.findNodeSecurity(nodeId);
        if (security == null || security.isInitialLoadEnabled()) {
            return false;
        }
        List statuses = this.sqlTemplate.query(this.getSql("initialLoadStatusSql"), (ISqlRowMapper)new StringMapper(), new Object[]{nodeId, 1});
        if (statuses == null || statuses.size() == 0) {
            throw new RuntimeException("The initial load has not been started for " + nodeId);
        }
        for (String status : statuses) {
            if (AbstractBatch.Status.OK.name().equals(status)) continue;
            return false;
        }
        return true;
    }

    @Override
    public boolean isUnsentDataOnChannelForNode(String channelId, String nodeId) {
        int unsentCount = this.sqlTemplate.queryForInt(this.getSql("unsentBatchesForNodeIdChannelIdSql"), new Object[]{nodeId, channelId});
        return unsentCount > 0;
    }

    protected StringBuilder buildStatusList(Object[] args, AbstractBatch.Status ... statuses) {
        StringBuilder inList = new StringBuilder();
        for (int i = 0; i < statuses.length; ++i) {
            args[i] = statuses[i].name();
            inList.append("?,");
        }
        return inList;
    }

    @Override
    public List<OutgoingBatchSummary> findOutgoingBatchSummaryByNode(String nodeId, Date sinceCreateTime, AbstractBatch.Status ... statuses) {
        Object[] args = new Object[statuses.length + 1];
        args[args.length - 1] = nodeId;
        StringBuilder inList = this.buildStatusList(args, statuses);
        String sql = this.getSql("selectOutgoingBatchSummaryPrefixSql", "selectOutgoingBatchSummaryStatsPrefixSql", "whereStatusAndNodeGroupByStatusSql").replace(":STATUS_LIST", inList.substring(0, inList.length() - 1));
        return this.sqlTemplateDirty.query(sql, (ISqlRowMapper)new OutgoingBatchSummaryMapper(false, false), args);
    }

    @Override
    public List<OutgoingBatchSummary> findOutgoingBatchSummary(AbstractBatch.Status ... statuses) {
        Object[] args = new Object[statuses.length];
        StringBuilder inList = this.buildStatusList(args, statuses);
        String sql = this.getSql("selectOutgoingBatchSummaryByNodePrefixSql", "selectOutgoingBatchSummaryStatsPrefixSql", "whereStatusGroupByStatusAndNodeSql").replace(":STATUS_LIST", inList.substring(0, inList.length() - 1));
        return this.sqlTemplateDirty.query(sql, (ISqlRowMapper)new OutgoingBatchSummaryMapper(true, false), args);
    }

    @Override
    public List<OutgoingBatchSummary> findOutgoingBatchSummaryByChannel(AbstractBatch.Status ... statuses) {
        Object[] args = new Object[statuses.length];
        StringBuilder inList = this.buildStatusList(args, statuses);
        String sql = this.getSql("selectOutgoingBatchSummaryByNodeAndChannelPrefixSql", "selectOutgoingBatchSummaryStatsPrefixSql", "whereStatusGroupByStatusAndNodeAndChannelSql").replace(":STATUS_LIST", inList.substring(0, inList.length() - 1));
        return this.sqlTemplateDirty.query(sql, (ISqlRowMapper)new OutgoingBatchSummaryMapper(true, true), args);
    }

    @Override
    public List<Long> getAllBatches() {
        return this.sqlTemplateDirty.query(this.getSql("getAllBatchesSql"), (ISqlRowMapper)new LongMapper(), new Object[0]);
    }

    @Override
    public List<OutgoingBatch> getBatchesInProgress() {
        return this.sqlTemplateDirty.query(this.getSql("selectOutgoingBatchPrefixSql", "whereInProgressStatusSql"), (ISqlRowMapper)new OutgoingBatchMapper(true), new Object[]{AbstractBatch.Status.ER.name(), AbstractBatch.Status.LD.name(), AbstractBatch.Status.QY.name(), AbstractBatch.Status.RS.name(), AbstractBatch.Status.SE.name()});
    }

    @Override
    public Collection<String> getReadyQueues(String nodeId, boolean refreshCache) {
        Collection<Object> queues = null;
        if (this.parameterService.is("sync.use.ready.queues")) {
            queues = this.cacheManager.getReadyQueues(refreshCache).get(nodeId);
        }
        if (queues == null) {
            queues = new HashSet();
        }
        return queues;
    }

    @Override
    public Map<String, Collection<String>> getReadyQueues(boolean refreshCache) {
        Map<Object, Object> readyQueuesMap = null;
        readyQueuesMap = this.parameterService.is("sync.use.ready.queues") ? this.cacheManager.getReadyQueues(refreshCache) : new HashMap();
        return readyQueuesMap;
    }

    @Override
    public Map<String, ReadyChannels> getReadyChannelsFromDb() {
        List rows = this.sqlTemplateDirty.query(this.getSql("selectReadyChannels"), new Object[]{AbstractBatch.Status.NE.name(), AbstractBatch.Status.QY.name(), AbstractBatch.Status.SE.name(), AbstractBatch.Status.LD.name(), AbstractBatch.Status.ER.name(), AbstractBatch.Status.IG.name(), AbstractBatch.Status.RS.name()});
        HashMap<String, ReadyChannels> readyChannelMap = new HashMap<String, ReadyChannels>();
        for (Row row : rows) {
            String nodeId = row.getString("node_id");
            String channelId = row.getString("channel_id");
            Integer threadId = row.getInteger("thread_id");
            ReadyChannels channels = (ReadyChannels)readyChannelMap.get(nodeId);
            if (channels == null) {
                channels = new ReadyChannels(nodeId);
                readyChannelMap.put(nodeId, channels);
            }
            channels.add(channelId, threadId);
        }
        return readyChannelMap;
    }

    class OutgoingBatchMapper
    implements ISqlRowMapper<OutgoingBatch> {
        private boolean statusOnly = false;
        private boolean includeDisabledChannels = false;
        private Map<String, Channel> channels;

        public OutgoingBatchMapper(boolean includeDisabledChannels, boolean statusOnly) {
            this.includeDisabledChannels = includeDisabledChannels;
            this.statusOnly = statusOnly;
            this.channels = OutgoingBatchService.this.configurationService.getChannels(false);
        }

        public OutgoingBatchMapper(boolean includeDisabledChannels) {
            this(includeDisabledChannels, false);
        }

        public OutgoingBatch mapRow(Row rs) {
            String channelId = rs.getString("channel_id");
            Channel channel = this.channels.get(channelId);
            if (channel != null && (this.includeDisabledChannels || channel.isEnabled())) {
                OutgoingBatch batch = new OutgoingBatch();
                batch.setNodeId(rs.getString("node_id"));
                batch.setStatusFromString(rs.getString("status"));
                batch.setBatchId(rs.getLong("batch_id"));
                if (!this.statusOnly) {
                    batch.setChannelId(channelId);
                    batch.setByteCount(rs.getLong("byte_count"));
                    batch.setExtractCount(rs.getLong("extract_count"));
                    batch.setSentCount(rs.getLong("sent_count"));
                    batch.setLoadCount(rs.getLong("load_count"));
                    batch.setDataRowCount(rs.getLong("data_row_count"));
                    batch.setLoadRowCount(rs.getLong("load_row_count"));
                    batch.setExtractRowCount(rs.getLong("extract_row_count"));
                    batch.setReloadRowCount(rs.getLong("reload_row_count"));
                    batch.setDataInsertRowCount(rs.getLong("data_insert_row_count"));
                    batch.setDataUpdateRowCount(rs.getLong("data_update_row_count"));
                    batch.setDataDeleteRowCount(rs.getLong("data_delete_row_count"));
                    batch.setLoadInsertRowCount(rs.getLong("load_insert_row_count"));
                    batch.setLoadUpdateRowCount(rs.getLong("load_update_row_count"));
                    batch.setLoadDeleteRowCount(rs.getLong("load_delete_row_count"));
                    batch.setExtractInsertRowCount(rs.getLong("extract_insert_row_count"));
                    batch.setExtractUpdateRowCount(rs.getLong("extract_update_row_count"));
                    batch.setExtractDeleteRowCount(rs.getLong("extract_delete_row_count"));
                    batch.setOtherRowCount(rs.getLong("other_row_count"));
                    batch.setIgnoreCount(rs.getLong("ignore_count"));
                    batch.setRouterMillis(rs.getLong("router_millis"));
                    batch.setNetworkMillis(rs.getLong("network_millis"));
                    batch.setFilterMillis(rs.getLong("filter_millis"));
                    batch.setLoadMillis(rs.getLong("load_millis"));
                    batch.setExtractMillis(rs.getLong("extract_millis"));
                    batch.setTransformExtractMillis(rs.getLong("transform_extract_millis"));
                    batch.setTransformLoadMillis(rs.getLong("transform_load_millis"));
                    batch.setExtractStartTime(rs.getDateTime("extract_start_time"));
                    batch.setTransferStartTime(rs.getDateTime("transfer_start_time"));
                    batch.setLoadStartTime(rs.getDateTime("load_start_time"));
                    batch.setSqlState(rs.getString("sql_state"));
                    batch.setSqlCode(rs.getInt("sql_code"));
                    batch.setSqlMessage(rs.getString("sql_message"));
                    batch.setFailedDataId(rs.getLong("failed_data_id"));
                    batch.setFailedLineNumber(rs.getLong("failed_line_number"));
                    batch.setLastUpdatedHostName(rs.getString("last_update_hostname"));
                    batch.setLastUpdatedTime(rs.getDateTime("last_update_time"));
                    batch.setCreateTime(rs.getDateTime("create_time"));
                    batch.setLoadFlag(rs.getBoolean("load_flag"));
                    batch.setErrorFlag(rs.getBoolean("error_flag"));
                    batch.setCommonFlag(rs.getBoolean("common_flag"));
                    batch.setExtractJobFlag(rs.getBoolean("extract_job_flag"));
                    batch.setLoadId(rs.getLong("load_id"));
                    batch.setCreateBy(rs.getString("create_by"));
                    batch.setSummary(rs.getString("summary"));
                    batch.setFallbackInsertCount(rs.getLong("fallback_insert_count"));
                    batch.setFallbackUpdateCount(rs.getLong("fallback_update_count"));
                    batch.setConflictWinCount(rs.getLong("conflict_win_count"));
                    batch.setConflictLoseCount(rs.getLong("conflict_lose_count"));
                    batch.setIgnoreRowCount(rs.getLong("ignore_row_count"));
                    batch.setMissingDeleteCount(rs.getLong("missing_delete_count"));
                    batch.setSkipCount(rs.getLong("skip_count"));
                    batch.setBulkLoaderFlag(rs.getBoolean("bulk_loader_flag"));
                    batch.setThreadId(rs.getInteger("thread_id"));
                }
                return batch;
            }
            return null;
        }
    }

    static class OutgoingBatchSummaryMapper
    implements ISqlRowMapper<OutgoingBatchSummary> {
        boolean withNode = false;
        boolean withChannel = false;

        public OutgoingBatchSummaryMapper(boolean withNode, boolean withChannel) {
            this.withNode = withNode;
            this.withChannel = withChannel;
        }

        public OutgoingBatchSummary mapRow(Row rs) {
            OutgoingBatchSummary summary = new OutgoingBatchSummary();
            if (this.withNode) {
                summary.setNodeId(rs.getString("node_id"));
            }
            if (this.withChannel) {
                summary.setChannel(rs.getString("channel_id"));
            }
            summary.setBatchCount(rs.getInt("batches"));
            summary.setDataCount(rs.getInt("data"));
            summary.setStatus(AbstractBatch.Status.valueOf(rs.getString("status")));
            summary.setOldestBatchCreateTime(rs.getDateTime("oldest_batch_time"));
            summary.setLastBatchUpdateTime(rs.getDateTime("last_update_time"));
            summary.setTotalBytes(rs.getLong("total_bytes"));
            summary.setTotalMillis(rs.getLong("total_millis"));
            summary.setErrorFlag(rs.getBoolean("error_flag"));
            summary.setMinBatchId(rs.getLong("batch_id"));
            summary.setInsertCount(rs.getInt("insert_event_count"));
            summary.setUpdateCount(rs.getInt("update_event_count"));
            summary.setDeleteCount(rs.getInt("delete_event_count"));
            summary.setOtherCount(rs.getInt("other_event_count"));
            summary.setOtherCount(rs.getInt("reload_event_count"));
            summary.setRouterMillis(rs.getLong("total_router_millis"));
            summary.setExtractMillis(rs.getLong("total_extract_millis"));
            summary.setTransferMillis(rs.getLong("total_network_millis"));
            summary.setLoadMillis(rs.getLong("total_load_millis"));
            return summary;
        }
    }
}

