[CARBONDATA-1159] Batch sort loading does not work correctly without synchronization
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/72bbb62b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/72bbb62b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/72bbb62b Branch: refs/heads/branch-1.1 Commit: 72bbb62bc0bb8e7a38be09938c3cfae171af2ea2 Parents: da952e8 Author: dhatchayani <dhatcha.offic...@gmail.com> Authored: Mon Jun 12 21:56:47 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Jun 15 13:36:48 2017 +0530 ---------------------------------------------------------------------- .../UnsafeBatchParallelReadMergeSorterImpl.java | 7 ++- .../newflow/sort/unsafe/UnsafeSortDataRows.java | 49 +++++++++++--------- .../util/CarbonDataProcessorUtil.java | 7 ++- 3 files changed, 37 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/72bbb62b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java index 0c6fa27..20fd48b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java @@ -147,9 +147,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter } } if (i > 0) { - sortDataRows.getSortDataRow().addRowBatch(buffer, i); - rowCounter.getAndAdd(i); synchronized (sortDataRows) { + 
sortDataRows.getSortDataRow().addRowBatchWithOutSync(buffer, i); + rowCounter.getAndAdd(i); if (!sortDataRows.getSortDataRow().canAdd()) { sortDataRows.finish(); sortDataRows.createSortDataRows(); @@ -197,6 +197,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter private void createSortDataRows() { int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB(); + if (inMemoryChunkSizeInMB > sortParameters.getBatchSortSizeinMb()) { + inMemoryChunkSizeInMB = sortParameters.getBatchSortSizeinMb(); + } this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters, sortParameters.getTempFileLocation()); unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters); http://git-wip-us.apache.org/repos/asf/carbondata/blob/72bbb62b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java index 898b73d..b4daa51 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java @@ -132,7 +132,7 @@ public class UnsafeSortDataRows { public static MemoryBlock getMemoryBlock(long size) throws CarbonSortKeyAndGroupByException { MemoryBlock baseBlock = null; int tries = 0; - while (true && tries < 100) { + while (tries < 100) { baseBlock = UnsafeMemoryManager.INSTANCE.allocateMemory(size); if (baseBlock == null) { try { @@ -165,29 +165,32 @@ public class UnsafeSortDataRows { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file synchronized 
(addRowsLock) { - for (int i = 0; i < size; i++) { - if (rowPage.canAdd()) { - bytesAdded += rowPage.addRow(rowBatch[i]); - } else { - try { - if (enableInMemoryIntermediateMerge) { - unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible(); - } - unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible(); - semaphore.acquire(); - dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage)); - MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize); - boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable(); - rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(), - parameters.getDimColCount() + parameters.getComplexDimColCount(), - parameters.getMeasureColCount(), parameters.getAggType(), memoryBlock, saveToDisk); - bytesAdded += rowPage.addRow(rowBatch[i]); - } catch (Exception e) { - LOGGER.error( - "exception occurred while trying to acquire a semaphore lock: " + e.getMessage()); - throw new CarbonSortKeyAndGroupByException(e); - } + addBatch(rowBatch, size); + } + } + private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { + for (int i = 0; i < size; i++) { + if (rowPage.canAdd()) { + bytesAdded += rowPage.addRow(rowBatch[i]); + } else { + try { + if (enableInMemoryIntermediateMerge) { + unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible(); + } + unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible(); + semaphore.acquire(); + dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage)); + MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize); + boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable(); + rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(), + parameters.getDimColCount() + parameters.getComplexDimColCount(), + parameters.getMeasureColCount(), parameters.getAggType(), memoryBlock, saveToDisk); + bytesAdded += rowPage.addRow(rowBatch[i]); + } 
catch (Exception e) { + LOGGER.error( + "exception occurred while trying to acquire a semaphore lock: " + e.getMessage()); + throw new CarbonSortKeyAndGroupByException(e); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/72bbb62b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index a4de24e..12a453d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -542,9 +542,11 @@ public final class CarbonDataProcessorUtil { configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE) .toString()); } + LOGGER.warn("sort scope is set to " + sortScope); } catch (Exception e) { sortScope = SortScopeOptions.getSortScope(CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT); - LOGGER.warn("sort scope is set to " + sortScope); + LOGGER.warn("Exception occured while resolving sort scope. " + + "sort scope is set to " + sortScope); } return sortScope; } @@ -567,8 +569,11 @@ public final class CarbonDataProcessorUtil { configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB) .toString()); } + LOGGER.warn("batch sort size is set to " + batchSortSizeInMb); } catch (Exception e) { batchSortSizeInMb = 0; + LOGGER.warn("Exception occured while resolving batch sort size. " + + "batch sort size is set to " + batchSortSizeInMb); } return batchSortSizeInMb; }