Repository: carbondata Updated Branches: refs/heads/encoding_override 30ef14e0d -> 3ecb3ec58
[CARBONDATA-1223] Fixing empty file creation in batch sort loading Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0205fa69 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0205fa69 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0205fa69 Branch: refs/heads/encoding_override Commit: 0205fa6991e2b1d3f2a807121d15a6eeb9f07714 Parents: 30ef14e Author: dhatchayani <dhatcha.offic...@gmail.com> Authored: Fri Jun 23 19:24:47 2017 +0530 Committer: dhatchayani <dhatcha.offic...@gmail.com> Committed: Fri Jun 23 19:24:51 2017 +0530 ---------------------------------------------------------------------- .../UnsafeBatchParallelReadMergeSorterImpl.java | 16 +++++++++--- .../UnsafeSingleThreadFinalSortFilesMerger.java | 26 -------------------- .../steps/DataWriterBatchProcessorStepImpl.java | 18 ++++++++------ 3 files changed, 23 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/0205fa69/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java index 20a560d..a8d1eef 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java @@ -155,7 +155,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter sortDataRows.getSortDataRow().addRowBatchWithOutSync(buffer, i); rowCounter.getAndAdd(i); if (!sortDataRows.getSortDataRow().canAdd()) { - sortDataRows.finish(); + sortDataRows.finish(false); sortDataRows.createSortDataRows(); } } @@ -246,7 +246,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter return sortDataRow; } - public void finish() { + public void finish(boolean isFinalAttempt) { try { // if the mergerQue is empty and some CarbonDataLoadingException exception has occurred // then set stop process to true in the finalmerger instance @@ -254,6 +254,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter && threadStatusObserver.getThrowable() != null && threadStatusObserver .getThrowable() instanceof CarbonDataLoadingException) { finalMerger.setStopProcess(true); + if (isFinalAttempt) { + iteratorCount.decrementAndGet(); + } mergerQueue.put(finalMerger); return; } @@ -263,6 +266,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]), unsafeIntermediateFileMerger.getMergedPages()); unsafeIntermediateFileMerger.close(); + if (isFinalAttempt) { + iteratorCount.decrementAndGet(); + } mergerQueue.put(finalMerger); sortDataRow = null; unsafeIntermediateFileMerger = null; @@ -284,8 +290,10 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter public void finishThread() { synchronized (lock) { - if (iteratorCount.decrementAndGet() <= 0) { - finish(); + if (iteratorCount.get() <= 1) { + finish(true); + } else { + iteratorCount.decrementAndGet(); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0205fa69/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java index acb976f..eb7af47 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java @@ -55,25 +55,6 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec private SortParameters parameters; /** - * number of measures - */ - private int measureCount; - - /** - * number of dimensionCount - */ - private int dimensionCount; - - /** - * number of complexDimensionCount - */ - private int noDictionaryCount; - - private int complexDimensionCount; - - private boolean[] isNoDictionaryDimensionColumn; - - /** * tempFileLocation */ private String tempFileLocation; @@ -85,13 +66,6 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters, String tempFileLocation) { this.parameters = parameters; - // set measure and dimension count - this.measureCount = parameters.getMeasureColCount(); - this.dimensionCount = parameters.getDimColCount(); - this.complexDimensionCount = parameters.getComplexDimColCount(); - - this.noDictionaryCount = parameters.getNoDictionaryCount(); - this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn(); this.tempFileLocation = tempFileLocation; this.tableName = parameters.getTableName(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0205fa69/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java index d58835c..46c1020 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java @@ -82,13 +82,16 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS int k = 0; while (iterator.hasNext()) { CarbonRowBatch next = iterator.next(); - CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel - .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++); - CarbonFactHandler dataHandler = CarbonFactHandlerFactory - .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR); - dataHandler.initialise(); - processBatch(next, dataHandler); - finish(tableName, dataHandler); + // If no rows from merge sorter, then don't create a file in fact column handler + if (next.hasNext()) { + CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel + .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++); + CarbonFactHandler dataHandler = CarbonFactHandlerFactory + .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR); + dataHandler.initialise(); + processBatch(next, dataHandler); + finish(tableName, dataHandler); + } } i++; } @@ -137,6 +140,7 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS dataHandler.addDataToStore(row); batchSize++; } + batch.close(); rowCounter.getAndAdd(batchSize); }