This is an automated email from the ASF dual-hosted git repository. ravipesala pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 6fa7fb4 [CARBONDATA-3350] Enhance custom compaction to resort old single segment by new sort_columns 6fa7fb4 is described below commit 6fa7fb4f94ca3082113d0b47b109bdd16cf046a3 Author: QiangCai <qiang...@qq.com> AuthorDate: Wed May 15 16:46:20 2019 +0800 [CARBONDATA-3350] Enhance custom compaction to resort old single segment by new sort_columns This closes #3202 --- .../blockletindex/BlockletDataMapFactory.java | 2 +- .../TableStatusReadCommittedScope.java | 2 +- .../spark/rdd/CarbonTableCompactor.scala | 21 +++- .../processing/merger/CarbonCompactionUtil.java | 132 +++++++++++++++++---- 4 files changed, 128 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index 446507f..cab1b8b 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -167,7 +167,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory return dataMaps; } - private Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment) + public Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment) throws IOException { Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = segmentMap.get(segment.getSegmentNo()); diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java index 5622efe..e4fd6f4 100644 --- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java +++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java @@ -55,7 +55,7 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope { } public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier, - LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) throws IOException { + LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) { this.identifier = identifier; this.configuration = configuration; this.loadMetadataDetails = loadMetadataDetails; diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index afe2927..4c7dd95 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -29,13 +29,15 @@ import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCa import org.apache.spark.util.MergeIndexUtil import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.constants.SortScopeOptions.SortScope import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.SegmentFileStore import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events._ import org.apache.carbondata.processing.loading.model.CarbonLoadModel -import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} +import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.spark.MergeResultImpl /** @@ -50,6 +52,21 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, operationContext: OperationContext) extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) { + private def needSortSingleSegment( + loadsToMerge: java.util.List[LoadMetadataDetails]): Boolean = { + // support to resort old segment with old sort_columns + if (CompactionType.CUSTOM == compactionModel.compactionType && + loadsToMerge.size() == 1 && + SortScope.NO_SORT != compactionModel.carbonTable.getSortScope) { + !CarbonCompactionUtil.isSortedByCurrentSortColumns( + carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, + loadsToMerge.get(0), + FileFactory.getConfiguration) + } else { + false + } + } + override def executeCompaction(): Unit = { val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails]( carbonLoadModel.getLoadMetadataDetails @@ -58,7 +75,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, var loadsToMerge = identifySegmentsToBeMerged() - while (loadsToMerge.size() > 1 || + while (loadsToMerge.size() > 1 || needSortSingleSegment(loadsToMerge) || (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType && loadsToMerge.size() > 0)) { val lastSegment = sortedSegments.get(sortedSegments.size() - 1) diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java index c3017a7..8cf477e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java @@ -24,11 +24,14 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.carbondata.common.Strings; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.block.TaskBlockInfo; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; @@ -49,13 +52,16 @@ import org.apache.carbondata.core.scan.expression.conditional.GreaterThanExpress import org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExpression; import org.apache.carbondata.core.scan.expression.logical.AndExpression; import org.apache.carbondata.core.scan.expression.logical.OrExpression; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.format.IndexHeader; import org.apache.carbondata.hadoop.CarbonInputSplit; import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; /** @@ -612,6 +618,37 @@ public class CarbonCompactionUtil { return taskIdSet.size(); } + private static boolean compareSortColumns(CarbonTable table, List<ColumnSchema> fileColumns) { + // When sort_columns is modified, it will be consider as no_sort also. + List<CarbonDimension> sortColumnsOfSegment = new ArrayList<>(); + for (ColumnSchema column : fileColumns) { + if (column.isDimensionColumn() && column.isSortColumn()) { + sortColumnsOfSegment.add(new CarbonDimension(column, -1, -1, -1)); + } + } + if (sortColumnsOfSegment.size() < table.getNumberOfSortColumns()) { + return false; + } + List<CarbonDimension> sortColumnsOfTable = new ArrayList<>(); + for (CarbonDimension dimension : table.getDimensions()) { + if (dimension.isSortColumn()) { + sortColumnsOfTable.add(dimension); + } + } + int sortColumnNums = sortColumnsOfTable.size(); + if (sortColumnsOfSegment.size() < sortColumnNums) { + return false; + } + // compare sort_columns + for (int i = 0; i < sortColumnNums; i++) { + if (!RestructureUtil.isColumnMatches(table.isTransactionalTable(), sortColumnsOfTable.get(i), + sortColumnsOfSegment.get(i))) { + return false; + } + } + return true; + } + /** * Returns if the DataFileFooter containing carbondata file contains * sorted data or not. @@ -622,37 +659,82 @@ public class CarbonCompactionUtil { */ public static boolean isSortedByCurrentSortColumns(CarbonTable table, DataFileFooter footer) { if (footer.isSorted()) { - // When sort_columns is modified, it will be consider as no_sort also. - List<CarbonDimension> sortColumnsOfSegment = new ArrayList<>(); - for (ColumnSchema column : footer.getColumnInTable()) { - if (column.isDimensionColumn() && column.isSortColumn()) { - sortColumnsOfSegment.add(new CarbonDimension(column, -1, -1, -1)); + return compareSortColumns(table, footer.getColumnInTable()); + } else { + return false; + } + } + + public static boolean isSortedByCurrentSortColumns( + CarbonTable table, LoadMetadataDetails load, Configuration hadoopConf) { + List<String> sortColumnList = table.getSortColumns(); + if (sortColumnList.isEmpty()) { + return false; + } + // table sort_columns + String sortColumns = Strings.mkString( + sortColumnList.toArray(new String[sortColumnList.size()]), ","); + String segmentPath = + CarbonTablePath.getSegmentPath(table.getTablePath(), load.getLoadName()); + // segment sort_columns + String segmentSortColumns = getSortColumnsOfSegment(segmentPath); + if (segmentSortColumns == null) { + return false; + } else { + return segmentSortColumns.equalsIgnoreCase(sortColumns); + } + } + + private static String mkSortColumnsString( + List<org.apache.carbondata.format.ColumnSchema> columnList) { + StringBuilder builder = new StringBuilder(); + for (org.apache.carbondata.format.ColumnSchema column : columnList) { + if (column.isDimension()) { + Map<String, String> properties = column.getColumnProperties(); + if (properties != null) { + if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) { + builder.append(column.column_name).append(","); + } } } - if (sortColumnsOfSegment.size() < table.getNumberOfSortColumns()) { - return false; - } - List<CarbonDimension> sortColumnsOfTable = new ArrayList<>(); - for (CarbonDimension dimension : table.getDimensions()) { - if (dimension.isSortColumn()) { - sortColumnsOfTable.add(dimension); + } + if (builder.length() > 1) { + return builder.substring(0, builder.length() - 1); + } else { + return null; + } + } + + public static String getSortColumnsOfSegment(String segmentFolder) { + CarbonFile[] files = SegmentIndexFileStore.getCarbonIndexFiles( + segmentFolder, FileFactory.getConfiguration()); + Set<Boolean> isSortSet = new HashSet<>(); + Set<String> sortColumnsSet = new HashSet<>(); + if (files != null) { + for (CarbonFile file : files) { + IndexHeader indexHeader = SegmentIndexFileStore.readIndexHeader( + file.getCanonicalPath(), FileFactory.getConfiguration()); + if (indexHeader != null) { + if (indexHeader.isSetIs_sort()) { + isSortSet.add(indexHeader.is_sort); + if (indexHeader.is_sort) { + sortColumnsSet.add(mkSortColumnsString(indexHeader.getTable_columns())); + } + } else { + // if is_sort is not set, it will be old store and consider as local_sort by default. + sortColumnsSet.add(mkSortColumnsString(indexHeader.getTable_columns())); + } } - } - int sortColumnNums = sortColumnsOfTable.size(); - if (sortColumnsOfSegment.size() < sortColumnNums) { - return false; - } - // compare sort_columns - for (int i = 0; i < sortColumnNums; i++) { - if (!RestructureUtil - .isColumnMatches(table.isTransactionalTable(), sortColumnsOfTable.get(i), - sortColumnsOfSegment.get(i))) { - return false; + if (isSortSet.size() >= 2 || sortColumnsSet.size() >= 2) { + break; } } - return true; + } + // for all index files, sort_columns should be same + if (isSortSet.size() <= 1 && sortColumnsSet.size() == 1) { + return sortColumnsSet.iterator().next(); } else { - return false; + return null; } }