This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new cdf0594  [CARBONDATA-3456] Fix DataLoading to MV table when Yarn-Application is killed
cdf0594 is described below

commit cdf0594cb4fefcec6a892692daca2d73f40ccd19
Author: Indhumathi27 <indhumathi...@gmail.com>
AuthorDate: Thu Jun 27 18:16:04 2019 +0530

    [CARBONDATA-3456] Fix DataLoading to MV table when Yarn-Application is killed

    Problem:
    When a data load is triggered on a datamap table, a new LoadMetadataDetails entry
    is created with SegmentStatus INSERT_IN_PROGRESS together with its segment-mapping
    info. If the Yarn application is killed at that point, the stale entry remains in
    the INSERT_IN_PROGRESS state, and on the next load the main-table segments mapped
    to that entry are not considered. This results in a data mismatch between the main
    table and the datamap table.

    Solution:
    Clean up the old invalid segment before creating a new entry for the new load.

    This closes #3310
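As a rough illustration of the cleanup described in the Solution note, the sketch below uses simplified stand-in types (not CarbonData's actual LoadMetadataDetails/SegmentStatusManager API): any load left invisible and stuck in an in-progress state by a killed application is marked for delete before the next load entry is written.

    import java.util.ArrayList;
    import java.util.List;

    // Simplified stand-ins for CarbonData's LoadMetadataDetails/SegmentStatus,
    // for illustration only.
    enum Status { SUCCESS, INSERT_IN_PROGRESS, INSERT_OVERWRITE_IN_PROGRESS, MARKED_FOR_DELETE }

    class LoadDetail {
      String loadName;
      Status status;
      boolean visible;
      LoadDetail(String loadName, Status status, boolean visible) {
        this.loadName = loadName;
        this.status = status;
        this.visible = visible;
      }
    }

    public class StaleLoadCleanup {
      // Mirrors the fix: an invisible load still in an in-progress state
      // (e.g. because the Yarn application was killed mid-load) is marked
      // for delete before a new load entry is created.
      static void markStaleLoads(List<LoadDetail> loads) {
        for (LoadDetail load : loads) {
          boolean inProgress = load.status == Status.INSERT_IN_PROGRESS
              || load.status == Status.INSERT_OVERWRITE_IN_PROGRESS;
          if (inProgress && !load.visible) {
            load.status = Status.MARKED_FOR_DELETE;
          }
        }
      }

      public static void main(String[] args) {
        List<LoadDetail> loads = new ArrayList<>();
        loads.add(new LoadDetail("0", Status.SUCCESS, true));
        loads.add(new LoadDetail("1", Status.INSERT_IN_PROGRESS, false)); // stale
        markStaleLoads(loads);
        System.out.println(loads.get(1).status); // MARKED_FOR_DELETE
      }
    }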
---
 .../carbondata/core/datamap/DataMapProvider.java   | 25 ++++++++++++++++----
 .../carbondata/core/datamap/DataMapUtil.java       | 18 ++++++++++++++-
 .../core/datamap/dev/DataMapSyncStatus.java        | 19 ++++++++-------
 .../carbondata/core/metadata/SegmentFileStore.java |  2 +-
 .../core/statusmanager/SegmentStatusManager.java   | 27 ++++++++++++++++++----
 .../apache/carbondata/core/util/CarbonUtil.java    |  2 +-
 .../bloom/BloomCoarseGrainDataMapFactory.java      |  3 ++-
 .../datamap/lucene/LuceneDataMapFactoryBase.java   |  3 ++-
 .../carbondata/mv/datamap/MVDataMapProvider.scala  |  8 ++-----
 .../mv/rewrite/MVIncrementalLoadingTestcase.scala  |  6 +++--
 .../hadoop/api/CarbonOutputCommitter.java          |  5 ++--
 .../hadoop/api/CarbonTableInputFormat.java         |  6 +++--
 .../carbondata/datamap/IndexDataMapProvider.java   |  4 ++--
 .../datamap/PreAggregateDataMapProvider.java       |  4 ++--
 .../datamap/IndexDataMapRebuildRDD.scala           |  3 ++-
 .../spark/rdd/CarbonDataRDDFactory.scala           |  1 +
 .../spark/sql/events/MergeIndexEventListener.scala |  2 +-
 .../sql/execution/command/cache/CacheUtil.scala    |  4 ++--
 .../command/cache/DropCacheEventListeners.scala    |  3 ++-
 .../command/datamap/CarbonDataMapShowCommand.scala |  5 ++--
 .../command/mutation/HorizontalCompaction.scala    |  6 +++--
 .../CarbonAlterTableDropHivePartitionCommand.scala |  2 +-
 .../CarbonAlterTableDropPartitionCommand.scala     |  3 ++-
 .../CarbonAlterTableSplitPartitionCommand.scala    |  3 ++-
 .../org/apache/spark/sql/hive/CarbonRelation.scala |  4 ++--
 .../org/apache/spark/util/MergeIndexUtil.scala     |  2 +-
 .../processing/merger/CarbonDataMergerUtil.java    |  7 +++---
 27 files changed, 120 insertions(+), 57 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
index d0b66f3..c320226 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
@@ -129,10 +129,15 @@ public abstract class DataMapProvider {
     }
     String newLoadName = "";
     String segmentMap = "";
-    AbsoluteTableIdentifier dataMapTableAbsoluteTableIdentifier = AbsoluteTableIdentifier
-        .from(dataMapSchema.getRelationIdentifier().getTablePath(),
+    CarbonTable dataMapTable = CarbonTable
+        .buildFromTablePath(dataMapSchema.getRelationIdentifier().getTableName(),
             dataMapSchema.getRelationIdentifier().getDatabaseName(),
-            dataMapSchema.getRelationIdentifier().getTableName());
+            dataMapSchema.getRelationIdentifier().getTablePath(),
+            dataMapSchema.getRelationIdentifier().getTableId());
+    AbsoluteTableIdentifier dataMapTableAbsoluteTableIdentifier =
+        dataMapTable.getAbsoluteTableIdentifier();
+    // Clean up the old invalid segment data before creating a new entry for new load.
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(dataMapTable, false, null);
     SegmentStatusManager segmentStatusManager =
         new SegmentStatusManager(dataMapTableAbsoluteTableIdentifier);
     Map<String, List<String>> segmentMapping = new HashMap<>();
@@ -148,6 +153,15 @@ public abstract class DataMapProvider {
           CarbonTablePath.getMetadataPath(dataMapSchema.getRelationIdentifier().getTablePath());
       LoadMetadataDetails[] loadMetaDataDetails =
           SegmentStatusManager.readLoadMetadata(dataMapTableMetadataPath);
+      // Mark for delete all stale loadMetadetail
+      for (LoadMetadataDetails loadMetadataDetail : loadMetaDataDetails) {
+        if ((loadMetadataDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
+            || loadMetadataDetail.getSegmentStatus()
+            == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) && loadMetadataDetail.getVisibility()
+            .equalsIgnoreCase("false")) {
+          loadMetadataDetail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
+        }
+      }
       List<LoadMetadataDetails> listOfLoadFolderDetails =
           new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
       Collections.addAll(listOfLoadFolderDetails, loadMetaDataDetails);
@@ -223,7 +237,7 @@ public abstract class DataMapProvider {
                 + " during table status updation");
       }
     }
-    return rebuildInternal(newLoadName, segmentMapping);
+    return rebuildInternal(newLoadName, segmentMapping, dataMapTable);
   }
 
   /**
@@ -395,5 +409,6 @@ public abstract class DataMapProvider {
 
   public abstract boolean supportRebuild();
 
-  public abstract boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap);
+  public abstract boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap,
+      CarbonTable carbonTable);
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index 394a1dc..dd9debc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datamap;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -258,7 +259,7 @@ public class DataMapUtil {
       CarbonTable carbonTable, Configuration configuration) throws IOException {
     SegmentStatusManager ssm =
         new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(), configuration);
-    return ssm.getValidAndInvalidSegments();
+    return ssm.getValidAndInvalidSegments(carbonTable.isChildTable());
   }
 
   /**
@@ -280,4 +281,19 @@ public class DataMapUtil {
     return segmentList;
   }
 
+  public static String getMaxSegmentID(List<String> segmentList) {
+    double[] segment = new double[segmentList.size()];
+    int i = 0;
+    for (String id : segmentList) {
+      segment[i] = Double.parseDouble(id);
+      i++;
+    }
+    Arrays.sort(segment);
+    String maxId = Double.toString(segment[segmentList.size() - 1]);
+    if (maxId.endsWith(".0")) {
+      maxId = maxId.substring(0, maxId.indexOf("."));
+    }
+    return maxId;
+  }
+
 }
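A note on why getMaxSegmentID above compares numerically rather than taking the last list element: segment ids such as "12.1" do not order correctly as strings ("9" sorts after "12.1" lexicographically). A self-contained sketch of the same idea, computing the maximum directly instead of sorting an array:

    import java.util.Arrays;
    import java.util.List;

    public class MaxSegmentId {
      // Same idea as DataMapUtil.getMaxSegmentID: compare ids numerically,
      // then strip a trailing ".0" so whole-number ids print without a
      // decimal part.
      static String maxSegmentId(List<String> ids) {
        double max = Double.NEGATIVE_INFINITY;
        for (String id : ids) {
          max = Math.max(max, Double.parseDouble(id));
        }
        String result = Double.toString(max);
        return result.endsWith(".0") ? result.substring(0, result.indexOf('.')) : result;
      }

      public static void main(String[] args) {
        // Lexicographically "9" would win; numerically "12.1" is the latest segment.
        System.out.println(maxSegmentId(Arrays.asList("9", "10", "12.1"))); // 12.1
        System.out.println(maxSegmentId(Arrays.asList("1", "2", "3")));     // 3
      }
    }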
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapSyncStatus.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapSyncStatus.java
index eb7bf47..d7d60d2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapSyncStatus.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapSyncStatus.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.datamap.status.DataMapSegmentStatusUtil;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
@@ -55,14 +56,16 @@ public abstract class DataMapSyncStatus {
         SegmentStatusManager.readLoadMetadata(metaDataPath);
     Map<String, List<String>> dataMapSegmentMap = new HashMap<>();
     for (LoadMetadataDetails loadMetadataDetail : dataMapLoadMetadataDetails) {
-      Map<String, List<String>> segmentMap =
-          DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetail.getExtraInfo());
-      if (dataMapSegmentMap.isEmpty()) {
-        dataMapSegmentMap.putAll(segmentMap);
-      } else {
-        for (Map.Entry<String, List<String>> entry : segmentMap.entrySet()) {
-          if (null != dataMapSegmentMap.get(entry.getKey())) {
-            dataMapSegmentMap.get(entry.getKey()).addAll(entry.getValue());
+      if (loadMetadataDetail.getSegmentStatus() == SegmentStatus.SUCCESS) {
+        Map<String, List<String>> segmentMap =
+            DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetail.getExtraInfo());
+        if (dataMapSegmentMap.isEmpty()) {
+          dataMapSegmentMap.putAll(segmentMap);
+        } else {
+          for (Map.Entry<String, List<String>> entry : segmentMap.entrySet()) {
+            if (null != dataMapSegmentMap.get(entry.getKey())) {
+              dataMapSegmentMap.get(entry.getKey()).addAll(entry.getValue());
+            }
           }
         }
       }
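The guard added above means only SUCCESS datamap loads contribute their main-table-to-segment mapping when computing sync status. A simplified, self-contained sketch of that merge (stand-in types, not CarbonData's DataMapSegmentStatusUtil; the real code also only appends to keys already present):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SyncStatusSketch {
      // Hypothetical stand-in for one datamap load: its status plus the
      // main-table -> segment-id mapping stored in the load's extra info.
      static class DataMapLoad {
        String status;
        Map<String, List<String>> segmentMap;
        DataMapLoad(String status, Map<String, List<String>> segmentMap) {
          this.status = status;
          this.segmentMap = segmentMap;
        }
      }

      // Only SUCCESS loads contribute their mapping, so a stale in-progress
      // load can no longer distort the computed sync status.
      static Map<String, List<String>> mergeSegmentMaps(List<DataMapLoad> loads) {
        Map<String, List<String>> merged = new HashMap<>();
        for (DataMapLoad load : loads) {
          if (!"SUCCESS".equals(load.status)) {
            continue; // skip in-progress / deleted loads entirely
          }
          for (Map.Entry<String, List<String>> entry : load.segmentMap.entrySet()) {
            merged.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
                .addAll(entry.getValue());
          }
        }
        return merged;
      }
    }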
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index bc4f05b..1c2d50d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -747,7 +747,7 @@ public class SegmentFileStore {
     if (toBeDeleteSegments.size() > 0 || toBeUpdatedSegments.size() > 0) {
       Set<Segment> segmentSet = new HashSet<>(
           new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
-              .getValidAndInvalidSegments().getValidSegments());
+              .getValidAndInvalidSegments(carbonTable.isChildTable()).getValidSegments());
       CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, uniqueId, true,
           Segment.toSegmentList(toBeDeleteSegments, null),
           Segment.toSegmentList(toBeUpdatedSegments, null), uuid);
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 77b9c52..1f645f6 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -103,13 +103,18 @@ public class SegmentStatusManager {
   }
 
   public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
-    return getValidAndInvalidSegments(null, null);
+    return getValidAndInvalidSegments(false, null, null);
+  }
+
+  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(Boolean isChildTable)
+      throws IOException {
+    return getValidAndInvalidSegments(isChildTable, null, null);
   }
 
   /**
    * get valid segment for given load status details.
    */
-  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
+  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(Boolean isChildTable,
       LoadMetadataDetails[] loadMetadataDetails, ReadCommittedScope readCommittedScope)
       throws IOException {
 
@@ -162,9 +167,21 @@ public class SegmentStatusManager {
               new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope));
           continue;
         }
-        listOfValidSegments.add(
-            new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope,
-                segment));
+        // In case of child table, during loading, if no record is loaded to the segment, then
+        // segmentStatus will be marked as 'Success'. During query, don't need to add that segment
+        // to validSegment list, as segment does not exists
+        if (isChildTable) {
+          if (!segment.getDataSize().equalsIgnoreCase("0") && !segment.getIndexSize()
+              .equalsIgnoreCase("0")) {
+            listOfValidSegments.add(
+                new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope,
+                    segment));
+          }
+        } else {
+          listOfValidSegments.add(
+              new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope,
+                  segment));
+        }
       } else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus()
           || SegmentStatus.COMPACTED == segment.getSegmentStatus()
           || SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) {
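The isChildTable branch above can be condensed to a single predicate. A minimal sketch (hypothetical helper, not part of the patch):

    public class ChildSegmentFilter {
      // An MV (child) table segment that loaded no rows is still written with
      // status SUCCESS, but its data and index sizes are "0", so it is skipped
      // when building the valid-segment list; main tables keep the old behaviour.
      static boolean isQueryable(boolean isChildTable, String dataSize, String indexSize) {
        if (!isChildTable) {
          return true;
        }
        return !"0".equalsIgnoreCase(dataSize) && !"0".equalsIgnoreCase(indexSize);
      }

      public static void main(String[] args) {
        System.out.println(isQueryable(true, "0", "0"));      // false: empty MV segment
        System.out.println(isQueryable(true, "1024", "256")); // true
        System.out.println(isQueryable(false, "0", "0"));     // true: not a child table
      }
    }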
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 376c757..7eeff90 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -3143,7 +3143,7 @@ public final class CarbonUtil {
     SegmentStatusManager segmentStatusManager =
         new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
     SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
-        segmentStatusManager.getValidAndInvalidSegments();
+        segmentStatusManager.getValidAndInvalidSegments(carbonTable.isChildTable());
     List<Segment> validSegments = validAndInvalidSegmentsInfo.getValidSegments();
     if (validSegments.isEmpty()) {
       return carbonProperties.getFormatVersion();
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index f261871..071e172 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -374,7 +374,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
     SegmentStatusManager ssm =
         new SegmentStatusManager(getCarbonTable().getAbsoluteTableIdentifier());
     try {
-      List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
+      List<Segment> validSegments =
+          ssm.getValidAndInvalidSegments(getCarbonTable().isChildTable()).getValidSegments();
       for (Segment segment : validSegments) {
         deleteDatamapData(segment);
       }
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index 33f30b6..88c6969 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -179,7 +179,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
   private void deleteDatamap() throws MalformedDataMapCommandException {
     SegmentStatusManager ssm = new SegmentStatusManager(tableIdentifier);
     try {
-      List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
+      List<Segment> validSegments =
+          ssm.getValidAndInvalidSegments(getCarbonTable().isChildTable()).getValidSegments();
       for (Segment segment : validSegments) {
         deleteDatamapData(segment);
       }
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
index 8bdac4e..b2d9d3b 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
@@ -103,7 +103,8 @@ class MVDataMapProvider(
 
   @throws[IOException]
   override def rebuildInternal(newLoadName: String,
-      segmentMap: java.util.Map[String, java.util.List[String]]): Boolean = {
+      segmentMap: java.util.Map[String, java.util.List[String]],
+      dataMapTable: CarbonTable): Boolean = {
     val ctasQuery = dataMapSchema.getCtasQuery
     if (ctasQuery != null) {
       val identifier = dataMapSchema.getRelationIdentifier
@@ -129,11 +130,6 @@ class MVDataMapProvider(
       if (isFullRefresh) {
         isOverwriteTable = true
       }
-      val dataMapTable = CarbonTable
-        .buildFromTablePath(identifier.getTableName,
-          identifier.getDatabaseName,
-          identifier.getTablePath,
-          identifier.getTableId)
       // Set specified segments for incremental load
       val segmentMapIterator = segmentMap.entrySet().iterator()
       while (segmentMapIterator.hasNext) {
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
index 3f07cda..b47dc47 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
@@ -580,15 +580,17 @@ class MVIncrementalLoadingTestcase extends QueryTest with BeforeAndAfterAll {
     loadDataToFactTable("test_table")
     sql("drop datamap if exists datamap1")
     sql("create datamap datamap_com using 'mv' as select empname, designation from test_table")
-    for (i <- 0 to 4) {
+    for (i <- 0 to 16) {
       loadDataToFactTable("test_table")
     }
     createTableFactTable("test_table1")
-    for (i <- 0 to 5) {
+    for (i <- 0 to 17) {
       loadDataToFactTable("test_table1")
     }
     checkAnswer(sql("select empname, designation from test_table"),
       sql("select empname, designation from test_table1"))
+    val result = sql("show datamap on table test_table").collectAsList()
+    assert(result.get(0).get(5).toString.contains("\"default.test_table\":\"12.1\""))
     val df = sql(s""" select empname, designation from test_table""".stripMargin)
     val analyzed = df.queryExecution.analyzed
     assert(TestUtil.verifyMVDataMap(analyzed, "datamap_com"))
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 02135e2..03445ac 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -196,7 +196,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
       List<Segment> segmentDeleteList =
           Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
       Set<Segment> segmentSet = new HashSet<>(
           new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
-              context.getConfiguration()).getValidAndInvalidSegments().getValidSegments());
+              context.getConfiguration()).getValidAndInvalidSegments(carbonTable.isChildTable())
+              .getValidSegments());
       if (updateTime != null) {
         CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true,
             segmentDeleteList);
@@ -231,7 +232,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     if (partitionSpecs != null && partitionSpecs.size() > 0) {
       List<Segment> validSegments =
           new SegmentStatusManager(table.getAbsoluteTableIdentifier())
-              .getValidAndInvalidSegments().getValidSegments();
+              .getValidAndInvalidSegments(table.isChildTable()).getValidSegments();
       String uniqueId = String.valueOf(System.currentTimeMillis());
       List<String> tobeUpdatedSegs = new ArrayList<>();
       List<String> tobeDeletedSegs = new ArrayList<>();
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 90ff520..3b7a800 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -122,7 +122,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     SegmentStatusManager segmentStatusManager =
         new SegmentStatusManager(identifier, readCommittedScope.getConfiguration());
     SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
-        .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
+        .getValidAndInvalidSegments(carbonTable.isChildTable(), loadMetadataDetails,
+            this.readCommittedScope);
 
     // to check whether only streaming segments access is enabled or not,
     // if access streaming segment is true then data will be read from streaming segments
@@ -523,7 +524,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
         table, loadMetadataDetails);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
         new SegmentStatusManager(identifier, readCommittedScope.getConfiguration())
-            .getValidAndInvalidSegments(loadMetadataDetails, readCommittedScope);
+            .getValidAndInvalidSegments(table.isChildTable(), loadMetadataDetails,
+                readCommittedScope);
     Map<String, Long> blockRowCountMapping = new HashMap<>();
     Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
index f6031d1..f75ba48 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
@@ -135,8 +135,8 @@ public class IndexDataMapProvider extends DataMapProvider {
     return dataMapFactory.supportRebuild();
   }
 
-  @Override
-  public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap) {
+  @Override public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap,
+      CarbonTable carbonTable) {
     return false;
   }
 }
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
index 72390ce..12da32f 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
@@ -112,8 +112,8 @@ public class PreAggregateDataMapProvider extends DataMapProvider {
     return false;
   }
 
-  @Override
-  public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap) {
+  @Override public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap,
+      CarbonTable carbonTable) {
     return false;
   }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index a35de58..31d1390 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -79,7 +79,8 @@ object IndexDataMapRebuildRDD {
   ): Unit = {
     val tableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val segmentStatusManager = new SegmentStatusManager(tableIdentifier)
-    val validAndInvalidSegments = segmentStatusManager.getValidAndInvalidSegments()
+    val validAndInvalidSegments = segmentStatusManager
+      .getValidAndInvalidSegments(carbonTable.isChildTable)
     val validSegments = validAndInvalidSegments.getValidSegments
     val indexedCarbonColumns = carbonTable.getIndexedColumns(schema)
     val operationContext = new OperationContext()
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 2e5f0ea..53e2eba 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -518,6 +518,7 @@ object CarbonDataRDDFactory {
     val newEntryLoadStatus =
       if (carbonLoadModel.isCarbonTransactionalTable &&
           !carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
+          !carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildTable &&
          !CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
         LOGGER.warn("Cannot write load metadata file as there is no data to load")
         SegmentStatus.MARKED_FOR_DELETE
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 7844f28..967f390 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -85,7 +85,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
             .getTableName
         }")
         val validSegments: mutable.Buffer[Segment] = CarbonDataMergerUtil.getValidSegmentList(
-          carbonMainTable.getAbsoluteTableIdentifier).asScala
+          carbonMainTable.getAbsoluteTableIdentifier, carbonMainTable.isChildTable).asScala
         val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
         validSegments.foreach { segment =>
           validSegmentIds += segment.getSegmentNo
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
index 8975027..0565e20 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
@@ -51,7 +51,7 @@ object CacheUtil {
     if (carbonTable.isTransactionalTable) {
       val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       val validAndInvalidSegmentsInfo = new SegmentStatusManager(absoluteTableIdentifier)
-        .getValidAndInvalidSegments()
+        .getValidAndInvalidSegments(carbonTable.isChildTable)
       // Fire a job to clear the invalid segments cached in the executors.
       if (CarbonProperties.getInstance().isDistributedPruningEnabled(carbonTable.getDatabaseName,
         carbonTable.getTableName)) {
@@ -111,7 +111,7 @@ object CacheUtil {
 
   def getBloomCacheKeys(carbonTable: CarbonTable, datamap: DataMapSchema): List[String] = {
     val segments = CarbonDataMergerUtil
-      .getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala
+      .getValidSegmentList(carbonTable.getAbsoluteTableIdentifier, carbonTable.isChildTable).asScala
 
     // Generate shard Path for the datamap
     val shardPaths = segments.flatMap {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala
index e695f3a..c2181f2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala
@@ -109,7 +109,8 @@ object DropCacheBloomEventListener extends OperationEventListener {
         val datamaps = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
           .asScala.toList
         val segments = CarbonDataMergerUtil
-          .getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala.toList
+          .getValidSegmentList(carbonTable.getAbsoluteTableIdentifier, carbonTable.isChildTable)
+          .asScala.toList
         datamaps.foreach {
           case datamap if datamap.getProviderName
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
index 30cd3ef..b44cf79 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.types.StringType
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, DataMapUtil}
 import org.apache.carbondata.core.datamap.status.{DataMapSegmentStatusUtil, DataMapStatus, DataMapStatusManager}
 import org.apache.carbondata.core.metadata.schema.datamap.{DataMapClassProvider, DataMapProperty}
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
@@ -125,7 +125,8 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
             val iterator = segmentMaps.entrySet().iterator()
             while (iterator.hasNext) {
               val entry = iterator.next()
-              syncInfoMap.put(entry.getKey, entry.getValue.get(entry.getValue.size() - 1))
+
+              syncInfoMap.put(entry.getKey, DataMapUtil.getMaxSegmentID(entry.getValue))
             }
             val loadEndTime =
               if (loadMetadataDetails(i).getLoadEndTime ==
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 6224d0d..fb20e4f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -63,7 +63,8 @@ object HorizontalCompaction {
     val deleteTimeStamp = updateTimeStamp + 1
 
     // get the valid segments
-    var segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
+    var segLists = CarbonDataMergerUtil
+      .getValidSegmentList(absTableIdentifier, carbonTable.isChildTable)
     if (segLists == null || segLists.size() == 0) {
       return
     }
@@ -91,7 +92,8 @@ object HorizontalCompaction {
 
     // After Update Compaction perform delete compaction
     compactionTypeIUD = CompactionType.IUD_DELETE_DELTA
-    segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
+    segLists = CarbonDataMergerUtil
+      .getValidSegmentList(absTableIdentifier, carbonTable.isChildTable)
     if (segLists == null || segLists.size() == 0) {
       return
     }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index 44e51a1..d6abf86 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -173,7 +173,7 @@ case class CarbonAlterTableDropHivePartitionCommand(
         ""
       }
       val segments = new SegmentStatusManager(table.getAbsoluteTableIdentifier)
-        .getValidAndInvalidSegments.getValidSegments
+        .getValidAndInvalidSegments(table.isChildTable).getValidSegments
       // First drop the partitions from partition mapper files of each segment
       val tuples = new CarbonDropPartitionRDD(sparkSession,
         table.getTablePath,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index b40bb6d..566e44e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -203,7 +203,8 @@ case class CarbonAlterTableDropPartitionCommand(
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
-    val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
+    val validSegments = segmentStatusManager.getValidAndInvalidSegments(carbonTable.isChildTable)
+      .getValidSegments.asScala
     val threadArray: Array[Thread] = new Array[Thread](validSegments.size)
     var i = 0
     for (segmentId: Segment <- validSegments) {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 4d32d00..72c3142 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -221,7 +221,8 @@ case class CarbonAlterTableSplitPartitionCommand(
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
-    val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
+    val validSegments = segmentStatusManager.getValidAndInvalidSegments(carbonTable.isChildTable)
+      .getValidSegments.asScala
     val threadArray: Array[SplitThread] = new Array[SplitThread](validSegments.size)
     var i = 0
     validSegments.foreach { segmentId =>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index ad3eb72..04999a8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -180,7 +180,7 @@ case class CarbonRelation(
         carbonTable.getAbsoluteTableIdentifier)
       if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
         if (new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
-          .getValidAndInvalidSegments.getValidSegments.isEmpty) {
+          .getValidAndInvalidSegments(carbonTable.isChildTable).getValidSegments.isEmpty) {
           sizeInBytesLocalValue = 0L
         } else {
           val tablePath = carbonTable.getTablePath
@@ -188,7 +188,7 @@ case class CarbonRelation(
           if (FileFactory.isFileExist(tablePath, fileType)) {
             // get the valid segments
             val segments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
-              .getValidAndInvalidSegments.getValidSegments.asScala
+              .getValidAndInvalidSegments(carbonTable.isChildTable).getValidSegments.asScala
             var size = 0L
             // for each segment calculate the size
             segments.foreach { validSeg =>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala
index 85a22cc..d2d39e4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala
@@ -73,7 +73,7 @@ object MergeIndexUtil {
       mergedLoads: util.List[String]): Unit = {
     // get only the valid segments of the table
     val validSegments: mutable.Buffer[Segment] = CarbonDataMergerUtil.getValidSegmentList(
-      carbonTable.getAbsoluteTableIdentifier).asScala
+      carbonTable.getAbsoluteTableIdentifier, carbonTable.isChildTable).asScala
     val mergedSegmentIds = new util.ArrayList[String]()
     mergedLoads.asScala.foreach(mergedLoad => {
       val loadName = mergedLoad
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 1d51592..f0abf44 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -953,13 +953,14 @@ public final class CarbonDataMergerUtil {
    * @param absoluteTableIdentifier
    * @return
    */
-  public static List<Segment> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier)
+  public static List<Segment> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier,
+      Boolean isChildTable)
       throws IOException {
     SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = null;
     try {
-      validAndInvalidSegments =
-          new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
+      validAndInvalidSegments = new SegmentStatusManager(absoluteTableIdentifier)
+          .getValidAndInvalidSegments(isChildTable);
     } catch (IOException e) {
       LOGGER.error("Error while getting valid segment list for a table identifier");
       throw new IOException();