This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit d13199db7407552226a58060ceb9b41a7f859c36
Author: Pengfei Zhan <dethr...@gmail.com>
AuthorDate: Thu Jun 29 15:28:35 2023 +0800

    KYLIN-5744 Answering snapshot first, then answering by metadata
    
    Hitting a snapshot now takes priority over
    answering min/max with metadata.
    
    Enable routing min/max queries to metadata by default
    (kylin.query.using-metadata-answer-minmax-of-dimension now defaults to true).
    
    1. Fix the timezone problem.
    Given that min(cal_dt) = '2012-01-01',
    the query `select min(cal_dt) from test_kylin_fact`
    may return '2011-12-31' rather than '2012-01-01'.
    
    2. Fix min/max queries that hit an index without any dimensions.
    For example, when `select min(cal_dt) as min_cal_dt from test_kylin_fact`
    hits an index that has only the measure min(cal_dt),
    the generated SparkPlan is wrong: `project --- project --- tableScan`;
    the desired SparkPlan is: `aggregate --- project --- tableScan`.
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  2 +-
 .../kylin/query/engine/QueryExecWithMetaTest.java  |  2 +-
 src/query-common/pom.xml                           |  4 ++
 .../kylin/query/relnode/KapAggregateRel.java       | 58 +++++++++-------------
 .../apache/kylin/query/relnode/OLAPContext.java    | 24 +++++----
 .../org/apache/spark/sql/SparderTypeUtil.scala     |  2 +-
 6 files changed, 46 insertions(+), 46 deletions(-)

diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 03c1c06e89..cbd82ed108 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1734,7 +1734,7 @@ public abstract class KylinConfigBase implements 
Serializable {
     // 
============================================================================
 
     public boolean isRouteToMetadataEnabled() {
-        return 
Boolean.parseBoolean(this.getOptional("kylin.query.using-metadata-answer-minmax-of-dimension",
 FALSE));
+        return 
Boolean.parseBoolean(this.getOptional("kylin.query.using-metadata-answer-minmax-of-dimension",
 TRUE));
     }
 
     public boolean partialMatchNonEquiJoins() {
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/query/engine/QueryExecWithMetaTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/query/engine/QueryExecWithMetaTest.java
index ac84de6f06..94d98eb5ea 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/query/engine/QueryExecWithMetaTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/query/engine/QueryExecWithMetaTest.java
@@ -230,7 +230,7 @@ public class QueryExecWithMetaTest extends 
NLocalWithSparkSessionTest {
         List<String> result = iterator.next();
         
Assert.assertEquals("2147483648,21474836483289,2132,2147483647,-128,127,0,9,0.0,10000.0,"
                 + 
"0.3255242,85208.3241,10.0000,201.3235,abc,xyz,aaaaaaaa,xxxxxxxxxxxxxxxxxxxxx,abcd,zzzz,"
-                + "2000-12-31,2004-04-16,2004-04-01 00:00:00,2004-04-17 
00:32:23.032,false,true,"
+                + "2001-01-01,2004-04-17,2004-04-01 00:00:00,2004-04-17 
00:32:23.032,false,true,"
                 + 
"null,null,null,null,null,null,null,null,null,null,null,null", String.join(",", 
result));
     }
 
diff --git a/src/query-common/pom.xml b/src/query-common/pom.xml
index d6af343c34..16af9a8028 100644
--- a/src/query-common/pom.xml
+++ b/src/query-common/pom.xml
@@ -55,6 +55,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kap-second-storage-core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-spark-common</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java
 
b/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java
index f44272beb5..5a064dd840 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java
+++ 
b/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java
@@ -45,6 +45,9 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.measure.corr.CorrMeasureType;
 import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate;
 import org.apache.kylin.metadata.cube.model.NDataflow;
@@ -55,10 +58,6 @@ import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.query.util.ICutContextStrategy;
 
-import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
-
 /**
  *
  */
@@ -69,7 +68,7 @@ public class KapAggregateRel extends OLAPAggregateRel 
implements KapRel {
     private ImmutableList<Integer> rewriteGroupKeys; // preserve the ordering 
of group keys after CC replacement
     private List<ImmutableBitSet> rewriteGroupSets; // group sets with cc 
replaced
     List<AggregateCall> aggregateCalls;
-    private Set<TblColRef> groupByInnerColumns = new HashSet<>(); // inner 
columns in group keys, for CC generation
+    private final Set<TblColRef> groupByInnerColumns = new HashSet<>(); // 
inner columns in group keys, for CC generation
     private Set<OLAPContext> subContexts = Sets.newHashSet();
 
     public KapAggregateRel(RelOptCluster cluster, RelTraitSet traits, RelNode 
child, boolean indicator,
@@ -203,8 +202,8 @@ public class KapAggregateRel extends OLAPAggregateRel 
implements KapRel {
             TblColRef originalColumn = inputColumnRowType.getColumnByIndex(i);
             if (null != this.context && 
this.context.getGroupCCColRewriteMapping().containsKey(originalColumn)) {
                 
groups.add(this.context.getGroupCCColRewriteMapping().get(originalColumn));
-                groupKeys.add(inputColumnRowType
-                        
.getIndexByName(this.context.getGroupCCColRewriteMapping().get(originalColumn).getName()));
+                String colName = 
this.context.getGroupCCColRewriteMapping().get(originalColumn).getName();
+                groupKeys.add(inputColumnRowType.getIndexByName(colName));
             } else {
                 Set<TblColRef> sourceColumns = 
inputColumnRowType.getSourceColumnsByIndex(i);
                 groups.addAll(sourceColumns);
@@ -283,17 +282,13 @@ public class KapAggregateRel extends OLAPAggregateRel 
implements KapRel {
             this.rewriteAggCalls = new ArrayList<>(aggregateCalls.size());
             for (int i = 0; i < this.aggregateCalls.size(); i++) {
                 AggregateCall aggCall = this.aggregateCalls.get(i);
-                if (SqlStdOperatorTable.GROUPING == aggCall.getAggregation()) {
+                if (SqlStdOperatorTable.GROUPING == aggCall.getAggregation()
+                        || this.aggregations.get(i).isAggregateOnConstant()) {
                     this.rewriteAggCalls.add(aggCall);
                     continue;
                 }
 
-                FunctionDesc cubeFunc = this.aggregations.get(i);
-                if (cubeFunc.isAggregateOnConstant()) {
-                    this.rewriteAggCalls.add(aggCall);
-                    continue;
-                }
-                aggCall = rewriteAggCall(aggCall, cubeFunc);
+                aggCall = rewriteAggCall(aggCall, this.aggregations.get(i));
                 this.rewriteAggCalls.add(aggCall);
             }
             getContext().setExactlyAggregate(isExactlyMatched());
@@ -329,8 +324,9 @@ public class KapAggregateRel extends OLAPAggregateRel 
implements KapRel {
             return false;
         }
 
-        if (!checkAggCall())
+        if (!checkAggCall()) {
             return false;
+        }
         Set<String> cuboidDimSet = new HashSet<>();
         if (getContext() != null && getContext().storageContext.getCandidate() 
!= null) {
             cuboidDimSet = 
getContext().storageContext.getCandidate().getLayoutEntity().getOrderedDimensions().values()
@@ -343,6 +339,9 @@ public class KapAggregateRel extends OLAPAggregateRel 
implements KapRel {
         OLAPRel.logger.info("cuboid dimensions: {}", cuboidDimSet);
 
         boolean isDimensionMatch = isDimExactlyMatch(groupByCols, 
cuboidDimSet);
+        if (KylinConfig.getInstanceFromEnv().isRouteToMetadataEnabled()) {
+            isDimensionMatch = isDimensionMatch && !groupByCols.isEmpty();
+        }
         if (!isDimensionMatch) {
             return false;
         }
@@ -359,17 +358,15 @@ public class KapAggregateRel extends OLAPAggregateRel 
implements KapRel {
                 && 
dataflow.getQueryableSegments().get(0).getMultiPartitions().size() <= 1;
     }
 
-    private Boolean checkAggCall() {
+    private boolean checkAggCall() {
         for (AggregateCall call : getRewriteAggCalls()) {
             if 
(!supportedFunction.contains(OLAPAggregateRel.getAggrFuncName(call))) {
                 return false;
             }
             // bitmap uuid is fine with exactly matched cube as what we need 
to query
             // from the cube is exactly the binary bitmap
-            if (OLAPAggregateRel.getAggrFuncName(call).equals("BITMAP_UUID")) {
-                continue;
-            }
-            if 
(OLAPAggregateRel.getAggrFuncName(call).equals(FunctionDesc.FUNC_BITMAP_BUILD)) 
{
+            if (OLAPAggregateRel.getAggrFuncName(call).equals("BITMAP_UUID")
+                    || 
OLAPAggregateRel.getAggrFuncName(call).equals(FunctionDesc.FUNC_BITMAP_BUILD)) {
                 continue;
             }
             if (call instanceof KylinAggregateCall) {
@@ -417,11 +414,6 @@ public class KapAggregateRel extends OLAPAggregateRel 
implements KapRel {
         this.subContexts = contexts;
     }
 
-    @Override
-    protected void buildRewriteFieldsAndMetricsColumns() {
-        super.buildRewriteFieldsAndMetricsColumns();
-    }
-
     /**
      * optimize its Context Rel after context cut off according some rules
      * 1. push through the Agg Above Join Rel
@@ -430,7 +422,7 @@ public class KapAggregateRel extends OLAPAggregateRel 
implements KapRel {
         // case 1: Agg push through Join
         if (context == null) {
             for (OLAPContext subContext : subContexts) {
-                if (subContext.aggregations.size() > 0)
+                if (!subContext.aggregations.isEmpty())
                     continue;
                 if (ContextUtil.qualifiedForAggInfoPushDown(this, subContext)) 
{
                     subContext.setTopNode(this);
@@ -444,16 +436,14 @@ public class KapAggregateRel extends OLAPAggregateRel 
implements KapRel {
         if (this.context == null)
             return;
 
-        for (TblColRef colRef : this.context.getGroupByColumns()) {
-            if (!colRef.getName().startsWith("_KY_") && 
context.belongToContextTables(colRef))
-                this.context.allColumns.add(colRef);
-        }
+        this.context.getGroupByColumns().stream()
+                .filter(colRef -> !colRef.getName().startsWith("_KY_") && 
context.belongToContextTables(colRef))
+                .forEach(colRef -> this.context.allColumns.add(colRef));
 
         if (!(getInput() instanceof KapProjectRel)) {
-            for (TblColRef colRef : ((KapRel) 
getInput()).getColumnRowType().getAllColumns()) {
-                if (context.belongToContextTables(colRef) && 
!colRef.getName().startsWith("_KY_"))
-                    context.allColumns.add(colRef);
-            }
+            ((KapRel) getInput()).getColumnRowType().getAllColumns().stream()
+                    .filter(colRef -> context.belongToContextTables(colRef) && 
!colRef.getName().startsWith("_KY_"))
+                    .forEach(colRef -> context.allColumns.add(colRef));
             return;
         }
 
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
 
b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
index eadb698c06..5835ab21b1 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
+++ 
b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.AbstractRelNode;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
@@ -47,6 +48,7 @@ import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate;
 import org.apache.kylin.metadata.cube.model.DimensionRangeInfo;
 import org.apache.kylin.metadata.cube.model.NDataSegment;
 import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -60,11 +62,12 @@ import org.apache.kylin.metadata.query.QueryMetrics;
 import org.apache.kylin.metadata.realization.HybridRealization;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
 import org.apache.kylin.query.routing.RealizationCheck;
 import org.apache.kylin.query.schema.OLAPSchema;
+import org.apache.kylin.query.schema.OLAPTable;
 import org.apache.kylin.storage.StorageContext;
+import org.apache.spark.sql.util.SparderTypeUtil;
 import org.apache.logging.log4j.util.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -590,9 +593,6 @@ public class OLAPContext {
 
     public String genExecFunc(OLAPRel rel, String tableName) {
         setReturnTupleInfo(rel.getRowType(), rel.getColumnRowType());
-        if (canMinMaxDimAnsweredByMetadata(rel)) {
-            return "executeMetadataQuery";
-        }
 
         if (isConstantQueryWithAggregations()) {
             return "executeSimpleAggregationQuery";
@@ -603,6 +603,10 @@ public class OLAPContext {
             return "executeLookupTableQuery";
         }
 
+        if (canMinMaxDimAnsweredByMetadata(rel)) {
+            return "executeMetadataQuery";
+        }
+
         return "executeOLAPQuery";
     }
 
@@ -681,6 +685,7 @@ public class OLAPContext {
             List<TblColRef> colRefs = 
tableScan.getColumnRowType().getAllColumns();
             allFields.addAll(colRefs);
         });
+        RelDataTypeFactory typeFactory = 
this.getTopNode().getCluster().getTypeFactory();
         List<Object[]> result = new ArrayList<>();
         for (NDataSegment segment : ((NDataflow) realization).getSegments()) {
             if (segment.getStatus() != SegmentStatusEnum.READY) {
@@ -690,16 +695,17 @@ public class OLAPContext {
             Object[] minList = new Object[allFields.size()];
             Object[] maxList = new Object[allFields.size()];
             for (TblColRef col : cols) {
-                String dataType = 
col.getColumnDesc().getUpgradedType().getName();
                 int colId = allFields.indexOf(col);
                 String tblColRefIndex = getTblColRefIndex(col, realization);
-                DimensionRangeInfo dimensionRangeInfo = 
infoMap.get(tblColRefIndex);
-                if (dimensionRangeInfo == null) {
+                DimensionRangeInfo rangeInfo = infoMap.get(tblColRefIndex);
+                if (rangeInfo == null) {
                     minList[colId] = null;
                     maxList[colId] = null;
                 } else {
-                    minList[colId] = 
Tuple.convertOptiqCellValue(dimensionRangeInfo.getMin(), dataType);
-                    maxList[colId] = 
Tuple.convertOptiqCellValue(dimensionRangeInfo.getMax(), dataType);
+                    ColumnDesc c = col.getColumnDesc();
+                    RelDataType sqlType = OLAPTable.createSqlType(typeFactory, 
c.getUpgradedType(), c.isNullable());
+                    minList[colId] = 
SparderTypeUtil.convertToStringWithCalciteType(rangeInfo.getMin(), sqlType, 
false);
+                    maxList[colId] = 
SparderTypeUtil.convertToStringWithCalciteType(rangeInfo.getMax(), sqlType, 
false);
                 }
             }
 
diff --git 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala
 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala
index 79d4aaf451..eeda408931 100644
--- 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala
+++ 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala
@@ -129,7 +129,7 @@ object SparderTypeUtil extends Logging {
         case "decimal" => new java.math.BigDecimal(s.toString)
         case "date" => new java.sql.Date(DateFormat.stringToMillis(s.toString))
         case "time" | "timestamp" | "datetime" =>
-          val l = java.lang.Long.parseLong(s.toString)
+          val l = DateFormat.stringToMillis(s.toString)
           Timestamp.valueOf(DateFormat.castTimestampToString(l))
         case "tinyint" => s.toString.toByte
         case "smallint" => s.toString.toShort

Reply via email to