[CARBONDATA-2784][CARBONDATA-2786][SDK writer] Fixed: forever blocking wait with more than 21 batches of data
Problem: [CARBONDATA-2784] [SDK writer] Forever blocking wait with more than 21 batches of data when the consumer is dead due to a data loading exception (bad record / out of memory).

Root cause: When the consumer dies due to a data loading exception, the writer is forcefully closed. However, queue.clear() cleared only a snapshot of the entries present at that moment (10 batches), and close was set to true only after that. If more than 10 batches of data were put into the queue again between clear() and close = true, queue.put() for the 11th batch blocked forever, because the consumer was dead.

Solution: Set close = true before clearing the queue. This stops write() from adding more batches to the queue.

Problem: [CARBONDATA-2786] NPE when the SDK writer tries to write a file.

Cause: In CarbonProperties.java (see #2387), systemLocation is not null-checked after systemLocation = getStorePath(). It can be null in the SDK case, because a store location is not applicable to the SDK.

Solution: Added a null validation.

This closes #2561

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/339f1a80
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/339f1a80
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/339f1a80

Branch: refs/heads/branch-1.4
Commit: 339f1a8015c0a562bd11b6f8fce2541a1598d19e
Parents: 769b306
Author: ajantha-bhat <ajanthab...@gmail.com>
Authored: Thu Jul 26 00:35:36 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Tue Jul 31 00:11:26 2018 +0530

----------------------------------------------------------------------
 .../org/apache/carbondata/core/util/CarbonProperties.java | 6 ++++--
 .../loading/iterator/CarbonOutputIteratorWrapper.java     | 8 ++++++--
 2 files changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
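The force-close ordering described in the root cause can be illustrated with a minimal, single-producer sketch. ForceCloseSketch, its write()/forceClose()/queuedBatches() methods and the queue capacity of 10 are hypothetical stand-ins, not the actual CarbonOutputIteratorWrapper code; a bounded offer() replaces the real blocking put() so the example cannot hang.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the CARBONDATA-2784 fix: set close before clearing the queue. */
public class ForceCloseSketch {
  // Assumed capacity of 10 batches, matching the "10 batches" in the commit message.
  private final BlockingQueue<Object[]> queue = new ArrayBlockingQueue<>(10);
  // volatile so a producer thread sees the flag as soon as forceClose() sets it.
  private volatile boolean close;

  /** Producer side, standing in for write(). Returns false if the batch was not enqueued. */
  public boolean write(Object[] batch) {
    if (close) {
      return false; // writer was force closed: stop taking input rows
    }
    try {
      // The real code uses put(), which blocks forever when the queue is full
      // and the consumer is dead; a bounded offer() keeps this sketch from hanging.
      return queue.offer(batch, 50, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  /** Fixed order: stop producers first, then drop whatever is already queued. */
  public void forceClose() {
    close = true;
    queue.clear();
  }

  public int queuedBatches() {
    return queue.size();
  }

  public static void main(String[] args) {
    ForceCloseSketch writer = new ForceCloseSketch();
    // The consumer is dead, so nothing drains the queue and it fills to capacity.
    for (int i = 0; i < 10; i++) {
      writer.write(new Object[] {i});
    }
    // An 11th batch cannot be enqueued; with put() this call would never return.
    System.out.println("11th batch accepted: " + writer.write(new Object[] {10}));
    writer.forceClose();
    // Once close is set, write() rejects rows without touching the queue.
    System.out.println("after force close: accepted=" + writer.write(new Object[] {11})
        + ", queued=" + writer.queuedBatches());
  }
}
```

With the original order (clear() first, close = true second), a producer running between the two statements could refill the queue and block in put() on the next batch. Setting the flag first means any write() that checks it afterwards returns without enqueuing, so the cleared queue is not refilled.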
http://git-wip-us.apache.org/repos/asf/carbondata/blob/339f1a80/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 004eb74..8a91a43 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1532,8 +1532,10 @@ public final class CarbonProperties {
     if (systemLocation == null) {
       systemLocation = getStorePath();
     }
-    systemLocation = CarbonUtil.checkAndAppendFileSystemURIScheme(systemLocation);
-    systemLocation = FileFactory.getUpdatedFilePath(systemLocation);
+    if (systemLocation != null) {
+      systemLocation = CarbonUtil.checkAndAppendFileSystemURIScheme(systemLocation);
+      systemLocation = FileFactory.getUpdatedFilePath(systemLocation);
+    }
     return systemLocation + CarbonCommonConstants.FILE_SEPARATOR + "_system";
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/339f1a80/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
index deb628c..a00b562 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
@@ -94,9 +94,13 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<Object[]> {
     }
     try {
       if (isForceClose) {
-        // unblock the queue.put on the other thread and clear the queue.
-        queue.clear();
+        // first make close is set to true, when force close happens because of dead consumer.
+        // so that, write() method will stop taking input rows.
         close = true;
+        // once write() method stops taking input rows, clear the queue.
+        // If queue is cleared before close is set to true, then queue will be again filled
+        // by .write() and it can go to blocking put() forever as consumer is dead.
+        queue.clear();
         return;
       }
       // below code will ensure that the last RowBatch is consumed properly
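The CarbonProperties hunk skips the path normalization when no location is available. A minimal sketch of that guard follows; SystemFolderSketch, systemFolderLocation(), its two parameters and normalize() are hypothetical stand-ins for the real method, getStorePath(), CarbonUtil.checkAndAppendFileSystemURIScheme and FileFactory.getUpdatedFilePath. normalize() dereferences its argument, as the unguarded calls did in the SDK case.

```java
/** Hypothetical sketch of the CARBONDATA-2786 null guard; not the real CarbonProperties code. */
public class SystemFolderSketch {
  static final String FILE_SEPARATOR = "/";

  /** Made-up stand-in for the URI-scheme/path normalization; throws NPE on a null path. */
  public static String normalize(String path) {
    return path.startsWith("/") ? "file://" + path : path;
  }

  public static String systemFolderLocation(String configured, String storePath) {
    String systemLocation = configured;
    if (systemLocation == null) {
      systemLocation = storePath; // may be null in the SDK, which has no store location
    }
    if (systemLocation != null) {
      systemLocation = normalize(systemLocation); // skipped for null, so no NPE
    }
    return systemLocation + FILE_SEPARATOR + "_system";
  }

  public static void main(String[] args) {
    System.out.println(systemFolderLocation(null, "/tmp/store"));
    System.out.println(systemFolderLocation("hdfs://nn/sys", null));
    System.out.println(systemFolderLocation(null, null));
  }
}
```

Java string concatenation turns a null reference into the text "null", so when both inputs are null the sketch returns "null/_system" instead of throwing; by the same rule, the patched method returns a path beginning with "null" in that case.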