[CARBONDATA-2784][CARBONDATA-2786][SDK writer] Fixed: Forever blocking wait with 
more than 21 batches of data

Problem: [CARBONDATA-2784]
[SDK writer] The writer blocks forever with more than 21 batches of data when 
the consumer is dead due to a data loading exception (bad record / out of memory).

Root cause:
When the consumer is dead due to a data loading exception, the writer is 
forcefully closed. However, queue.clear() cleared only a snapshot of the 
entries (10 batches), and close was set to true only after that. If more than 
10 batches of data are put into the queue between clear() and close = true, 
queue.put() blocks forever on the 11th batch because the consumer is dead.

Solution:
Set close = true before clearing the queue. This prevents write() from adding 
more batches to the queue.
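
The ordering can be illustrated with a minimal, self-contained sketch. It is 
not the CarbonOutputIteratorWrapper code: the class BatchHandOff, its methods 
and the int[] batch type are hypothetical stand-ins, and the queue capacity 
of 10 is taken from the description above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical, simplified stand-in for the writer-to-consumer hand-off. */
final class BatchHandOff {
    private final BlockingQueue<int[]> queue;
    // volatile so the producer thread sees the flag as soon as it is set
    private volatile boolean close = false;

    BatchHandOff(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Producer side: refuses new batches once the hand-off is closed. */
    boolean write(int[] batch) {
        if (close) {
            return false; // closed (possibly forcefully): drop the batch
        }
        try {
            queue.put(batch); // blocks while the queue is full
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Force close after the consumer has died: flag first, then clear. */
    void forceClose() {
        close = true;  // stop write() from accepting further batches
        queue.clear(); // then drop what is queued and unblock a pending put()
    }

    int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        BatchHandOff handOff = new BatchHandOff(10);
        for (int i = 0; i < 10; i++) {
            handOff.write(new int[] {i}); // fill the queue, nobody consumes
        }
        handOff.forceClose();
        boolean accepted = handOff.write(new int[] {10});
        // prints "accepted after close: false, pending: 0"
        System.out.println("accepted after close: " + accepted
            + ", pending: " + handOff.pending());
    }
}
```

In this sketch, with a single producer, a write() that passed the close check 
just before the flag was set can add at most one batch to the freshly cleared 
queue, so it does not block; every later write() returns immediately.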

Problem: [CARBONDATA-2786] NPE when the SDK writer tries to write a file

Solution and cause:
#2387, in CarbonProperties.java:
After systemLocation = getStorePath(); a null validation is missing for 
systemLocation, which can be null in the SDK case because the store location 
is not applicable to the SDK.
Add a null validation.
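
A minimal sketch of such a guard, under stated assumptions: SystemFolderPath 
and normalize() are hypothetical names, and normalize() only stands in for 
path helpers that dereference their argument. It is not the CarbonProperties 
code.

```java
/** Hypothetical illustration of guarding path normalization against null. */
final class SystemFolderPath {
    static final String FILE_SEPARATOR = "/";

    /** Stand-in path helper: throws NullPointerException for a null path. */
    static String normalize(String path) {
        return path.replace('\\', '/');
    }

    /** Normalizes the store path only when one is configured. */
    static String systemFolder(String storePath) {
        String location = storePath;
        if (location != null) { // the store path can be absent in the SDK case
            location = normalize(location);
        }
        return location + FILE_SEPARATOR + "_system";
    }

    public static void main(String[] args) {
        System.out.println(systemFolder("C:\\store")); // prints C:/store/_system
        System.out.println(systemFolder(null));        // prints null/_system
    }
}
```

With the guard the call no longer throws for a null path. In this sketch the 
string concatenation then yields a path that starts with the literal text 
"null".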

This closes #2561


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/339f1a80
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/339f1a80
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/339f1a80

Branch: refs/heads/branch-1.4
Commit: 339f1a8015c0a562bd11b6f8fce2541a1598d19e
Parents: 769b306
Author: ajantha-bhat <ajanthab...@gmail.com>
Authored: Thu Jul 26 00:35:36 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Tue Jul 31 00:11:26 2018 +0530

----------------------------------------------------------------------
 .../org/apache/carbondata/core/util/CarbonProperties.java    | 6 ++++--
 .../loading/iterator/CarbonOutputIteratorWrapper.java        | 8 ++++++--
 2 files changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/339f1a80/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 004eb74..8a91a43 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1532,8 +1532,10 @@ public final class CarbonProperties {
     if (systemLocation == null) {
       systemLocation = getStorePath();
     }
-    systemLocation = CarbonUtil.checkAndAppendFileSystemURIScheme(systemLocation);
-    systemLocation = FileFactory.getUpdatedFilePath(systemLocation);
+    if (systemLocation != null) {
+      systemLocation = CarbonUtil.checkAndAppendFileSystemURIScheme(systemLocation);
+      systemLocation = FileFactory.getUpdatedFilePath(systemLocation);
+    }
     return systemLocation + CarbonCommonConstants.FILE_SEPARATOR + "_system";
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/339f1a80/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
index deb628c..a00b562 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
@@ -94,9 +94,13 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<Object[]> {
     }
     try {
       if (isForceClose) {
-        // unblock the queue.put on the other thread and clear the queue.
-        queue.clear();
+        // first make close is set to true, when force close happens because of dead consumer.
+        // so that, write() method will stop taking input rows.
         close = true;
+        // once write() method stops taking input rows, clear the queue.
+        // If queue is cleared before close is set to true, then queue will be again filled
+        // by .write() and it can go to blocking put() forever as consumer is dead.
+        queue.clear();
         return;
       }
       // below code will ensure that the last RowBatch is consumed properly
