This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit a5875f29c1d168d447d41f68622ed78045ea133f Author: songzhxlh-max <59271430+songzhxlh-...@users.noreply.github.com> AuthorDate: Sat Oct 8 09:53:54 2022 +0800 KYLIN-5324 tableindex answer select start --- .../org/apache/kylin/common/KylinConfigBase.java | 11 ++++++-- .../metadata/cube/cuboid/NQueryLayoutChooser.java | 33 ++++++++++++++-------- .../metadata/cube/cuboid/TableIndexMatcher.java | 24 ++++++++++++++-- .../kylin/metadata/cube/model/IndexPlan.java | 22 +++++++++++++-- .../kylin/metadata/cube/model/NDataflow.java | 4 +++ .../org/apache/kylin/query/schema/OLAPTable.java | 32 +++++++++++++++++++-- .../apache/kylin/query/util/RuntimeHelper.scala | 17 +++++++++-- 7 files changed, 118 insertions(+), 25 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 5fe045c115..dec30004b8 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -617,7 +617,7 @@ public abstract class KylinConfigBase implements Serializable { String uuid = RandomUtil.randomUUIDStr().toUpperCase(Locale.ROOT).substring(0, 6); String packageName = DIAG_ID_PREFIX + new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss", Locale.getDefault(Locale.Category.FORMAT)) - .format(new Date()) + .format(new Date()) + "_" + uuid; String workDir = KylinConfigBase.getKylinHomeWithoutWarn(); String diagPath = "diag_dump/" + packageName; @@ -2036,7 +2036,7 @@ public abstract class KylinConfigBase implements Serializable { String value = getOptional("kylin.query.table-detect-transformers"); return value == null ? new String[] { POWER_BI_CONVERTER, "org.apache.kylin.query.util.DefaultQueryTransformer", - "org.apache.kylin.query.util.EscapeTransformer" } + "org.apache.kylin.query.util.EscapeTransformer" } : getOptionalStringArray("kylin.query.table-detect-transformers", new String[0]); } @@ -2973,7 +2973,7 @@ public abstract class KylinConfigBase implements Serializable { } private double getConfigItemDoubleValue(String configItem, double defaultDoubleValue, double rangeStart, - double rangeEnd) { + double rangeEnd) { double resultValue = defaultDoubleValue; try { resultValue = Integer.parseInt(getOptional(configItem, String.valueOf(defaultDoubleValue))); @@ -3632,6 +3632,11 @@ public abstract class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.build.resource.skip-resource-check", FALSE)); } + public boolean useTableIndexAnswerSelectStarEnabled() { + return Boolean.parseBoolean(getOptional("kylin.query.use-tableindex-answer-select-star.enabled", FALSE)); + } + + public int getSecondStorageSkippingIndexGranularity() { int granularity = Integer.parseInt(getOptional("kylin.second-storage.skipping-index.granularity", "3")); return granularity <= 0 ? 3 : granularity; diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NQueryLayoutChooser.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NQueryLayoutChooser.java index 5af671f842..0b32223e33 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NQueryLayoutChooser.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NQueryLayoutChooser.java @@ -67,7 +67,8 @@ public class NQueryLayoutChooser { List<NDataSegment> toRemovedSegments = Lists.newArrayList(); for (NDataSegment segment : prunedSegments) { if (candidate == null) { - candidate = selectLayoutCandidate(dataflow, Lists.newArrayList(segment), sqlDigest, secondStorageSegmentLayoutMap); + candidate = selectLayoutCandidate(dataflow, Lists.newArrayList(segment), sqlDigest, + secondStorageSegmentLayoutMap); if (candidate == null) { toRemovedSegments.add(segment); } @@ -87,8 +88,7 @@ public class NQueryLayoutChooser { return NLayoutCandidate.EMPTY; } List<NLayoutCandidate> candidates = new ArrayList<>(); - val commonLayouts = getLayoutsFromSegments(prunedSegments, dataflow, - secondStorageSegmentLayoutMap); + val commonLayouts = getLayoutsFromSegments(prunedSegments, dataflow, secondStorageSegmentLayoutMap); val model = dataflow.getModel(); log.info("Matching dataflow with seg num: {} layout num: {}", prunedSegments.size(), commonLayouts.size()); KylinConfig config = KylinConfig.getInstanceFromEnv(); @@ -108,6 +108,9 @@ public class NQueryLayoutChooser { if (!aggIndexMatcher.valid() && !tableIndexMatcher.valid()) { return null; } + val projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()) + .getProject(dataflow.getProject()); + double influenceFactor = 1.0; for (NDataLayout dataLayout : commonLayouts) { log.trace("Matching layout {}", dataLayout); CapabilityResult tempResult = new CapabilityResult(); @@ -119,6 +122,8 @@ public class NQueryLayoutChooser { var matchResult = tableIndexMatcher.match(layout); if (!matchResult.isMatched()) { matchResult = aggIndexMatcher.match(layout); + } else if (projectInstance.getConfig().useTableIndexAnswerSelectStarEnabled()) { + influenceFactor += influenceFactor + tableIndexMatcher.getLayoutUnmatchedColsSize(); } if (!matchResult.isMatched()) { log.trace("Matching failed"); @@ -127,7 +132,7 @@ public class NQueryLayoutChooser { NLayoutCandidate candidate = new NLayoutCandidate(layout); tempResult.influences = matchResult.getInfluences(); - candidate.setCost(dataLayout.getRows() * (tempResult.influences.size() + 1.0)); + candidate.setCost(dataLayout.getRows() * (tempResult.influences.size() + influenceFactor)); if (!matchResult.getNeedDerive().isEmpty()) { candidate.setDerivedToHostMap(matchResult.getNeedDerive()); candidate.setDerivedTableSnapshots(candidate.getDerivedToHostMap().keySet().stream() @@ -151,7 +156,7 @@ public class NQueryLayoutChooser { } private static Collection<NDataLayout> getLayoutsFromSegments(List<NDataSegment> segments, NDataflow dataflow, - Map<String, Set<Long>> secondStorageSegmentLayoutMap) { + Map<String, Set<Long>> secondStorageSegmentLayoutMap) { KylinConfig config = KylinConfig.getInstanceFromEnv(); val projectInstance = NProjectManager.getInstance(config).getProject(dataflow.getProject()); if (!projectInstance.getConfig().isHeterogeneousSegmentEnabled()) { @@ -166,10 +171,13 @@ public class NQueryLayoutChooser { for (int i = 0; i < segments.size(); i++) { val dataSegment = segments.get(i); var layoutIdMapToDataLayout = dataSegment.getLayoutsMap(); - if (SegmentOnlineMode.ANY.toString().equalsIgnoreCase(projectInstance.getConfig().getKylinEngineSegmentOnlineMode()) + if (SegmentOnlineMode.ANY.toString() + .equalsIgnoreCase(projectInstance.getConfig().getKylinEngineSegmentOnlineMode()) && MapUtils.isNotEmpty(secondStorageSegmentLayoutMap)) { - Set<Long> chLayouts = secondStorageSegmentLayoutMap.getOrDefault(dataSegment.getId(), Sets.newHashSet()); - Map<Long, NDataLayout> nDataLayoutMap = chLayouts.stream().map(id -> NDataLayout.newDataLayout(dataflow, dataSegment.getId(), id)) + Set<Long> chLayouts = secondStorageSegmentLayoutMap.getOrDefault(dataSegment.getId(), + Sets.newHashSet()); + Map<Long, NDataLayout> nDataLayoutMap = chLayouts.stream() + .map(id -> NDataLayout.newDataLayout(dataflow, dataSegment.getId(), id)) .collect(Collectors.toMap(NDataLayout::getLayoutId, nDataLayout -> nDataLayout)); nDataLayoutMap.putAll(layoutIdMapToDataLayout); @@ -202,8 +210,7 @@ public class NQueryLayoutChooser { .collect(Collectors.toList()); Ordering<NLayoutCandidate> ordering = Ordering // - .from(priorityLayoutComparator()).compound(derivedLayoutComparator()) - .compound(rowSizeComparator()) // L1 comparator, compare cuboid rows + .from(priorityLayoutComparator()).compound(derivedLayoutComparator()).compound(rowSizeComparator()) // L1 comparator, compare cuboid rows .compound(filterColumnComparator(filterColIds, chooserContext)) // L2 comparator, order filter columns .compound(dimensionSizeComparator()) // the lower dimension the best .compound(measureSizeComparator()) // L3 comparator, order size of cuboid columns @@ -216,9 +223,11 @@ public class NQueryLayoutChooser { if (!KylinConfig.getInstanceFromEnv().isPreferAggIndex()) { return 0; } - if (!layoutCandidate1.getLayoutEntity().getIndex().isTableIndex() && layoutCandidate2.getLayoutEntity().getIndex().isTableIndex()) { + if (!layoutCandidate1.getLayoutEntity().getIndex().isTableIndex() + && layoutCandidate2.getLayoutEntity().getIndex().isTableIndex()) { return -1; - } else if (layoutCandidate1.getLayoutEntity().getIndex().isTableIndex() && !layoutCandidate2.getLayoutEntity().getIndex().isTableIndex()) { + } else if (layoutCandidate1.getLayoutEntity().getIndex().isTableIndex() + && !layoutCandidate2.getLayoutEntity().getIndex().isTableIndex()) { return 1; } return 0; diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/TableIndexMatcher.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/TableIndexMatcher.java index 9253df0b73..f3eaee1b36 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/TableIndexMatcher.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/TableIndexMatcher.java @@ -23,11 +23,16 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.metadata.cube.model.IndexEntity; +import org.apache.kylin.metadata.cube.model.LayoutEntity; +import org.apache.kylin.metadata.cube.model.NDataflow; +import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.model.DeriveInfo; +import org.apache.kylin.metadata.project.NProjectManager; +import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.realization.CapabilityResult; import org.apache.kylin.metadata.realization.SQLDigest; -import org.apache.kylin.metadata.cube.model.IndexEntity; -import org.apache.kylin.metadata.cube.model.LayoutEntity; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -41,12 +46,14 @@ public class TableIndexMatcher extends IndexMatcher { private final boolean isUseTableIndexAnswerNonRawQuery; private Set<Integer> sqlColumns; private final boolean valid; + private int layoutUnmatchedColsSize; public TableIndexMatcher(SQLDigest sqlDigest, ChooserContext chooserContext, Set<String> excludedTables, boolean isUseTableIndexAnswerNonRawQuery) { super(sqlDigest, chooserContext, excludedTables); this.isUseTableIndexAnswerNonRawQuery = isUseTableIndexAnswerNonRawQuery; valid = init(); + this.layoutUnmatchedColsSize = 0; } private boolean init() { @@ -72,6 +79,15 @@ public class TableIndexMatcher extends IndexMatcher { unmatchedCols.removeAll(layout.getStreamingColumns().keySet()); } unmatchedCols.removeAll(layout.getOrderedDimensions().keySet()); + ProjectInstance projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()) + .getProject(model.getProject()); + if (projectInstance.getConfig().useTableIndexAnswerSelectStarEnabled()) { + layoutUnmatchedColsSize = unmatchedCols.size(); + NDataflowManager dataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), + model.getProject()); + NDataflow dataflow = dataflowManager.getDataflow(layout.getModel().getId()); + unmatchedCols.removeAll(dataflow.getAllColumnsIndex()); + } goThruDerivedDims(layout.getIndex(), needDerive, unmatchedCols); if (!unmatchedCols.isEmpty()) { if (log.isDebugEnabled()) { @@ -88,4 +104,8 @@ public class TableIndexMatcher extends IndexMatcher { boolean isUseTableIndex = isUseTableIndexAnswerNonRawQuery && !nonSupportFunTableIndex(sqlDigest.aggregations); return index.isTableIndex() && (sqlDigest.isRawQuery || isUseTableIndex); } + + public int getLayoutUnmatchedColsSize() { + return layoutUnmatchedColsSize; + } } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/IndexPlan.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/IndexPlan.java index 1df0411e62..c004d53135 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/IndexPlan.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/IndexPlan.java @@ -29,6 +29,7 @@ import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -50,12 +51,12 @@ import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.model.IEngineAware; import org.apache.kylin.metadata.model.JoinTableDesc; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.NDataModelManager; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.project.NProjectManager; +import org.apache.kylin.metadata.project.ProjectInstance; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; @@ -65,6 +66,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.BiMap; +import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -159,6 +161,8 @@ public class IndexPlan extends RootPersistentEntity implements Serializable, IEn private final LinkedHashSet<TblColRef> allColumns = Sets.newLinkedHashSet(); + private Set<Integer> allColumnsIndex = new HashSet<>(); + private List<LayoutEntity> ruleBasedLayouts = Lists.newArrayList(); @Setter @Getter @@ -276,6 +280,14 @@ public class IndexPlan extends RootPersistentEntity implements Serializable, IEn //all lookup tables are automatically derived allColumns.addAll(join.getTableRef().getColumns()); } + initAllColumnsIndex(); + } + + private void initAllColumnsIndex() { + Map<TblColRef, Integer> tblColMap = Maps.newHashMap(); + ImmutableBiMap<Integer, TblColRef> effectiveCols = getModel().getEffectiveCols(); + effectiveCols.forEach((key, value) -> tblColMap.put(value, key)); + allColumnsIndex = allColumns.stream().map(tblColMap::get).collect(Collectors.toSet()); } private void initDictionaryDesc() { @@ -367,6 +379,10 @@ public class IndexPlan extends RootPersistentEntity implements Serializable, IEn return allColumns; } + public Set<Integer> listAllTblColRefsIndex() { + return allColumnsIndex; + } + private void addLayout2TargetIndex(LayoutEntity sourceLayout, IndexEntity targetIndex) { addLayout2TargetIndex(sourceLayout, targetIndex, false); } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java index 58e3c3abb2..c72b4842e5 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java @@ -247,6 +247,10 @@ public class NDataflow extends RootPersistentEntity implements Serializable, IRe return getIndexPlan().listAllTblColRefs(); } + public Set<Integer> getAllColumnsIndex() { + return getIndexPlan().listAllTblColRefsIndex(); + } + @Override public List<TblColRef> getAllDimensions() { return Lists.newArrayList(getIndexPlan().getEffectiveDimCols().values()); diff --git a/src/query-common/src/main/java/org/apache/kylin/query/schema/OLAPTable.java b/src/query-common/src/main/java/org/apache/kylin/query/schema/OLAPTable.java index dfd9368873..96e8fa02f4 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/schema/OLAPTable.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/schema/OLAPTable.java @@ -74,6 +74,8 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.util.CollectionUtil; import org.apache.kylin.measure.topn.TopNMeasureType; +import org.apache.kylin.metadata.cube.model.NDataflow; +import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.ComputedColumnDesc; @@ -83,6 +85,8 @@ import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.util.ComputedColumnUtil; import org.apache.kylin.metadata.project.NProjectManager; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.query.QueryExtension; import org.apache.kylin.query.enumerator.OLAPQuery; import org.apache.kylin.query.relnode.OLAPTableScan; @@ -268,10 +272,34 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab } private List<ColumnDesc> listTableColumnsIncludingCC() { - val allColumns = Lists.newArrayList(sourceTable.getColumns()); + List<ColumnDesc> allColumns = Lists.newArrayList(sourceTable.getColumns()); - if (!modelsMap.containsKey(sourceTable.getIdentity())) + if (!modelsMap.containsKey(sourceTable.getIdentity())) { return allColumns; + } + + ProjectInstance projectInstance = NProjectManager.getInstance(olapSchema.getConfig()) + .getProject(sourceTable.getProject()); + NDataflowManager dataflowManager = NDataflowManager.getInstance(olapSchema.getConfig(), + sourceTable.getProject()); + if (projectInstance.getConfig().useTableIndexAnswerSelectStarEnabled()) { + Set<ColumnDesc> exposeColumnDescSet = new HashSet<>(); + String tableName = sourceTable.getIdentity(); + List<NDataModel> modelList = modelsMap.get(tableName); + for (NDataModel dataModel : modelList) { + NDataflow dataflow = dataflowManager.getDataflow(dataModel.getId()); + if (dataflow.getStatus() == RealizationStatusEnum.ONLINE) { + dataflow.getAllColumns().forEach(tblColRef -> { + if (tblColRef.getTable().equalsIgnoreCase(tableName)) { + exposeColumnDescSet.add(tblColRef.getColumnDesc()); + } + }); + } + } + if (!exposeColumnDescSet.isEmpty()) { + allColumns = Lists.newArrayList(exposeColumnDescSet); + } + } val authorizedCC = getAuthorizedCC(); if (CollectionUtils.isNotEmpty(authorizedCC)) { diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/RuntimeHelper.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/RuntimeHelper.scala index 4fc0c31cff..3e6d0b1933 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/RuntimeHelper.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/RuntimeHelper.scala @@ -18,11 +18,12 @@ package org.apache.kylin.query.util +import org.apache.kylin.common.KylinConfig import org.apache.kylin.common.util.ImmutableBitSet -import org.apache.kylin.query.util.SparderDerivedUtil import org.apache.kylin.metadata.datatype.DataType import org.apache.kylin.metadata.model.DeriveInfo.DeriveType import org.apache.kylin.metadata.model.TblColRef +import org.apache.kylin.metadata.project.NProjectManager import org.apache.spark.internal.Logging import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.expressions.Literal @@ -35,8 +36,9 @@ import scala.collection.JavaConverters._ import scala.collection.mutable // scalastyle:off -object RuntimeHelper extends Logging { +object RuntimeHelper extends Logging { + final val literalZero = new Column(Literal(0, DataTypes.IntegerType)) final val literalOne = new Column(Literal(1, DataTypes.IntegerType)) final val literalTs = new Column(Literal(null, DataTypes.TimestampType)) final val literalString = new Column(Literal(null, DataTypes.StringType)) @@ -95,6 +97,8 @@ object RuntimeHelper extends Logging { }.toMap } + val projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv) + .getProject(derivedUtil.model.getProject) // may have multi TopN measures. val topNIndexs = sourceSchema.fields.map(_.dataType).zipWithIndex.filter(_._1.isInstanceOf[ArrayType]) allColumns.indices @@ -116,6 +120,13 @@ object RuntimeHelper extends Logging { if (hasTopN && topNIndexs.map(_._2).contains(gTInfoIndex)) { // topn measure will be erase when calling inline literalOne.as(s"${factTableName}_${columnName}") + } else if (projectInstance.getConfig.useTableIndexAnswerSelectStarEnabled() + && gTInfoIndex < 0) { + if (column.getColumnDesc.getType.isNumberFamily) { + literalZero.as(s"${factTableName}_${columnName}") + } else { + literalString.as(s"${factTableName}_${columnName}") + } } else if (primaryKey.get(gTInfoIndex)) { // primary key col(gTInfoNames.apply(gTInfoIndex)) @@ -132,7 +143,7 @@ object RuntimeHelper extends Logging { } } else if (deriveMap.contains(index)) { deriveMap.apply(index) - } else if( DataType.DATETIME_FAMILY.contains(column.getType.getName)) { + } else if (DataType.DATETIME_FAMILY.contains(column.getType.getName)) { // https://github.com/Kyligence/KAP/issues/14561 literalTs.as(s"${factTableName}_${columnName}") } else if (DataType.STRING_FAMILY.contains(column.getType.getName)) {