Fixed batch load issue count and synchronization
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f7015212 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f7015212 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f7015212 Branch: refs/heads/branch-1.1 Commit: f7015212d10c73c287e28640cb4545158b1ad318 Parents: a6468f7 Author: ravipesala <ravi.pes...@gmail.com> Authored: Thu Jun 15 16:32:09 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Sun Jun 18 14:13:48 2017 +0530 ---------------------------------------------------------------------- .../sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/f7015212/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java index c3243b6..cc7929d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java @@ -197,7 +197,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter ThreadStatusObserver threadStatusObserver) { this.sortParameters = sortParameters.getCopy(); this.iteratorCount = new AtomicInteger(numberOfThreads); - this.mergerQueue = new LinkedBlockingQueue<>(); + this.mergerQueue = new LinkedBlockingQueue<>(1); this.threadStatusObserver = threadStatusObserver; createSortDataRows(); } @@ -254,6 +254,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter .getThrowable() instanceof CarbonDataLoadingException) { finalMerger.setStopProcess(true); mergerQueue.put(finalMerger); + return; } processRowToNextStep(sortDataRow, sortParameters); unsafeIntermediateFileMerger.finish(); @@ -270,6 +271,12 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter } catch (CarbonSortKeyAndGroupByException e) { throw new CarbonDataLoadingException(e); } catch (InterruptedException e) { + // if fails to put in queue because of interrupted exception, we can offer to free the main + // thread from waiting. + if (finalMerger != null) { + finalMerger.setStopProcess(true); + mergerQueue.offer(finalMerger); + } throw new CarbonDataLoadingException(e); } }