This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new cdf0594  [CARBONDATA-3456] Fix DataLoading to MV table when Yarn-Application is killed
cdf0594 is described below

commit cdf0594cb4fefcec6a892692daca2d73f40ccd19
Author: Indhumathi27 <indhumathi...@gmail.com>
AuthorDate: Thu Jun 27 18:16:04 2019 +0530

    [CARBONDATA-3456] Fix DataLoading to MV table when Yarn-Application is killed
    
    Problem:
    When a data load is triggered on a datamap table, a new LoadMetadataDetail
    with SegmentStatus InsertInProgress and its segment-mapping info is
    created. If the Yarn application is killed at that point, the stale
    LoadMetadataDetail is still in the InsertInProgress state on the next
    load, and the main-table segments mapped to it are not considered for
    that load, resulting in a data mismatch between the main table and the
    datamap table.
    
    Solution:
    Clean up the old invalid segments before creating a new entry for the new load.
    
    This closes #3310
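
    For readers skimming the patch, a condensed sketch of the recovery flow it
    adds on the datamap table before a new load entry is created (simplified
    from the DataMapProvider changes below, and not the literal patch code;
    metadataPath stands in here for the datamap table's metadata directory):

        // Sketch: clean up invalid segments first, then mark any stale,
        // invisible in-progress load entries for delete so they no longer
        // hide the main-table segments mapped to them.
        SegmentStatusManager.deleteLoadsAndUpdateMetadata(dataMapTable, false, null);
        for (LoadMetadataDetails detail : SegmentStatusManager.readLoadMetadata(metadataPath)) {
          boolean staleInProgress =
              (detail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
                  || detail.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)
              && detail.getVisibility().equalsIgnoreCase("false");
          if (staleInProgress) {
            detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
          }
        }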
---
 .../carbondata/core/datamap/DataMapProvider.java   | 25 ++++++++++++++++----
 .../carbondata/core/datamap/DataMapUtil.java       | 18 ++++++++++++++-
 .../core/datamap/dev/DataMapSyncStatus.java        | 19 ++++++++-------
 .../carbondata/core/metadata/SegmentFileStore.java |  2 +-
 .../core/statusmanager/SegmentStatusManager.java   | 27 ++++++++++++++++++----
 .../apache/carbondata/core/util/CarbonUtil.java    |  2 +-
 .../bloom/BloomCoarseGrainDataMapFactory.java      |  3 ++-
 .../datamap/lucene/LuceneDataMapFactoryBase.java   |  3 ++-
 .../carbondata/mv/datamap/MVDataMapProvider.scala  |  8 ++-----
 .../mv/rewrite/MVIncrementalLoadingTestcase.scala  |  6 +++--
 .../hadoop/api/CarbonOutputCommitter.java          |  5 ++--
 .../hadoop/api/CarbonTableInputFormat.java         |  6 +++--
 .../carbondata/datamap/IndexDataMapProvider.java   |  4 ++--
 .../datamap/PreAggregateDataMapProvider.java       |  4 ++--
 .../datamap/IndexDataMapRebuildRDD.scala           |  3 ++-
 .../spark/rdd/CarbonDataRDDFactory.scala           |  1 +
 .../spark/sql/events/MergeIndexEventListener.scala |  2 +-
 .../sql/execution/command/cache/CacheUtil.scala    |  4 ++--
 .../command/cache/DropCacheEventListeners.scala    |  3 ++-
 .../command/datamap/CarbonDataMapShowCommand.scala |  5 ++--
 .../command/mutation/HorizontalCompaction.scala    |  6 +++--
 .../CarbonAlterTableDropHivePartitionCommand.scala |  2 +-
 .../CarbonAlterTableDropPartitionCommand.scala     |  3 ++-
 .../CarbonAlterTableSplitPartitionCommand.scala    |  3 ++-
 .../org/apache/spark/sql/hive/CarbonRelation.scala |  4 ++--
 .../org/apache/spark/util/MergeIndexUtil.scala     |  2 +-
 .../processing/merger/CarbonDataMergerUtil.java    |  7 +++---
 27 files changed, 120 insertions(+), 57 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
index d0b66f3..c320226 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
@@ -129,10 +129,15 @@ public abstract class DataMapProvider {
     }
     String newLoadName = "";
     String segmentMap = "";
-    AbsoluteTableIdentifier dataMapTableAbsoluteTableIdentifier = AbsoluteTableIdentifier
-        .from(dataMapSchema.getRelationIdentifier().getTablePath(),
+    CarbonTable dataMapTable = CarbonTable
+        .buildFromTablePath(dataMapSchema.getRelationIdentifier().getTableName(),
             dataMapSchema.getRelationIdentifier().getDatabaseName(),
-            dataMapSchema.getRelationIdentifier().getTableName());
+            dataMapSchema.getRelationIdentifier().getTablePath(),
+            dataMapSchema.getRelationIdentifier().getTableId());
+    AbsoluteTableIdentifier dataMapTableAbsoluteTableIdentifier =
+        dataMapTable.getAbsoluteTableIdentifier();
+    // Clean up the old invalid segment data before creating a new entry for new load.
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(dataMapTable, false, null);
     SegmentStatusManager segmentStatusManager =
         new SegmentStatusManager(dataMapTableAbsoluteTableIdentifier);
     Map<String, List<String>> segmentMapping = new HashMap<>();
@@ -148,6 +153,15 @@ public abstract class DataMapProvider {
             CarbonTablePath.getMetadataPath(dataMapSchema.getRelationIdentifier().getTablePath());
         LoadMetadataDetails[] loadMetaDataDetails =
             SegmentStatusManager.readLoadMetadata(dataMapTableMetadataPath);
+        // Mark for delete all stale loadMetadetail
+        for (LoadMetadataDetails loadMetadataDetail : loadMetaDataDetails) {
+          if ((loadMetadataDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
+              || loadMetadataDetail.getSegmentStatus()
+              == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) && loadMetadataDetail.getVisibility()
+              .equalsIgnoreCase("false")) {
+            loadMetadataDetail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
+          }
+        }
         List<LoadMetadataDetails> listOfLoadFolderDetails =
             new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
         Collections.addAll(listOfLoadFolderDetails, loadMetaDataDetails);
@@ -223,7 +237,7 @@ public abstract class DataMapProvider {
             + " during table status updation");
       }
     }
-    return rebuildInternal(newLoadName, segmentMapping);
+    return rebuildInternal(newLoadName, segmentMapping, dataMapTable);
   }
 
   /**
@@ -395,5 +409,6 @@ public abstract class DataMapProvider {
 
   public abstract boolean supportRebuild();
 
-  public abstract boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap);
+  public abstract boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap,
+      CarbonTable carbonTable);
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index 394a1dc..dd9debc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datamap;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -258,7 +259,7 @@ public class DataMapUtil {
       CarbonTable carbonTable, Configuration configuration) throws IOException {
     SegmentStatusManager ssm =
         new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(), configuration);
-    return ssm.getValidAndInvalidSegments();
+    return ssm.getValidAndInvalidSegments(carbonTable.isChildTable());
   }
 
   /**
@@ -280,4 +281,19 @@ public class DataMapUtil {
     return segmentList;
   }
 
+  public static String getMaxSegmentID(List<String> segmentList) {
+    double[] segment = new double[segmentList.size()];
+    int i = 0;
+    for (String id : segmentList) {
+      segment[i] = Double.parseDouble(id);
+      i++;
+    }
+    Arrays.sort(segment);
+    String maxId = Double.toString(segment[segmentList.size() - 1]);
+    if (maxId.endsWith(".0")) {
+      maxId = maxId.substring(0, maxId.indexOf("."));
+    }
+    return maxId;
+  }
+
 }
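
The new DataMapUtil.getMaxSegmentID helper above compares segment IDs numerically rather than lexically, which matters once compaction produces IDs such as "0.1" or "12.1" (in a plain string sort, "9" would land after "12.1"). A quick illustration of the expected behavior, assuming the Double-based parsing shown above:

    // Hypothetical usage; results follow from Double.parseDouble/Double.toString.
    DataMapUtil.getMaxSegmentID(Arrays.asList("0", "1", "9", "12.1"));  // returns "12.1"
    DataMapUtil.getMaxSegmentID(Arrays.asList("0", "1", "2"));          // returns "2" (trailing ".0" stripped)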
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapSyncStatus.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapSyncStatus.java
index eb7bf47..d7d60d2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapSyncStatus.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapSyncStatus.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.datamap.status.DataMapSegmentStatusUtil;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
@@ -55,14 +56,16 @@ public abstract class DataMapSyncStatus {
         SegmentStatusManager.readLoadMetadata(metaDataPath);
     Map<String, List<String>> dataMapSegmentMap = new HashMap<>();
     for (LoadMetadataDetails loadMetadataDetail : dataMapLoadMetadataDetails) {
-      Map<String, List<String>> segmentMap =
-          DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetail.getExtraInfo());
-      if (dataMapSegmentMap.isEmpty()) {
-        dataMapSegmentMap.putAll(segmentMap);
-      } else {
-        for (Map.Entry<String, List<String>> entry : segmentMap.entrySet()) {
-          if (null != dataMapSegmentMap.get(entry.getKey())) {
-            dataMapSegmentMap.get(entry.getKey()).addAll(entry.getValue());
+      if (loadMetadataDetail.getSegmentStatus() == SegmentStatus.SUCCESS) {
+        Map<String, List<String>> segmentMap =
+            DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetail.getExtraInfo());
+        if (dataMapSegmentMap.isEmpty()) {
+          dataMapSegmentMap.putAll(segmentMap);
+        } else {
+          for (Map.Entry<String, List<String>> entry : segmentMap.entrySet()) {
+            if (null != dataMapSegmentMap.get(entry.getKey())) {
+              dataMapSegmentMap.get(entry.getKey()).addAll(entry.getValue());
+            }
           }
         }
       }
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index bc4f05b..1c2d50d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -747,7 +747,7 @@ public class SegmentFileStore {
     if (toBeDeleteSegments.size() > 0 || toBeUpdatedSegments.size() > 0) {
       Set<Segment> segmentSet = new HashSet<>(
           new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
-              .getValidAndInvalidSegments().getValidSegments());
+              .getValidAndInvalidSegments(carbonTable.isChildTable()).getValidSegments());
       CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, uniqueId, true,
           Segment.toSegmentList(toBeDeleteSegments, null),
           Segment.toSegmentList(toBeUpdatedSegments, null), uuid);
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 77b9c52..1f645f6 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -103,13 +103,18 @@ public class SegmentStatusManager {
   }
 
   public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
-    return getValidAndInvalidSegments(null, null);
+    return getValidAndInvalidSegments(false, null, null);
+  }
+
+  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(Boolean isChildTable)
+      throws IOException {
+    return getValidAndInvalidSegments(isChildTable, null, null);
   }
 
   /**
    * get valid segment for given load status details.
    */
-  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
+  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(Boolean isChildTable,
       LoadMetadataDetails[] loadMetadataDetails, ReadCommittedScope readCommittedScope)
       throws IOException {
 
@@ -162,9 +167,21 @@ public class SegmentStatusManager {
                new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope));
            continue;
          }
-          listOfValidSegments.add(
-              new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope,
-                  segment));
+          // In case of child table, during loading, if no record is loaded to the segment, then
+          // segmentStatus will be marked as 'Success'. During query, don't need to add that segment
+          // to validSegment list, as segment does not exists
+          if (isChildTable) {
+            if (!segment.getDataSize().equalsIgnoreCase("0") && !segment.getIndexSize()
+                .equalsIgnoreCase("0")) {
+              listOfValidSegments.add(
+                  new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope,
+                      segment));
+            }
+          } else {
+            listOfValidSegments.add(
+                new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope,
+                    segment));
+          }
         } else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus()
             || SegmentStatus.COMPACTED == segment.getSegmentStatus()
             || SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) {
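
The isChildTable branch above skips segments whose recorded data and index sizes are both "0": an MV child table can end up with an empty segment still marked 'Success' when a load adds no rows, and such a segment should not be treated as queryable. Restated as a standalone predicate (illustrative only, not part of the patch):

    // A child-table segment is queryable only if it actually wrote data and
    // index files, i.e. both recorded sizes are non-zero strings.
    private static boolean isNonEmptyChildSegment(LoadMetadataDetails segment) {
      return !segment.getDataSize().equalsIgnoreCase("0")
          && !segment.getIndexSize().equalsIgnoreCase("0");
    }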
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 376c757..7eeff90 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -3143,7 +3143,7 @@ public final class CarbonUtil {
       SegmentStatusManager segmentStatusManager =
           new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
       SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
-          segmentStatusManager.getValidAndInvalidSegments();
+          segmentStatusManager.getValidAndInvalidSegments(carbonTable.isChildTable());
       List<Segment> validSegments = validAndInvalidSegmentsInfo.getValidSegments();
       if (validSegments.isEmpty()) {
         return carbonProperties.getFormatVersion();
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index f261871..071e172 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -374,7 +374,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
     SegmentStatusManager ssm =
        new SegmentStatusManager(getCarbonTable().getAbsoluteTableIdentifier());
     try {
-      List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
+      List<Segment> validSegments =
+          ssm.getValidAndInvalidSegments(getCarbonTable().isChildTable()).getValidSegments();
       for (Segment segment : validSegments) {
         deleteDatamapData(segment);
       }
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index 33f30b6..88c6969 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -179,7 +179,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
   private void deleteDatamap() throws MalformedDataMapCommandException {
     SegmentStatusManager ssm = new SegmentStatusManager(tableIdentifier);
     try {
-      List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
+      List<Segment> validSegments =
+          ssm.getValidAndInvalidSegments(getCarbonTable().isChildTable()).getValidSegments();
       for (Segment segment : validSegments) {
         deleteDatamapData(segment);
       }
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
index 8bdac4e..b2d9d3b 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
@@ -103,7 +103,8 @@ class MVDataMapProvider(
 
   @throws[IOException]
   override def rebuildInternal(newLoadName: String,
-      segmentMap: java.util.Map[String, java.util.List[String]]): Boolean = {
+      segmentMap: java.util.Map[String, java.util.List[String]],
+      dataMapTable: CarbonTable): Boolean = {
     val ctasQuery = dataMapSchema.getCtasQuery
     if (ctasQuery != null) {
       val identifier = dataMapSchema.getRelationIdentifier
@@ -129,11 +130,6 @@ class MVDataMapProvider(
       if (isFullRefresh) {
         isOverwriteTable = true
       }
-      val dataMapTable = CarbonTable
-        .buildFromTablePath(identifier.getTableName,
-          identifier.getDatabaseName,
-          identifier.getTablePath,
-          identifier.getTableId)
       // Set specified segments for incremental load
       val segmentMapIterator = segmentMap.entrySet().iterator()
       while (segmentMapIterator.hasNext) {
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
index 3f07cda..b47dc47 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
@@ -580,15 +580,17 @@ class MVIncrementalLoadingTestcase extends QueryTest with BeforeAndAfterAll {
     loadDataToFactTable("test_table")
     sql("drop datamap if exists datamap1")
     sql("create datamap datamap_com using 'mv' as select empname, designation from test_table")
-    for (i <- 0 to 4) {
+    for (i <- 0 to 16) {
       loadDataToFactTable("test_table")
     }
     createTableFactTable("test_table1")
-    for (i <- 0 to 5) {
+    for (i <- 0 to 17) {
       loadDataToFactTable("test_table1")
     }
     checkAnswer(sql("select empname, designation from test_table"),
       sql("select empname, designation from test_table1"))
+    val result = sql("show datamap on table test_table").collectAsList()
+    assert(result.get(0).get(5).toString.contains("\"default.test_table\":\"12.1\""))
     val df = sql(s""" select empname, designation from test_table""".stripMargin)
     val analyzed = df.queryExecution.analyzed
     assert(TestUtil.verifyMVDataMap(analyzed, "datamap_com"))
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 02135e2..03445ac 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -196,7 +196,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
       List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
       Set<Segment> segmentSet = new HashSet<>(
           new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
-              context.getConfiguration()).getValidAndInvalidSegments().getValidSegments());
+              context.getConfiguration()).getValidAndInvalidSegments(carbonTable.isChildTable())
+              .getValidSegments());
       if (updateTime != null) {
         CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true,
             segmentDeleteList);
@@ -231,7 +232,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     if (partitionSpecs != null && partitionSpecs.size() > 0) {
       List<Segment> validSegments =
           new SegmentStatusManager(table.getAbsoluteTableIdentifier())
-              .getValidAndInvalidSegments().getValidSegments();
+              .getValidAndInvalidSegments(table.isChildTable()).getValidSegments();
       String uniqueId = String.valueOf(System.currentTimeMillis());
       List<String> tobeUpdatedSegs = new ArrayList<>();
       List<String> tobeDeletedSegs = new ArrayList<>();
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 90ff520..3b7a800 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -122,7 +122,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier,
         readCommittedScope.getConfiguration());
     SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
-        .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
+        .getValidAndInvalidSegments(carbonTable.isChildTable(), loadMetadataDetails,
+            this.readCommittedScope);
 
     // to check whether only streaming segments access is enabled or not,
     // if access streaming segment is true then data will be read from streaming segments
@@ -523,7 +524,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
         table, loadMetadataDetails);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
         new SegmentStatusManager(identifier, readCommittedScope.getConfiguration())
-            .getValidAndInvalidSegments(loadMetadataDetails, readCommittedScope);
+            .getValidAndInvalidSegments(table.isChildTable(), loadMetadataDetails,
+                readCommittedScope);
     Map<String, Long> blockRowCountMapping = new HashMap<>();
     Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();
 
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
index f6031d1..f75ba48 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
@@ -135,8 +135,8 @@ public class IndexDataMapProvider extends DataMapProvider {
     return dataMapFactory.supportRebuild();
   }
 
-  @Override
-  public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap) {
+  @Override public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap,
+      CarbonTable carbonTable) {
     return false;
   }
 }
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
index 72390ce..12da32f 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
@@ -112,8 +112,8 @@ public class PreAggregateDataMapProvider extends DataMapProvider {
     return false;
   }
 
-  @Override
-  public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap) {
+  @Override public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap,
+      CarbonTable carbonTable) {
     return false;
   }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index a35de58..31d1390 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -79,7 +79,8 @@ object IndexDataMapRebuildRDD {
   ): Unit = {
     val tableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val segmentStatusManager = new SegmentStatusManager(tableIdentifier)
-    val validAndInvalidSegments = segmentStatusManager.getValidAndInvalidSegments()
+    val validAndInvalidSegments = segmentStatusManager
+      .getValidAndInvalidSegments(carbonTable.isChildTable)
     val validSegments = validAndInvalidSegments.getValidSegments
     val indexedCarbonColumns = carbonTable.getIndexedColumns(schema)
     val operationContext = new OperationContext()
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 2e5f0ea..53e2eba 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -518,6 +518,7 @@ object CarbonDataRDDFactory {
       val newEntryLoadStatus =
         if (carbonLoadModel.isCarbonTransactionalTable &&
             !carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
+            !carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildTable &&
             !CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
           LOGGER.warn("Cannot write load metadata file as there is no data to load")
           SegmentStatus.MARKED_FOR_DELETE
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 7844f28..967f390 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -85,7 +85,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
                               .getTableName
                           }")
              val validSegments: mutable.Buffer[Segment] = CarbonDataMergerUtil.getValidSegmentList(
-                carbonMainTable.getAbsoluteTableIdentifier).asScala
+                carbonMainTable.getAbsoluteTableIdentifier, carbonMainTable.isChildTable).asScala
              val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
               validSegments.foreach { segment =>
                 validSegmentIds += segment.getSegmentNo
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
index 8975027..0565e20 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
@@ -51,7 +51,7 @@ object CacheUtil {
     if (carbonTable.isTransactionalTable) {
       val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       val validAndInvalidSegmentsInfo = new SegmentStatusManager(absoluteTableIdentifier)
-        .getValidAndInvalidSegments()
+        .getValidAndInvalidSegments(carbonTable.isChildTable)
       // Fire a job to clear the invalid segments cached in the executors.
       if (CarbonProperties.getInstance().isDistributedPruningEnabled(carbonTable.getDatabaseName,
         carbonTable.getTableName)) {
@@ -111,7 +111,7 @@ object CacheUtil {
 
   def getBloomCacheKeys(carbonTable: CarbonTable, datamap: DataMapSchema): List[String] = {
     val segments = CarbonDataMergerUtil
-      .getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala
+      .getValidSegmentList(carbonTable.getAbsoluteTableIdentifier, carbonTable.isChildTable).asScala
 
     // Generate shard Path for the datamap
     val shardPaths = segments.flatMap {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala
index e695f3a..c2181f2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala
@@ -109,7 +109,8 @@ object DropCacheBloomEventListener extends OperationEventListener {
         val datamaps = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
           .asScala.toList
         val segments = CarbonDataMergerUtil
-          .getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala.toList
+          .getValidSegmentList(carbonTable.getAbsoluteTableIdentifier, carbonTable.isChildTable)
+          .asScala.toList
 
         datamaps.foreach {
           case datamap if datamap.getProviderName
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
index 30cd3ef..b44cf79 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.types.StringType
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, DataMapUtil}
 import org.apache.carbondata.core.datamap.status.{DataMapSegmentStatusUtil, DataMapStatus, DataMapStatusManager}
 import org.apache.carbondata.core.metadata.schema.datamap.{DataMapClassProvider, DataMapProperty}
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
@@ -125,7 +125,8 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
                     val iterator = segmentMaps.entrySet().iterator()
                     while (iterator.hasNext) {
                       val entry = iterator.next()
-                      syncInfoMap.put(entry.getKey, entry.getValue.get(entry.getValue.size() - 1))
+
+                      syncInfoMap.put(entry.getKey, DataMapUtil.getMaxSegmentID(entry.getValue))
                     }
                     val loadEndTime =
                       if (loadMetadataDetails(i).getLoadEndTime ==
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 6224d0d..fb20e4f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -63,7 +63,8 @@ object HorizontalCompaction {
     val deleteTimeStamp = updateTimeStamp + 1
 
     // get the valid segments
-    var segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
+    var segLists = CarbonDataMergerUtil
+      .getValidSegmentList(absTableIdentifier, carbonTable.isChildTable)
 
     if (segLists == null || segLists.size() == 0) {
       return
@@ -91,7 +92,8 @@ object HorizontalCompaction {
 
     // After Update Compaction perform delete compaction
     compactionTypeIUD = CompactionType.IUD_DELETE_DELTA
-    segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
+    segLists = CarbonDataMergerUtil
+      .getValidSegmentList(absTableIdentifier, carbonTable.isChildTable)
     if (segLists == null || segLists.size() == 0) {
       return
     }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index 44e51a1..d6abf86 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -173,7 +173,7 @@ case class CarbonAlterTableDropHivePartitionCommand(
         ""
       }
       val segments = new SegmentStatusManager(table.getAbsoluteTableIdentifier)
-        .getValidAndInvalidSegments.getValidSegments
+        .getValidAndInvalidSegments(table.isChildTable).getValidSegments
       // First drop the partitions from partition mapper files of each segment
       val tuples = new CarbonDropPartitionRDD(sparkSession,
         table.getTablePath,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index b40bb6d..566e44e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -203,7 +203,8 @@ case class CarbonAlterTableDropPartitionCommand(
       val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
       val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
-      val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
+      val validSegments = segmentStatusManager.getValidAndInvalidSegments(carbonTable.isChildTable)
+        .getValidSegments.asScala
       val threadArray: Array[Thread] = new Array[Thread](validSegments.size)
       var i = 0
       for (segmentId: Segment <- validSegments) {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 4d32d00..72c3142 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -221,7 +221,8 @@ case class CarbonAlterTableSplitPartitionCommand(
       val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
       val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
-      val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
+      val validSegments = segmentStatusManager.getValidAndInvalidSegments(carbonTable.isChildTable)
+        .getValidSegments.asScala
       val threadArray: Array[SplitThread] = new Array[SplitThread](validSegments.size)
       var i = 0
       validSegments.foreach { segmentId =>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index ad3eb72..04999a8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -180,7 +180,7 @@ case class CarbonRelation(
         carbonTable.getAbsoluteTableIdentifier)
       if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
         if (new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
-          .getValidAndInvalidSegments.getValidSegments.isEmpty) {
+          .getValidAndInvalidSegments(carbonTable.isChildTable).getValidSegments.isEmpty) {
           sizeInBytesLocalValue = 0L
         } else {
           val tablePath = carbonTable.getTablePath
@@ -188,7 +188,7 @@ case class CarbonRelation(
           if (FileFactory.isFileExist(tablePath, fileType)) {
             // get the valid segments
             val segments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
-              .getValidAndInvalidSegments.getValidSegments.asScala
+              .getValidAndInvalidSegments(carbonTable.isChildTable).getValidSegments.asScala
             var size = 0L
             // for each segment calculate the size
             segments.foreach { validSeg =>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala
index 85a22cc..d2d39e4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala
@@ -73,7 +73,7 @@ object MergeIndexUtil {
     mergedLoads: util.List[String]): Unit = {
     // get only the valid segments of the table
     val validSegments: mutable.Buffer[Segment] = CarbonDataMergerUtil.getValidSegmentList(
-      carbonTable.getAbsoluteTableIdentifier).asScala
+      carbonTable.getAbsoluteTableIdentifier, carbonTable.isChildTable).asScala
     val mergedSegmentIds = new util.ArrayList[String]()
     mergedLoads.asScala.foreach(mergedLoad => {
       val loadName = mergedLoad
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 1d51592..f0abf44 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -953,13 +953,14 @@ public final class CarbonDataMergerUtil {
    * @param absoluteTableIdentifier
    * @return
    */
-  public static List<Segment> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier)
+  public static List<Segment> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier,
+      Boolean isChildTable)
           throws IOException {
 
    SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = null;
     try {
-      validAndInvalidSegments =
-              new 
SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
+      validAndInvalidSegments = new 
SegmentStatusManager(absoluteTableIdentifier)
+          .getValidAndInvalidSegments(isChildTable);
     } catch (IOException e) {
      LOGGER.error("Error while getting valid segment list for a table identifier");
       throw new IOException();
