[CARBONDATA-2784][CARBONDATA-2786][SDK writer] Fixed: Forever blocking wait with more than 21 batches of data
problem: [CARBONDATA-2784] [SDK writer] Forever blocking wait with more than 21 batches of data, when the consumer is dead due to a data loading exception (bad record / out of memory).

root cause: When the consumer dies due to a data loading exception, the writer is forcefully closed. But queue.clear() cleared only a snapshot of the entries (10 batches), and close was set to true only after that. If more than 10 batches of data are put into the queue again between clear() and close = true, queue.put() for the 11th batch blocks forever because the consumer is dead.

Solution: set close = true before clearing the queue. This stops write() from adding more batches to the queue.

problem: [CARBONDATA-2786] NPE when the SDK writer tries to write a file.

cause and solution: #2387. In CarbonProperties.java, systemLocation is not validated for null after systemLocation = getStorePath(). systemLocation can be null in the SDK case, because the store location is not applicable to the SDK. Added a null validation.

This closes #2561

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fc8510a1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fc8510a1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fc8510a1

Branch: refs/heads/external-format
Commit: fc8510a112eaeab951b4bfc2c2a45bda45b6d757
Parents: 0e45f3a
Author: ajantha-bhat <ajanthab...@gmail.com>
Authored: Thu Jul 26 00:35:36 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Sun Jul 29 19:57:43 2018 +0530

----------------------------------------------------------------------
 .../org/apache/carbondata/core/util/CarbonProperties.java    | 6 ++++--
 .../loading/iterator/CarbonOutputIteratorWrapper.java        | 8 ++++++--
 2 files changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
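The CARBONDATA-2786 null guard can be illustrated with a minimal Java sketch. It is a hypothetical model, not the actual CarbonProperties code: the class SystemFolderSketch, the normalise() placeholder (standing in for CarbonUtil.checkAndAppendFileSystemURIScheme and FileFactory.getUpdatedFilePath, whose real behaviour is not reproduced), the storePath parameter (standing in for getStorePath()) and the "/" separator are all assumptions for illustration.

```java
// Hypothetical model of the CARBONDATA-2786 null guard; not the real CarbonProperties.
final class SystemFolderSketch {
  // Assumed value of CarbonCommonConstants.FILE_SEPARATOR for this sketch.
  static final String FILE_SEPARATOR = "/";

  // Placeholder for the path helpers called in the diff. It dereferences its
  // argument, so passing null throws NullPointerException (the commit reports an NPE).
  static String normalise(String path) {
    return path.replace('\\', '/');
  }

  // Before the fix: storePath stands in for getStorePath(), which may be null in the SDK.
  static String unguarded(String systemLocation, String storePath) {
    if (systemLocation == null) {
      systemLocation = storePath;
    }
    systemLocation = normalise(systemLocation); // NPE when both values are null
    return systemLocation + FILE_SEPARATOR + "_system";
  }

  // After the fix: normalisation is skipped when no location is available.
  static String guarded(String systemLocation, String storePath) {
    if (systemLocation == null) {
      systemLocation = storePath;
    }
    if (systemLocation != null) {
      systemLocation = normalise(systemLocation);
    }
    return systemLocation + FILE_SEPARATOR + "_system";
  }

  public static void main(String[] args) {
    System.out.println(guarded(null, "/tmp/store")); // /tmp/store/_system
    System.out.println(guarded(null, null));         // null/_system
    try {
      unguarded(null, null);
    } catch (NullPointerException e) {
      System.out.println("unguarded: NullPointerException");
    }
  }
}
```

Under these assumptions, the guard only skips the normalisation calls: a null location still reaches the final string concatenation, which in Java yields the literal "null/_system" instead of throwing.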
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc8510a1/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 004eb74..8a91a43 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1532,8 +1532,10 @@ public final class CarbonProperties {
     if (systemLocation == null) {
       systemLocation = getStorePath();
     }
-    systemLocation = CarbonUtil.checkAndAppendFileSystemURIScheme(systemLocation);
-    systemLocation = FileFactory.getUpdatedFilePath(systemLocation);
+    if (systemLocation != null) {
+      systemLocation = CarbonUtil.checkAndAppendFileSystemURIScheme(systemLocation);
+      systemLocation = FileFactory.getUpdatedFilePath(systemLocation);
+    }
     return systemLocation + CarbonCommonConstants.FILE_SEPARATOR + "_system";
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc8510a1/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
index deb628c..a00b562 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
@@ -94,9 +94,13 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<Object[]> {
     }
     try {
       if (isForceClose) {
-        // unblock the queue.put on the other thread and clear the queue.
-        queue.clear();
+        // first make close is set to true, when force close happens because of dead consumer.
+        // so that, write() method will stop taking input rows.
         close = true;
+        // once write() method stops taking input rows, clear the queue.
+        // If queue is cleared before close is set to true, then queue will be again filled
+        // by .write() and it can go to blocking put() forever as consumer is dead.
+        queue.clear();
         return;
       }
       // below code will ensure that the last RowBatch is consumed properly
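The CARBONDATA-2784 ordering fix can be modelled with a bounded java.util.concurrent.ArrayBlockingQueue. The sketch is a simplified, hypothetical model rather than the real CarbonOutputIteratorWrapper: the class ForceCloseSketch, its write()/forceClose()/runDemo() methods, the capacity of 10 and the single producer thread with no consumer are all assumptions. The producer fills the queue and parks in put(); forceClose() then sets close before calling clear(), so the parked put() completes into the freed space and the next write() sees close and returns instead of blocking.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the force-close path; not the real
// CarbonOutputIteratorWrapper. Class/method names and capacity 10 are assumptions.
final class ForceCloseSketch {
  private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
  private volatile boolean close = false;

  // Producer side: reject input once closed, otherwise block until the queue has room.
  boolean write(int batch) throws InterruptedException {
    if (close) {
      return false;
    }
    queue.put(batch);
    return true;
  }

  // Called when the consumer is dead. Order matters: close first, then clear,
  // so that write() stops adding batches before the queue is emptied.
  void forceClose() {
    close = true;
    queue.clear();
  }

  // One producer, no consumer (simulating a dead consumer). Returns how many
  // batches write() accepted before the producer thread finished.
  static int runDemo() {
    ForceCloseSketch sketch = new ForceCloseSketch();
    AtomicInteger accepted = new AtomicInteger();
    Thread producer = new Thread(() -> {
      try {
        for (int batch = 0; batch < 30; batch++) {
          if (!sketch.write(batch)) {
            break; // writer is closed: stop producing
          }
          accepted.incrementAndGet();
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.setDaemon(true); // never keep the JVM alive if the demo fails
    producer.start();
    try {
      // Wait until the queue is full and the producer is parked inside put().
      long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
      while (!(sketch.queue.remainingCapacity() == 0
          && producer.getState() == Thread.State.WAITING)) {
        if (System.nanoTime() > deadline) {
          throw new IllegalStateException("producer never blocked");
        }
        Thread.sleep(1);
      }
      sketch.forceClose();
      producer.join(TimeUnit.SECONDS.toMillis(5));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for producer", e);
    }
    if (producer.isAlive()) {
      throw new IllegalStateException("producer is still blocked in put()");
    }
    return accepted.get();
  }

  public static void main(String[] args) {
    System.out.println("accepted batches: " + runDemo());
  }
}
```

Running main should print "accepted batches: 11": the 10 batches that filled the queue plus the one that was parked in put() when forceClose() ran. Swapping the two statements in forceClose() reopens the window described in the commit message, where a producer that keeps calling write() between clear() and close = true can refill the queue and park in put() with nobody left to drain it. The guarantee in this sketch relies on there being a single producer thread.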