This is an automated email from the ASF dual-hosted git repository. ravipesala pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new bd16325 [CARBONDATA-3384] Fix NullPointerException for update/delete using index server bd16325 is described below commit bd1632564acb248db7080b9fd5f76b8e8da79101 Author: kunal642 <kunalkapoor...@gmail.com> AuthorDate: Wed May 15 11:35:18 2019 +0530 [CARBONDATA-3384] Fix NullPointerException for update/delete using index server Problem: After update, the segment cache is cleared from the executor, then in any subsequent query only one index file is considered for creating the BlockUniqueIdentifier. Therefore the query throws a NullPointerException when accessing the segmentProperties. Solution: Consider all index files for the segment for Identifier creation. This closes #3218 --- .../indexstore/blockletindex/BlockletDataMapFactory.java | 4 ++-- .../carbondata/hadoop/api/CarbonTableInputFormat.java | 4 +++- .../indexserver/InvalidateSegmentCacheRDD.scala | 16 ++++++++++------ 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index e4a3ad8..446507f 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -344,6 +344,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = segmentMap.get(distributable.getSegment().getSegmentNo()); if (tableBlockIndexUniqueIdentifiers == null) { + tableBlockIndexUniqueIdentifiers = new HashSet<>(); Set<String> indexFiles = distributable.getSegment().getCommittedIndexFile().keySet(); for (String indexFile : indexFiles) { CarbonFile carbonFile = FileFactory.getCarbonFile(indexFile); @@ -363,10 +364,9 @@ public class 
BlockletDataMapFactory extends CoarseGrainDataMapFactory identifiersWrapper.add( new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, this.getCarbonTable())); - tableBlockIndexUniqueIdentifiers = new HashSet<>(); tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier); - segmentMap.put(distributable.getSegment().getSegmentNo(), tableBlockIndexUniqueIdentifiers); } + segmentMap.put(distributable.getSegment().getSegmentNo(), tableBlockIndexUniqueIdentifiers); } else { for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : tableBlockIndexUniqueIdentifiers) { diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 458c95e..dd86dcb 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -564,7 +564,9 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { allSegments.getInvalidSegments(), toBeCleanedSegments)); for (InputSplit extendedBlocklet : extendedBlocklets) { CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet; - blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blocklet.getFilePath(), + String filePath = blocklet.getFilePath(); + String blockName = filePath.substring(filePath.lastIndexOf("/") + 1); + blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blockName, (long) blocklet.getDetailInfo().getRowCount()); } } else { diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala index 1aa8cd9..bc83d2f 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala +++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala @@ -43,12 +43,16 @@ class InvalidateSegmentCacheRDD(@transient private val ss: SparkSession, databas } override protected def internalGetPartitions: Array[Partition] = { - executorsList.zipWithIndex.map { - case (executor, idx) => - // create a dummy split for each executor to accumulate the cache size. - val dummySplit = new CarbonInputSplit() - dummySplit.setLocation(Array(executor)) - new DataMapRDDPartition(id, idx, dummySplit) + if (invalidSegmentIds.isEmpty) { + Array() + } else { + executorsList.zipWithIndex.map { + case (executor, idx) => + // create a dummy split for each executor to accumulate the cache size. + val dummySplit = new CarbonInputSplit() + dummySplit.setLocation(Array(executor)) + new DataMapRDDPartition(id, idx, dummySplit) + } } } }