[CARBONDATA-1223] Fixing empty file creation in batch sort loading

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b4e74ebc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b4e74ebc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b4e74ebc

Branch: refs/heads/branch-1.1
Commit: b4e74ebcff2cc8af2bc6be209629fc56816ae963
Parents: 39bf63c
Author: dhatchayani <dhatcha.offic...@gmail.com>
Authored: Fri Jun 23 19:24:47 2017 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Sat Jun 24 10:26:03 2017 +0530

----------------------------------------------------------------------
 .../UnsafeBatchParallelReadMergeSorterImpl.java | 16 +++++++++---
 .../UnsafeSingleThreadFinalSortFilesMerger.java | 26 --------------------
 .../steps/DataWriterBatchProcessorStepImpl.java | 18 ++++++++------
 3 files changed, 23 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4e74ebc/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index cc7929d..84d45b3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -154,7 +154,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
               sortDataRows.getSortDataRow().addRowBatchWithOutSync(buffer, i);
               rowCounter.getAndAdd(i);
               if (!sortDataRows.getSortDataRow().canAdd()) {
-                sortDataRows.finish();
+                sortDataRows.finish(false);
                 sortDataRows.createSortDataRows();
               }
             }
@@ -245,7 +245,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
       return sortDataRow;
     }
 
-    public void finish() {
+    public void finish(boolean isFinalAttempt) {
       try {
         // if the mergerQue is empty and some CarbonDataLoadingException exception has occurred
         // then set stop process to true in the finalmerger instance
@@ -253,6 +253,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
             && threadStatusObserver.getThrowable() != null && threadStatusObserver
             .getThrowable() instanceof CarbonDataLoadingException) {
           finalMerger.setStopProcess(true);
+          if (isFinalAttempt) {
+            iteratorCount.decrementAndGet();
+          }
           mergerQueue.put(finalMerger);
           return;
         }
@@ -262,6 +265,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
         finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
             unsafeIntermediateFileMerger.getMergedPages());
         unsafeIntermediateFileMerger.close();
+        if (isFinalAttempt) {
+          iteratorCount.decrementAndGet();
+        }
         mergerQueue.put(finalMerger);
         sortDataRow = null;
         unsafeIntermediateFileMerger = null;
@@ -283,8 +289,10 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
 
     public void finishThread() {
       synchronized (lock) {
-        if (iteratorCount.decrementAndGet() <= 0) {
-          finish();
+        if (iteratorCount.get() <= 1) {
+          finish(true);
+        } else {
+          iteratorCount.decrementAndGet();
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4e74ebc/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index cd6b321..44f29d1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -55,25 +55,6 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
   private SortParameters parameters;
 
   /**
-   * number of measures
-   */
-  private int measureCount;
-
-  /**
-   * number of dimensionCount
-   */
-  private int dimensionCount;
-
-  /**
-   * number of complexDimensionCount
-   */
-  private int noDictionaryCount;
-
-  private int complexDimensionCount;
-
-  private boolean[] isNoDictionaryDimensionColumn;
-
-  /**
    * tempFileLocation
    */
   private String tempFileLocation;
@@ -85,13 +66,6 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
   public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters,
       String tempFileLocation) {
     this.parameters = parameters;
-    // set measure and dimension count
-    this.measureCount = parameters.getMeasureColCount();
-    this.dimensionCount = parameters.getDimColCount();
-    this.complexDimensionCount = parameters.getComplexDimColCount();
-
-    this.noDictionaryCount = parameters.getNoDictionaryCount();
-    this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
     this.tempFileLocation = tempFileLocation;
     this.tableName = parameters.getTableName();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4e74ebc/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
index ae2b625..e6f61f6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
@@ -104,13 +104,16 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
         int k = 0;
         while (iterator.hasNext()) {
           CarbonRowBatch next = iterator.next();
-          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
-              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
-          CarbonFactHandler dataHandler = CarbonFactHandlerFactory
-              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
-          dataHandler.initialise();
-          processBatch(next, dataHandler, model.getSegmentProperties());
-          finish(tableName, dataHandler);
+          // If no rows from merge sorter, then don't create a file in fact column handler
+          if (next.hasNext()) {
+            CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+                .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
+            CarbonFactHandler dataHandler = CarbonFactHandlerFactory
+                .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+            dataHandler.initialise();
+            processBatch(next, dataHandler, model.getSegmentProperties());
+            finish(tableName, dataHandler);
+          }
         }
         i++;
       }
@@ -181,6 +184,7 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
       outputRow[len - 1] = keyGenerator.generateKey(row.getIntArray(dimsArrayIndex));
       dataHandler.addDataToStore(outputRow);
     }
+    batch.close();
     rowCounter.getAndAdd(batchSize);
   }
 

Reply via email to