[CARBONDATA-1159] Batch sort loading produces incorrect results without synchronization


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/72bbb62b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/72bbb62b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/72bbb62b

Branch: refs/heads/branch-1.1
Commit: 72bbb62bc0bb8e7a38be09938c3cfae171af2ea2
Parents: da952e8
Author: dhatchayani <dhatcha.offic...@gmail.com>
Authored: Mon Jun 12 21:56:47 2017 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Jun 15 13:36:48 2017 +0530

----------------------------------------------------------------------
 .../UnsafeBatchParallelReadMergeSorterImpl.java |  7 ++-
 .../newflow/sort/unsafe/UnsafeSortDataRows.java | 49 +++++++++++---------
 .../util/CarbonDataProcessorUtil.java           |  7 ++-
 3 files changed, 37 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/72bbb62b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index 0c6fa27..20fd48b 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -147,9 +147,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends 
AbstractMergeSorter
             }
           }
           if (i > 0) {
-            sortDataRows.getSortDataRow().addRowBatch(buffer, i);
-            rowCounter.getAndAdd(i);
             synchronized (sortDataRows) {
+              sortDataRows.getSortDataRow().addRowBatchWithOutSync(buffer, i);
+              rowCounter.getAndAdd(i);
               if (!sortDataRows.getSortDataRow().canAdd()) {
                 sortDataRows.finish();
                 sortDataRows.createSortDataRows();
@@ -197,6 +197,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends 
AbstractMergeSorter
 
     private void createSortDataRows() {
       int inMemoryChunkSizeInMB = 
CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+      if (inMemoryChunkSizeInMB > sortParameters.getBatchSortSizeinMb()) {
+        inMemoryChunkSizeInMB = sortParameters.getBatchSortSizeinMb();
+      }
       this.finalMerger = new 
UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
           sortParameters.getTempFileLocation());
       unsafeIntermediateFileMerger = new 
UnsafeIntermediateMerger(sortParameters);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72bbb62b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index 898b73d..b4daa51 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -132,7 +132,7 @@ public class UnsafeSortDataRows {
   public static MemoryBlock getMemoryBlock(long size) throws 
CarbonSortKeyAndGroupByException {
     MemoryBlock baseBlock = null;
     int tries = 0;
-    while (true && tries < 100) {
+    while (tries < 100) {
       baseBlock = UnsafeMemoryManager.INSTANCE.allocateMemory(size);
       if (baseBlock == null) {
         try {
@@ -165,29 +165,32 @@ public class UnsafeSortDataRows {
     // if record holder list size is equal to sort buffer size then it will
     // sort the list and then write current list data to file
     synchronized (addRowsLock) {
-      for (int i = 0; i < size; i++) {
-        if (rowPage.canAdd()) {
-          bytesAdded += rowPage.addRow(rowBatch[i]);
-        } else {
-          try {
-            if (enableInMemoryIntermediateMerge) {
-              
unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
-            }
-            unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
-            semaphore.acquire();
-            dataSorterAndWriterExecutorService.submit(new 
DataSorterAndWriter(rowPage));
-            MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize);
-            boolean saveToDisk = 
!UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
-            rowPage = new 
UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
-                parameters.getDimColCount() + 
parameters.getComplexDimColCount(),
-                parameters.getMeasureColCount(), parameters.getAggType(), 
memoryBlock, saveToDisk);
-            bytesAdded += rowPage.addRow(rowBatch[i]);
-          } catch (Exception e) {
-            LOGGER.error(
-                "exception occurred while trying to acquire a semaphore lock: 
" + e.getMessage());
-            throw new CarbonSortKeyAndGroupByException(e);
-          }
+      addBatch(rowBatch, size);
+    }
+  }
 
+  private void addBatch(Object[][] rowBatch, int size) throws 
CarbonSortKeyAndGroupByException {
+    for (int i = 0; i < size; i++) {
+      if (rowPage.canAdd()) {
+        bytesAdded += rowPage.addRow(rowBatch[i]);
+      } else {
+        try {
+          if (enableInMemoryIntermediateMerge) {
+            
unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
+          }
+          unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
+          semaphore.acquire();
+          dataSorterAndWriterExecutorService.submit(new 
DataSorterAndWriter(rowPage));
+          MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize);
+          boolean saveToDisk = 
!UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
+          rowPage = new 
UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+              parameters.getDimColCount() + parameters.getComplexDimColCount(),
+              parameters.getMeasureColCount(), parameters.getAggType(), 
memoryBlock, saveToDisk);
+          bytesAdded += rowPage.addRow(rowBatch[i]);
+        } catch (Exception e) {
+          LOGGER.error(
+              "exception occurred while trying to acquire a semaphore lock: " 
+ e.getMessage());
+          throw new CarbonSortKeyAndGroupByException(e);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72bbb62b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index a4de24e..12a453d 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -542,9 +542,11 @@ public final class CarbonDataProcessorUtil {
             
configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE)
                 .toString());
       }
+      LOGGER.warn("sort scope is set to " + sortScope);
     } catch (Exception e) {
       sortScope = 
SortScopeOptions.getSortScope(CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT);
-      LOGGER.warn("sort scope is set to " + sortScope);
+      LOGGER.warn("Exception occured while resolving sort scope. " +
+          "sort scope is set to " + sortScope);
     }
     return sortScope;
   }
@@ -567,8 +569,11 @@ public final class CarbonDataProcessorUtil {
             
configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB)
                 .toString());
       }
+      LOGGER.warn("batch sort size is set to " + batchSortSizeInMb);
     } catch (Exception e) {
       batchSortSizeInMb = 0;
+      LOGGER.warn("Exception occured while resolving batch sort size. " +
+          "batch sort size is set to " + batchSortSizeInMb);
     }
     return batchSortSizeInMb;
   }

Reply via email to