Acquire semaphore before submit a producer in finish.
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2403f280 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2403f280 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2403f280 Branch: refs/heads/branch-1.1 Commit: 2403f2807d1f9d4257a34ecf322d7262ca3a6320 Parents: 64f973e Author: Yadong Qi <qiyadong2...@gmail.com> Authored: Thu Jun 1 20:28:19 2017 +0800 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Jun 15 13:16:56 2017 +0530 ---------------------------------------------------------------------- .../store/CarbonFactDataHandlerColumnar.java | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/2403f280/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index f6ceb84..4ba1717 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -862,13 +862,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { public void finish() throws CarbonDataWriterException { // still some data is present in stores if entryCount is more // than 0 - producerExecutorServiceTaskList.add(producerExecutorService - .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, true))); - blockletProcessingCount.incrementAndGet(); - processedDataCount += entryCount; - closeWriterExecutionService(producerExecutorService); - processWriteTaskSubmitList(producerExecutorServiceTaskList); - processingComplete = true; + try { + semaphore.acquire(); + producerExecutorServiceTaskList.add(producerExecutorService + .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, true))); + blockletProcessingCount.incrementAndGet(); + processedDataCount += entryCount; + closeWriterExecutionService(producerExecutorService); + processWriteTaskSubmitList(producerExecutorServiceTaskList); + processingComplete = true; + } catch (InterruptedException e) { + LOGGER.error(e, e.getMessage()); + throw new CarbonDataWriterException(e.getMessage(), e); + } } /**