This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit d13199db7407552226a58060ceb9b41a7f859c36 Author: Pengfei Zhan <dethr...@gmail.com> AuthorDate: Thu Jun 29 15:28:35 2023 +0800 KYLIN-5744 Answering snapshot first, then answering by metadata Hitting snapshot has a higher priority than answering min/max with metadata. Change the default behavior of routing queries to metadata to true. 1. Fix a timezone problem. Given that min(cal_dt) = '2012-01-01', the result of `select min(cal_dt) from test_kylin_fact` may be '2011-12-31' rather than '2012-01-01'. 2. Fix min/max queries that hit an index without any dimensions. For example: when `select min(cal_dt) as min_cal_dt from test_kylin_fact ` hits an index that has only a measure of min(cal_dt), the SparkPlan is wrong: ` project --- project --- tableScan `; but the desired SparkPlan is: ` aggregate --- project --- tableScan `. --- .../org/apache/kylin/common/KylinConfigBase.java | 2 +- .../kylin/query/engine/QueryExecWithMetaTest.java | 2 +- src/query-common/pom.xml | 4 ++ .../kylin/query/relnode/KapAggregateRel.java | 58 +++++++++------------- .../apache/kylin/query/relnode/OLAPContext.java | 24 +++++---- .../org/apache/spark/sql/SparderTypeUtil.scala | 2 +- 6 files changed, 46 insertions(+), 46 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 03c1c06e89..cbd82ed108 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1734,7 +1734,7 @@ public abstract class KylinConfigBase implements Serializable { // ============================================================================ public boolean isRouteToMetadataEnabled() { - return Boolean.parseBoolean(this.getOptional("kylin.query.using-metadata-answer-minmax-of-dimension", FALSE)); + return 
Boolean.parseBoolean(this.getOptional("kylin.query.using-metadata-answer-minmax-of-dimension", TRUE)); } public boolean partialMatchNonEquiJoins() { diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/engine/QueryExecWithMetaTest.java b/src/kylin-it/src/test/java/org/apache/kylin/query/engine/QueryExecWithMetaTest.java index ac84de6f06..94d98eb5ea 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/query/engine/QueryExecWithMetaTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/query/engine/QueryExecWithMetaTest.java @@ -230,7 +230,7 @@ public class QueryExecWithMetaTest extends NLocalWithSparkSessionTest { List<String> result = iterator.next(); Assert.assertEquals("2147483648,21474836483289,2132,2147483647,-128,127,0,9,0.0,10000.0," + "0.3255242,85208.3241,10.0000,201.3235,abc,xyz,aaaaaaaa,xxxxxxxxxxxxxxxxxxxxx,abcd,zzzz," - + "2000-12-31,2004-04-16,2004-04-01 00:00:00,2004-04-17 00:32:23.032,false,true," + + "2001-01-01,2004-04-17,2004-04-01 00:00:00,2004-04-17 00:32:23.032,false,true," + "null,null,null,null,null,null,null,null,null,null,null,null", String.join(",", result)); } diff --git a/src/query-common/pom.xml b/src/query-common/pom.xml index d6af343c34..16af9a8028 100644 --- a/src/query-common/pom.xml +++ b/src/query-common/pom.xml @@ -55,6 +55,10 @@ <groupId>org.apache.kylin</groupId> <artifactId>kap-second-storage-core</artifactId> </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-spark-common</artifactId> + </dependency> </dependencies> <build> <plugins> diff --git a/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java b/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java index f44272beb5..5a064dd840 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java @@ -45,6 +45,9 @@ import 
org.apache.commons.collections.CollectionUtils; import org.apache.kylin.common.KapConfig; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableList; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.measure.corr.CorrMeasureType; import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate; import org.apache.kylin.metadata.cube.model.NDataflow; @@ -55,10 +58,6 @@ import org.apache.kylin.metadata.model.PartitionDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.query.util.ICutContextStrategy; -import org.apache.kylin.guava30.shaded.common.collect.ImmutableList; -import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Sets; - /** * */ @@ -69,7 +68,7 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel { private ImmutableList<Integer> rewriteGroupKeys; // preserve the ordering of group keys after CC replacement private List<ImmutableBitSet> rewriteGroupSets; // group sets with cc replaced List<AggregateCall> aggregateCalls; - private Set<TblColRef> groupByInnerColumns = new HashSet<>(); // inner columns in group keys, for CC generation + private final Set<TblColRef> groupByInnerColumns = new HashSet<>(); // inner columns in group keys, for CC generation private Set<OLAPContext> subContexts = Sets.newHashSet(); public KapAggregateRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator, @@ -203,8 +202,8 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel { TblColRef originalColumn = inputColumnRowType.getColumnByIndex(i); if (null != this.context && this.context.getGroupCCColRewriteMapping().containsKey(originalColumn)) { groups.add(this.context.getGroupCCColRewriteMapping().get(originalColumn)); - 
groupKeys.add(inputColumnRowType - .getIndexByName(this.context.getGroupCCColRewriteMapping().get(originalColumn).getName())); + String colName = this.context.getGroupCCColRewriteMapping().get(originalColumn).getName(); + groupKeys.add(inputColumnRowType.getIndexByName(colName)); } else { Set<TblColRef> sourceColumns = inputColumnRowType.getSourceColumnsByIndex(i); groups.addAll(sourceColumns); @@ -283,17 +282,13 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel { this.rewriteAggCalls = new ArrayList<>(aggregateCalls.size()); for (int i = 0; i < this.aggregateCalls.size(); i++) { AggregateCall aggCall = this.aggregateCalls.get(i); - if (SqlStdOperatorTable.GROUPING == aggCall.getAggregation()) { + if (SqlStdOperatorTable.GROUPING == aggCall.getAggregation() + || this.aggregations.get(i).isAggregateOnConstant()) { this.rewriteAggCalls.add(aggCall); continue; } - FunctionDesc cubeFunc = this.aggregations.get(i); - if (cubeFunc.isAggregateOnConstant()) { - this.rewriteAggCalls.add(aggCall); - continue; - } - aggCall = rewriteAggCall(aggCall, cubeFunc); + aggCall = rewriteAggCall(aggCall, this.aggregations.get(i)); this.rewriteAggCalls.add(aggCall); } getContext().setExactlyAggregate(isExactlyMatched()); @@ -329,8 +324,9 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel { return false; } - if (!checkAggCall()) + if (!checkAggCall()) { return false; + } Set<String> cuboidDimSet = new HashSet<>(); if (getContext() != null && getContext().storageContext.getCandidate() != null) { cuboidDimSet = getContext().storageContext.getCandidate().getLayoutEntity().getOrderedDimensions().values() @@ -343,6 +339,9 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel { OLAPRel.logger.info("cuboid dimensions: {}", cuboidDimSet); boolean isDimensionMatch = isDimExactlyMatch(groupByCols, cuboidDimSet); + if (KylinConfig.getInstanceFromEnv().isRouteToMetadataEnabled()) { + isDimensionMatch = isDimensionMatch && 
!groupByCols.isEmpty(); + } if (!isDimensionMatch) { return false; } @@ -359,17 +358,15 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel { && dataflow.getQueryableSegments().get(0).getMultiPartitions().size() <= 1; } - private Boolean checkAggCall() { + private boolean checkAggCall() { for (AggregateCall call : getRewriteAggCalls()) { if (!supportedFunction.contains(OLAPAggregateRel.getAggrFuncName(call))) { return false; } // bitmap uuid is fine with exactly matched cube as what we need to query // from the cube is exactly the binary bitmap - if (OLAPAggregateRel.getAggrFuncName(call).equals("BITMAP_UUID")) { - continue; - } - if (OLAPAggregateRel.getAggrFuncName(call).equals(FunctionDesc.FUNC_BITMAP_BUILD)) { + if (OLAPAggregateRel.getAggrFuncName(call).equals("BITMAP_UUID") + || OLAPAggregateRel.getAggrFuncName(call).equals(FunctionDesc.FUNC_BITMAP_BUILD)) { continue; } if (call instanceof KylinAggregateCall) { @@ -417,11 +414,6 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel { this.subContexts = contexts; } - @Override - protected void buildRewriteFieldsAndMetricsColumns() { - super.buildRewriteFieldsAndMetricsColumns(); - } - /** * optimize its Context Rel after context cut off according some rules * 1. 
push through the Agg Above Join Rel @@ -430,7 +422,7 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel { // case 1: Agg push through Join if (context == null) { for (OLAPContext subContext : subContexts) { - if (subContext.aggregations.size() > 0) + if (!subContext.aggregations.isEmpty()) continue; if (ContextUtil.qualifiedForAggInfoPushDown(this, subContext)) { subContext.setTopNode(this); @@ -444,16 +436,14 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel { if (this.context == null) return; - for (TblColRef colRef : this.context.getGroupByColumns()) { - if (!colRef.getName().startsWith("_KY_") && context.belongToContextTables(colRef)) - this.context.allColumns.add(colRef); - } + this.context.getGroupByColumns().stream() + .filter(colRef -> !colRef.getName().startsWith("_KY_") && context.belongToContextTables(colRef)) + .forEach(colRef -> this.context.allColumns.add(colRef)); if (!(getInput() instanceof KapProjectRel)) { - for (TblColRef colRef : ((KapRel) getInput()).getColumnRowType().getAllColumns()) { - if (context.belongToContextTables(colRef) && !colRef.getName().startsWith("_KY_")) - context.allColumns.add(colRef); - } + ((KapRel) getInput()).getColumnRowType().getAllColumns().stream() + .filter(colRef -> context.belongToContextTables(colRef) && !colRef.getName().startsWith("_KY_")) + .forEach(colRef -> context.allColumns.add(colRef)); return; } diff --git a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java index eadb698c06..5835ab21b1 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java @@ -34,6 +34,7 @@ import org.apache.calcite.rel.AbstractRelNode; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.TableScan; import 
org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; @@ -47,6 +48,7 @@ import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate; import org.apache.kylin.metadata.cube.model.DimensionRangeInfo; import org.apache.kylin.metadata.cube.model.NDataSegment; import org.apache.kylin.metadata.cube.model.NDataflow; +import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.JoinDesc; import org.apache.kylin.metadata.model.MeasureDesc; @@ -60,11 +62,12 @@ import org.apache.kylin.metadata.query.QueryMetrics; import org.apache.kylin.metadata.realization.HybridRealization; import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.metadata.realization.SQLDigest; -import org.apache.kylin.metadata.tuple.Tuple; import org.apache.kylin.metadata.tuple.TupleInfo; import org.apache.kylin.query.routing.RealizationCheck; import org.apache.kylin.query.schema.OLAPSchema; +import org.apache.kylin.query.schema.OLAPTable; import org.apache.kylin.storage.StorageContext; +import org.apache.spark.sql.util.SparderTypeUtil; import org.apache.logging.log4j.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -590,9 +593,6 @@ public class OLAPContext { public String genExecFunc(OLAPRel rel, String tableName) { setReturnTupleInfo(rel.getRowType(), rel.getColumnRowType()); - if (canMinMaxDimAnsweredByMetadata(rel)) { - return "executeMetadataQuery"; - } if (isConstantQueryWithAggregations()) { return "executeSimpleAggregationQuery"; @@ -603,6 +603,10 @@ public class OLAPContext { return "executeLookupTableQuery"; } + if (canMinMaxDimAnsweredByMetadata(rel)) { + return "executeMetadataQuery"; + } + return "executeOLAPQuery"; } @@ -681,6 +685,7 @@ public class OLAPContext { List<TblColRef> colRefs 
= tableScan.getColumnRowType().getAllColumns(); allFields.addAll(colRefs); }); + RelDataTypeFactory typeFactory = this.getTopNode().getCluster().getTypeFactory(); List<Object[]> result = new ArrayList<>(); for (NDataSegment segment : ((NDataflow) realization).getSegments()) { if (segment.getStatus() != SegmentStatusEnum.READY) { @@ -690,16 +695,17 @@ public class OLAPContext { Object[] minList = new Object[allFields.size()]; Object[] maxList = new Object[allFields.size()]; for (TblColRef col : cols) { - String dataType = col.getColumnDesc().getUpgradedType().getName(); int colId = allFields.indexOf(col); String tblColRefIndex = getTblColRefIndex(col, realization); - DimensionRangeInfo dimensionRangeInfo = infoMap.get(tblColRefIndex); - if (dimensionRangeInfo == null) { + DimensionRangeInfo rangeInfo = infoMap.get(tblColRefIndex); + if (rangeInfo == null) { minList[colId] = null; maxList[colId] = null; } else { - minList[colId] = Tuple.convertOptiqCellValue(dimensionRangeInfo.getMin(), dataType); - maxList[colId] = Tuple.convertOptiqCellValue(dimensionRangeInfo.getMax(), dataType); + ColumnDesc c = col.getColumnDesc(); + RelDataType sqlType = OLAPTable.createSqlType(typeFactory, c.getUpgradedType(), c.isNullable()); + minList[colId] = SparderTypeUtil.convertToStringWithCalciteType(rangeInfo.getMin(), sqlType, false); + maxList[colId] = SparderTypeUtil.convertToStringWithCalciteType(rangeInfo.getMax(), sqlType, false); } } diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala index 79d4aaf451..eeda408931 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala @@ -129,7 +129,7 @@ object SparderTypeUtil extends Logging { case "decimal" => new java.math.BigDecimal(s.toString) case "date" => new 
java.sql.Date(DateFormat.stringToMillis(s.toString)) case "time" | "timestamp" | "datetime" => - val l = java.lang.Long.parseLong(s.toString) + val l = DateFormat.stringToMillis(s.toString) Timestamp.valueOf(DateFormat.castTimestampToString(l)) case "tinyint" => s.toString.toByte case "smallint" => s.toString.toShort