[CARBONDATA-1177]Fixed batch sort synchronization issue
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a6468f73 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a6468f73 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a6468f73 Branch: refs/heads/branch-1.1 Commit: a6468f73bf74a2afb0a4d2c97664e127f91d69bd Parents: c05523d Author: dhatchayani <dhatcha.offic...@gmail.com> Authored: Thu Jun 15 10:03:08 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Sun Jun 18 14:13:37 2017 +0530 ---------------------------------------------------------------------- .../UnsafeBatchParallelReadMergeSorterImpl.java | 36 ++++++++++++++++---- 1 file changed, 29 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6468f73/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java index f1b4a80..c3243b6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java @@ -16,6 +16,7 @@ */ package org.apache.carbondata.processing.newflow.sort.impl; +import java.io.File; import java.util.Iterator; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; @@ -44,6 +46,7 @@ import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleT import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters; import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; /** * It parallely reads data from array of iterates and do merge sort. @@ -184,11 +187,15 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter private AtomicInteger iteratorCount; + private int batchCount; + private ThreadStatusObserver threadStatusObserver; + private final Object lock = new Object(); + public SortBatchHolder(SortParameters sortParameters, int numberOfThreads, ThreadStatusObserver threadStatusObserver) { - this.sortParameters = sortParameters; + this.sortParameters = sortParameters.getCopy(); this.iteratorCount = new AtomicInteger(numberOfThreads); this.mergerQueue = new LinkedBlockingQueue<>(); this.threadStatusObserver = threadStatusObserver; @@ -197,6 +204,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter private void createSortDataRows() { int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB(); + setTempLocation(sortParameters); this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters, sortParameters.getTempFileLocation()); unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters); @@ -208,6 +216,16 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter } catch (CarbonSortKeyAndGroupByException e) { throw new CarbonDataLoadingException(e); } + batchCount++; + } + + private void setTempLocation(SortParameters parameters) { + String carbonDataDirectoryPath = CarbonDataProcessorUtil + .getLocalDataFolderLocation(parameters.getDatabaseName(), + parameters.getTableName(), parameters.getTaskNo(), batchCount + "", + parameters.getSegmentId(), false); + parameters.setTempFileLocation( + carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); } @Override public UnsafeSingleThreadFinalSortFilesMerger next() { @@ -235,7 +253,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter && threadStatusObserver.getThrowable() != null && threadStatusObserver .getThrowable() instanceof CarbonDataLoadingException) { finalMerger.setStopProcess(true); - mergerQueue.offer(finalMerger); + mergerQueue.put(finalMerger); } processRowToNextStep(sortDataRow, sortParameters); unsafeIntermediateFileMerger.finish(); @@ -243,7 +261,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]), unsafeIntermediateFileMerger.getMergedPages()); unsafeIntermediateFileMerger.close(); - mergerQueue.offer(finalMerger); + mergerQueue.put(finalMerger); sortDataRow = null; unsafeIntermediateFileMerger = null; finalMerger = null; @@ -251,16 +269,20 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter throw new CarbonDataLoadingException(e); } catch (CarbonSortKeyAndGroupByException e) { throw new CarbonDataLoadingException(e); + } catch (InterruptedException e) { + throw new CarbonDataLoadingException(e); } } - public synchronized void finishThread() { - if (iteratorCount.decrementAndGet() <= 0) { - finish(); + public void finishThread() { + synchronized (lock) { + if (iteratorCount.decrementAndGet() <= 0) { + finish(); + } } } - public synchronized boolean hasNext() { + public boolean hasNext() { return iteratorCount.get() > 0 || !mergerQueue.isEmpty(); }