[CARBONDATA-2417] [SDK Writer] SDK writer goes to infinite wait when consumer thread goes dead
Problem: SDK Writer goes into an infinite loop in multi-threaded scenarios. Analysis: In multi-threaded scenarios where multiple writer threads try to add rows to the same RowBatch, filling the batch only up to the given batch size cannot be guaranteed because the addition process is not synchronized; this can lead to an ArrayIndexOutOfBoundsException or to data loss/mismatch issues. If multiple writer threads are adding data to the RowBatch and closeWriter is called immediately after launching all the threads, there is no way to know when all the threads have finished writing their data, yet closeWriter returns immediately after setting the close flag to true. This does not ensure complete processing of the data. Solution: Make the row addition logic synchronized and modify the code in closeWriter to ensure data completeness. This closes #2286 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fe436c3e Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fe436c3e Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fe436c3e Branch: refs/heads/spark-2.3 Commit: fe436c3ecef632f6a5621d391c6dd8724813b25f Parents: 35a7b5e Author: manishgupta88 <tomanishgupt...@gmail.com> Authored: Wed May 9 15:08:05 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu May 10 20:54:11 2018 +0530 ---------------------------------------------------------------------- docs/sdk-writer-guide.md | 3 ++- .../loading/iterator/CarbonOutputIteratorWrapper.java | 6 ++++-- .../java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java | 1 - .../main/java/org/apache/carbondata/sdk/file/CarbonWriter.java | 3 ++- 4 files changed, 8 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/fe436c3e/docs/sdk-writer-guide.md ---------------------------------------------------------------------- diff 
--git a/docs/sdk-writer-guide.md b/docs/sdk-writer-guide.md index bfbf997..18b583a 100644 --- a/docs/sdk-writer-guide.md +++ b/docs/sdk-writer-guide.md @@ -281,7 +281,8 @@ public CarbonWriter buildWriterForAvroInput() throws IOException, InvalidLoadOpt /** * Write an object to the file, the format of the object depends on the implementation * If AvroCarbonWriter, object is of type org.apache.avro.generic.GenericData.Record -* If CSVCarbonWriter, object is of type String[] +* If CSVCarbonWriter, object is of type String[] +* Note: This API is not thread safe * @param object * @throws IOException */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/fe436c3e/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java index 4067be1..deb628c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java @@ -21,6 +21,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.util.CarbonProperties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,12 +34,12 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<Object[]> { private static final Log LOG = LogFactory.getLog(CarbonOutputIteratorWrapper.class); - private boolean close = false; + private boolean close; /** * Number of rows kept in memory at most will be batchSize * queue size */ - private int batchSize = 1000; + private int 
batchSize = CarbonProperties.getInstance().getBatchSize(); private RowBatch loadBatch = new RowBatch(batchSize); @@ -98,6 +99,7 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<Object[]> { close = true; return; } + // below code will ensure that the last RowBatch is consumed properly loadBatch.readyRead(); if (loadBatch.size > 0) { queue.put(loadBatch); http://git-wip-us.apache.org/repos/asf/carbondata/blob/fe436c3e/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java index 62d5860..627e060 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java @@ -69,7 +69,6 @@ class CSVCarbonWriter extends CarbonWriter { writable.set((String[]) object); recordWriter.write(NullWritable.get(), writable); } catch (Exception e) { - close(); throw new IOException(e); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/fe436c3e/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java index e29aa18..60ad060 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java @@ -32,7 +32,8 @@ public abstract class CarbonWriter { /** * Write an object to the file, the format of the object depends on the - * implementation + * implementation. + * Note: This API is not thread safe */ public abstract void write(Object object) throws IOException;