[CARBONDATA-2417] [SDK Writer] SDK writer goes to infinite wait when consumer 
thread goes dead

Problem:
SDK Writer goes into an infinite loop in multi-threaded scenarios.

Analysis:
In multi-threaded scenarios where multiple writer threads are trying to add 
rows to the RowBatch, staying within the given batch size cannot be ensured 
because the addition process is not synchronized. This can lead to an 
ArrayIndexOutOfBoundsException or to data loss/mismatch issues.
If multiple writer threads are adding data to the RowBatch and closeWriter is 
called immediately after launching all the threads, we do not know when all 
the threads have finished writing their data, yet closeWriter returns 
immediately after setting the close flag to true. 
This does not ensure complete processing of the data.

Solution:
Make the row addition logic synchronized and modify the code in closeWriter to 
ensure data completeness.

This closes #2286


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fe436c3e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fe436c3e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fe436c3e

Branch: refs/heads/spark-2.3
Commit: fe436c3ecef632f6a5621d391c6dd8724813b25f
Parents: 35a7b5e
Author: manishgupta88 <tomanishgupt...@gmail.com>
Authored: Wed May 9 15:08:05 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu May 10 20:54:11 2018 +0530

----------------------------------------------------------------------
 docs/sdk-writer-guide.md                                       | 3 ++-
 .../loading/iterator/CarbonOutputIteratorWrapper.java          | 6 ++++--
 .../java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java   | 1 -
 .../main/java/org/apache/carbondata/sdk/file/CarbonWriter.java | 3 ++-
 4 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fe436c3e/docs/sdk-writer-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-writer-guide.md b/docs/sdk-writer-guide.md
index bfbf997..18b583a 100644
--- a/docs/sdk-writer-guide.md
+++ b/docs/sdk-writer-guide.md
@@ -281,7 +281,8 @@ public CarbonWriter buildWriterForAvroInput() throws 
IOException, InvalidLoadOpt
 /**
 * Write an object to the file, the format of the object depends on the 
implementation
 * If AvroCarbonWriter, object is of type 
org.apache.avro.generic.GenericData.Record 
-* If CSVCarbonWriter, object is of type String[] 
+* If CSVCarbonWriter, object is of type String[]
+* Note: This API is not thread safe
 * @param object
 * @throws IOException
 */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fe436c3e/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
index 4067be1..deb628c 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.util.CarbonProperties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,12 +34,12 @@ public class CarbonOutputIteratorWrapper extends 
CarbonIterator<Object[]> {
 
   private static final Log LOG = 
LogFactory.getLog(CarbonOutputIteratorWrapper.class);
 
-  private boolean close = false;
+  private boolean close;
 
   /**
    * Number of rows kept in memory at most will be batchSize * queue size
    */
-  private int batchSize = 1000;
+  private int batchSize = CarbonProperties.getInstance().getBatchSize();
 
   private RowBatch loadBatch = new RowBatch(batchSize);
 
@@ -98,6 +99,7 @@ public class CarbonOutputIteratorWrapper extends 
CarbonIterator<Object[]> {
         close = true;
         return;
       }
+      // below code will ensure that the last RowBatch is consumed properly
       loadBatch.readyRead();
       if (loadBatch.size > 0) {
         queue.put(loadBatch);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fe436c3e/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
index 62d5860..627e060 100644
--- 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
+++ 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
@@ -69,7 +69,6 @@ class CSVCarbonWriter extends CarbonWriter {
       writable.set((String[]) object);
       recordWriter.write(NullWritable.get(), writable);
     } catch (Exception e) {
-      close();
       throw new IOException(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fe436c3e/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java
index e29aa18..60ad060 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java
@@ -32,7 +32,8 @@ public abstract class CarbonWriter {
 
   /**
    * Write an object to the file, the format of the object depends on the
-   * implementation
+   * implementation.
+   * Note: This API is not thread safe
    */
   public abstract void write(Object object) throws IOException;
 

Reply via email to