This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ccfb9b  [CARBONDATA-3958] Avoid blocking the loading task when the 
output queue poll timeout
1ccfb9b is described below

commit 1ccfb9be3fab8dfc2556541872632471d733518a
Author: QiangCai <qiang...@qq.com>
AuthorDate: Mon Aug 24 16:15:54 2020 +0800

    [CARBONDATA-3958] Avoid blocking the loading task when the output queue 
poll timeout
    
    Why is this PR needed?
    In some cases, the CDC merge tasks are blocked when data loading uses the
    CarbonTableOutputFormat.getRecordWriter method. PR #3856 changed the code to
    avoid using CarbonTableOutputFormat.getRecordWriter, so CDC merge will not hit
    this issue again.
    However, the same issue may still happen in other similar scenarios.
    
    The cause is that the poll method of the queue times out in some cases, and
    after a timeout the row batches in the queue are never polled again.
    
    Once the queue is full, it blocks the writing task when that task tries to
    put a new batch. Even if the queue is not full, the row batches remaining in
    the queue are lost.
    
    What changes were proposed in this PR?
    If the output is not closed, it polls for a row batch in a loop until it
    gets a non-null batch.
    If the output is closed, it breaks out of the loop.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3897
---
 .../iterator/CarbonOutputIteratorWrapper.java      | 32 ++++++++++++++++------
 1 file changed, 24 insertions(+), 8 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
index 3e29288..dc9aa54 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
@@ -63,18 +63,33 @@ public class CarbonOutputIteratorWrapper extends 
CarbonIterator<Object[]> {
   @Override
   public boolean hasNext() {
     if (readBatch == null || !readBatch.hasNext()) {
+      // if readBatch don't have next row, set it to null
+      readBatch = null;
       try {
-        if (!close) {
+        // if the output is not closed, poll a batch from the queue in a loop.
+        // if the queue is always empty, it will wait for the last default 
element of the output,
+        // after that, the output will be closed and the loop will be finished.
+        while (!close) {
           readBatch = queue.poll(5, TimeUnit.MINUTES);
           if (readBatch == null) {
-            LOG.warn("This scenario should not happen");
-            return false;
+            LOG.warn("try to poll a row batch again.");
+          } else {
+            // if readBatch is not null, break this loop to continue
+            break;
           }
-        } else {
+        }
+
+        // when the output is closed and readBatch is null, should poll a 
batch immediately.
+        // it is a double-checking also of the last poll operation. in some 
cases, the output is
+        // closed and the readBatch is null, but the queue is not empty, 
contain the last load
+        // batch or the last default batch.
+        if (close && readBatch == null) {
+          LOG.warn("try to poll a row batch one more time.");
           readBatch = queue.poll();
-          if (readBatch == null) {
-            return false;
-          }
+        }
+        // checking again whether readBatch is null or not
+        if (readBatch == null) {
+          return false;
         }
       } catch (InterruptedException e) {
         throw new RuntimeException(e);
@@ -112,13 +127,14 @@ public class CarbonOutputIteratorWrapper extends 
CarbonIterator<Object[]> {
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
-    close = true;
     // It is required if the thread waits for take.
     if (queue.isEmpty()) {
       if (!queue.offer(new RowBatch(0))) {
         LOG.warn("The default last element is not added to queue");
       }
     }
+    // after try to add the default last element, close the output.
+    close = true;
   }
 
   private static class RowBatch extends CarbonIterator<Object[]> {

Reply via email to