[CARBONDATA-1223] Fixing empty file creation in batch sort loading
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b4e74ebc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b4e74ebc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b4e74ebc

Branch: refs/heads/branch-1.1
Commit: b4e74ebcff2cc8af2bc6be209629fc56816ae963
Parents: 39bf63c
Author: dhatchayani <dhatcha.offic...@gmail.com>
Authored: Fri Jun 23 19:24:47 2017 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Sat Jun 24 10:26:03 2017 +0530

----------------------------------------------------------------------
 .../UnsafeBatchParallelReadMergeSorterImpl.java | 16 +++++++++---
 .../UnsafeSingleThreadFinalSortFilesMerger.java | 26 --------------------
 .../steps/DataWriterBatchProcessorStepImpl.java | 18 ++++++++------
 3 files changed, 23 insertions(+), 37 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4e74ebc/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index cc7929d..84d45b3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -154,7 +154,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
           sortDataRows.getSortDataRow().addRowBatchWithOutSync(buffer, i);
           rowCounter.getAndAdd(i);
           if (!sortDataRows.getSortDataRow().canAdd()) {
-            sortDataRows.finish();
+            sortDataRows.finish(false);
             sortDataRows.createSortDataRows();
           }
         }
@@ -245,7 +245,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
       return sortDataRow;
     }
 
-    public void finish() {
+    public void finish(boolean isFinalAttempt) {
       try {
         // if the mergerQue is empty and some CarbonDataLoadingException exception has occurred
         // then set stop process to true in the finalmerger instance
@@ -253,6 +253,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
             && threadStatusObserver.getThrowable() != null && threadStatusObserver
             .getThrowable() instanceof CarbonDataLoadingException) {
           finalMerger.setStopProcess(true);
+          if (isFinalAttempt) {
+            iteratorCount.decrementAndGet();
+          }
           mergerQueue.put(finalMerger);
           return;
         }
@@ -262,6 +265,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
         finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
             unsafeIntermediateFileMerger.getMergedPages());
         unsafeIntermediateFileMerger.close();
+        if (isFinalAttempt) {
+          iteratorCount.decrementAndGet();
+        }
         mergerQueue.put(finalMerger);
         sortDataRow = null;
         unsafeIntermediateFileMerger = null;
@@ -283,8 +289,10 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
 
     public void finishThread() {
       synchronized (lock) {
-        if (iteratorCount.decrementAndGet() <= 0) {
-          finish();
+        if (iteratorCount.get() <= 1) {
+          finish(true);
+        } else {
+          iteratorCount.decrementAndGet();
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4e74ebc/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index cd6b321..44f29d1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -55,25 +55,6 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
   private SortParameters parameters;
 
   /**
-   * number of measures
-   */
-  private int measureCount;
-
-  /**
-   * number of dimensionCount
-   */
-  private int dimensionCount;
-
-  /**
-   * number of complexDimensionCount
-   */
-  private int noDictionaryCount;
-
-  private int complexDimensionCount;
-
-  private boolean[] isNoDictionaryDimensionColumn;
-
-  /**
    * tempFileLocation
    */
   private String tempFileLocation;
@@ -85,13 +66,6 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
   public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters,
       String tempFileLocation) {
     this.parameters = parameters;
-    // set measure and dimension count
-    this.measureCount = parameters.getMeasureColCount();
-    this.dimensionCount = parameters.getDimColCount();
-    this.complexDimensionCount = parameters.getComplexDimColCount();
-
-    this.noDictionaryCount = parameters.getNoDictionaryCount();
-    this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
     this.tempFileLocation = tempFileLocation;
     this.tableName = parameters.getTableName();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4e74ebc/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
index ae2b625..e6f61f6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
@@ -104,13 +104,16 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
         int k = 0;
         while (iterator.hasNext()) {
           CarbonRowBatch next = iterator.next();
-          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
-              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
-          CarbonFactHandler dataHandler = CarbonFactHandlerFactory
-              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
-          dataHandler.initialise();
-          processBatch(next, dataHandler, model.getSegmentProperties());
-          finish(tableName, dataHandler);
+          // If no rows from merge sorter, then don't create a file in fact column handler
+          if (next.hasNext()) {
+            CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+                .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
+            CarbonFactHandler dataHandler = CarbonFactHandlerFactory
+                .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+            dataHandler.initialise();
+            processBatch(next, dataHandler, model.getSegmentProperties());
+            finish(tableName, dataHandler);
+          }
         }
         i++;
       }
@@ -181,6 +184,7 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
       outputRow[len - 1] = keyGenerator.generateKey(row.getIntArray(dimsArrayIndex));
       dataHandler.addDataToStore(outputRow);
     }
+    batch.close();
     rowCounter.getAndAdd(batchSize);
   }