[CARBONDATA-2784][CARBONDATA-2786][SDK writer] Fixed:Forever blocking wait with 
more than 21 batch of data

problem: [CARBONDATA-2784]
[SDK writer] Forever blocking wait with more than 21 batch of data, when 
consumer is dead due to data loading exception (bad record / out of memory)

root cause:
When the consumer is dead due to data loading exception, writer will be 
forcefully closed. but queue.clear() cleared only snapshot of entries (10 
batches) and close is set to true after that. In between clear() and close = 
true, If more than 10 batches of data is again put into queue. For 11th batch, 
queue.put() goes for forever block as consumer is dead.

Solution:
set close = true, before clearing the queue. This will avoid adding more 
batches to queue from write().

problem [CARBONDATA-2786] NPE when SDK writer tries to write a file

solution and cause:
#2387 , in CarbonProperties.java
After systemLocation = getStorePath(); Null validation missing for 
systemLocation.
because this can be null in SDK case. As Store location is not applicable for 
SDK.
All a null validation.

This closes #2561


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fc8510a1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fc8510a1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fc8510a1

Branch: refs/heads/external-format
Commit: fc8510a112eaeab951b4bfc2c2a45bda45b6d757
Parents: 0e45f3a
Author: ajantha-bhat <ajanthab...@gmail.com>
Authored: Thu Jul 26 00:35:36 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Sun Jul 29 19:57:43 2018 +0530

----------------------------------------------------------------------
 .../org/apache/carbondata/core/util/CarbonProperties.java    | 6 ++++--
 .../loading/iterator/CarbonOutputIteratorWrapper.java        | 8 ++++++--
 2 files changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc8510a1/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 004eb74..8a91a43 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1532,8 +1532,10 @@ public final class CarbonProperties {
     if (systemLocation == null) {
       systemLocation = getStorePath();
     }
-    systemLocation = 
CarbonUtil.checkAndAppendFileSystemURIScheme(systemLocation);
-    systemLocation = FileFactory.getUpdatedFilePath(systemLocation);
+    if (systemLocation != null) {
+      systemLocation = 
CarbonUtil.checkAndAppendFileSystemURIScheme(systemLocation);
+      systemLocation = FileFactory.getUpdatedFilePath(systemLocation);
+    }
     return systemLocation + CarbonCommonConstants.FILE_SEPARATOR + "_system";
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc8510a1/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
index deb628c..a00b562 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
@@ -94,9 +94,13 @@ public class CarbonOutputIteratorWrapper extends 
CarbonIterator<Object[]> {
     }
     try {
       if (isForceClose) {
-        // unblock the queue.put on the other thread and clear the queue.
-        queue.clear();
+        // first make close is set to true, when force close happens because 
of dead consumer.
+        // so that, write() method will stop taking input rows.
         close = true;
+        // once write() method stops taking input rows, clear the queue.
+        // If queue is cleared before close is set to true, then queue will be 
again filled
+        // by .write() and it can go to blocking put() forever as consumer is 
dead.
+        queue.clear();
         return;
       }
       // below code will ensure that the last RowBatch is consumed properly

Reply via email to