Acquire semaphore before submitting a producer in finish().

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2403f280
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2403f280
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2403f280

Branch: refs/heads/branch-1.1
Commit: 2403f2807d1f9d4257a34ecf322d7262ca3a6320
Parents: 64f973e
Author: Yadong Qi <qiyadong2...@gmail.com>
Authored: Thu Jun 1 20:28:19 2017 +0800
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Jun 15 13:16:56 2017 +0530

----------------------------------------------------------------------
 .../store/CarbonFactDataHandlerColumnar.java    | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2403f280/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index f6ceb84..4ba1717 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -862,13 +862,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   public void finish() throws CarbonDataWriterException {
     // still some data is present in stores if entryCount is more
     // than 0
-    producerExecutorServiceTaskList.add(producerExecutorService
-        .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, true)));
-    blockletProcessingCount.incrementAndGet();
-    processedDataCount += entryCount;
-    closeWriterExecutionService(producerExecutorService);
-    processWriteTaskSubmitList(producerExecutorServiceTaskList);
-    processingComplete = true;
+    try {
+      semaphore.acquire();
+      producerExecutorServiceTaskList.add(producerExecutorService
+          .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, true)));
+      blockletProcessingCount.incrementAndGet();
+      processedDataCount += entryCount;
+      closeWriterExecutionService(producerExecutorService);
+      processWriteTaskSubmitList(producerExecutorServiceTaskList);
+      processingComplete = true;
+    } catch (InterruptedException e) {
+      LOGGER.error(e, e.getMessage());
+      throw new CarbonDataWriterException(e.getMessage(), e);
+    }
   }
 
   /**

Reply via email to