This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new bd16325  [CARBONDATA-3384] Fix NullPointerException for update/delete using index server
bd16325 is described below

commit bd1632564acb248db7080b9fd5f76b8e8da79101
Author: kunal642 <kunalkapoor...@gmail.com>
AuthorDate: Wed May 15 11:35:18 2019 +0530

    [CARBONDATA-3384] Fix NullPointerException for update/delete using index server
    
    Problem:
    After an update, the segment cache is cleared from the executor. In any
    subsequent query, only one index file is then considered when creating the
    TableBlockIndexUniqueIdentifiers, so the query throws a NullPointerException
    when accessing the segmentProperties.
    
    Solution:
    Consider all index files of the segment when creating the identifiers.
    
    This closes #3218
---
 .../indexstore/blockletindex/BlockletDataMapFactory.java |  4 ++--
 .../carbondata/hadoop/api/CarbonTableInputFormat.java    |  4 +++-
 .../indexserver/InvalidateSegmentCacheRDD.scala          | 16 ++++++++++------
 3 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index e4a3ad8..446507f 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -344,6 +344,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         segmentMap.get(distributable.getSegment().getSegmentNo());
     if (tableBlockIndexUniqueIdentifiers == null) {
+      tableBlockIndexUniqueIdentifiers = new HashSet<>();
      Set<String> indexFiles = distributable.getSegment().getCommittedIndexFile().keySet();
       for (String indexFile : indexFiles) {
         CarbonFile carbonFile = FileFactory.getCarbonFile(indexFile);
@@ -363,10 +364,9 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
         identifiersWrapper.add(
            new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
                 this.getCarbonTable()));
-        tableBlockIndexUniqueIdentifiers = new HashSet<>();
         tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier);
-        segmentMap.put(distributable.getSegment().getSegmentNo(), tableBlockIndexUniqueIdentifiers);
       }
+      segmentMap.put(distributable.getSegment().getSegmentNo(), tableBlockIndexUniqueIdentifiers);
     } else {
       for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
           tableBlockIndexUniqueIdentifiers) {
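
The hunks above move the HashSet creation out of the per-index-file loop and
defer the segmentMap.put until the set is complete, so identifiers for all
index files of the segment get cached. A minimal sketch of the corrected flow
with simplified stand-in types (SegmentIdentifierCache and the String
identifiers are hypothetical; the real code builds
TableBlockIndexUniqueIdentifiers):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class SegmentIdentifierCache {
      private final Map<String, Set<String>> segmentMap = new HashMap<>();

      Set<String> identifiersFor(String segmentNo, Set<String> committedIndexFiles) {
        Set<String> identifiers = segmentMap.get(segmentNo);
        if (identifiers == null) {
          // Create the set once, outside the loop. The bug re-created it on
          // every iteration, so only the last index file's entry survived.
          identifiers = new HashSet<>();
          for (String indexFile : committedIndexFiles) {
            identifiers.add(indexFile);
          }
          // Cache the complete set only after every index file is added.
          segmentMap.put(segmentNo, identifiers);
        }
        return identifiers;
      }
    }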
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 458c95e..dd86dcb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -564,7 +564,9 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
                 allSegments.getInvalidSegments(), toBeCleanedSegments));
         for (InputSplit extendedBlocklet : extendedBlocklets) {
           CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet;
-          blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blocklet.getFilePath(),
+          String filePath = blocklet.getFilePath();
+          String blockName = filePath.substring(filePath.lastIndexOf("/") + 1);
+          blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blockName,
               (long) blocklet.getDetailInfo().getRowCount());
         }
       } else {
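
The hunk above keys blockletToRowCountMap by segment id plus the block file
name instead of the full file path. A self-contained sketch of the same key
construction (BlockKeyDemo and rowCountKey are hypothetical names for
illustration; only the substring/lastIndexOf extraction mirrors the change):

    public class BlockKeyDemo {
      // Strip the directory part so the key uses only the block file name.
      static String rowCountKey(String segmentId, String filePath) {
        String blockName = filePath.substring(filePath.lastIndexOf("/") + 1);
        return segmentId + "," + blockName;
      }

      public static void main(String[] args) {
        // Prints "0,part-0-0.carbondata" for a typical segment file path.
        System.out.println(rowCountKey("0", "/store/db/tbl/Segment_0/part-0-0.carbondata"));
      }
    }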
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
index 1aa8cd9..bc83d2f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
@@ -43,12 +43,16 @@ class InvalidateSegmentCacheRDD(@transient private val ss: SparkSession, databas
   }
 
   override protected def internalGetPartitions: Array[Partition] = {
-    executorsList.zipWithIndex.map {
-      case (executor, idx) =>
-        // create a dummy split for each executor to accumulate the cache size.
-        val dummySplit = new CarbonInputSplit()
-        dummySplit.setLocation(Array(executor))
-        new DataMapRDDPartition(id, idx, dummySplit)
+    if (invalidSegmentIds.isEmpty) {
+      Array()
+    } else {
+      executorsList.zipWithIndex.map {
+        case (executor, idx) =>
+          // create a dummy split for each executor to accumulate the cache size.
+          val dummySplit = new CarbonInputSplit()
+          dummySplit.setLocation(Array(executor))
+          new DataMapRDDPartition(id, idx, dummySplit)
+      }
     }
   }
 }
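
The Scala change above makes internalGetPartitions return an empty array when
there are no invalid segments, so no dummy per-executor tasks are scheduled
for a no-op invalidation. The same guard pattern, sketched in Java with
hypothetical names:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class InvalidationPartitionsDemo {
      // Build one work-item location per executor, but short-circuit when
      // there is nothing to invalidate.
      static List<String> partitions(List<String> invalidSegmentIds, List<String> executors) {
        if (invalidSegmentIds.isEmpty()) {
          return Collections.emptyList();
        }
        return new ArrayList<>(executors);
      }
    }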
