This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 4093d3f0ac KYLIN-5681 fix scd2 modeling
4093d3f0ac is described below

commit 4093d3f0acb4b29788e20248f4d78eff321a7f17
Author: Pengfei Zhan <dethr...@gmail.com>
AuthorDate: Wed Aug 9 12:45:25 2023 +0800

    KYLIN-5681 fix scd2 modeling
---
 src/modeling-service/pom.xml                       |  4 +
 .../kylin/rest/service/ModelSemanticHelper.java    | 86 +++++++++++++---------
 .../kylin/rest/service/ModelSmartSupporter.java    | 27 -------
 .../org/apache/kylin/query/engine/QueryExec.java   | 25 ++++++-
 .../kylin/query/util/QueryContextCutter.java       |  9 +++
 5 files changed, 87 insertions(+), 64 deletions(-)

diff --git a/src/modeling-service/pom.xml b/src/modeling-service/pom.xml
index 838a96531c..f152bdcd3b 100644
--- a/src/modeling-service/pom.xml
+++ b/src/modeling-service/pom.xml
@@ -37,6 +37,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-common-service</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-query</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-datasource-service</artifactId>
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
index 0c8435af9a..0badc8532a 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
@@ -43,6 +44,7 @@ import org.apache.calcite.sql.util.SqlVisitor;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exception.CommonErrorCode;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.exception.QueryErrorCode;
@@ -53,6 +55,11 @@ import org.apache.kylin.common.util.ModifyTableNameSqlVisitor;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.engine.spark.utils.ComputedColumnEvalUtil;
+import org.apache.kylin.guava30.shaded.common.base.Throwables;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableBiMap;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.job.manager.JobManager;
 import org.apache.kylin.job.model.JobParam;
 import org.apache.kylin.metadata.cube.cuboid.NAggregationGroup;
@@ -91,10 +98,13 @@ import org.apache.kylin.metadata.model.util.ExpandableMeasureUtil;
 import org.apache.kylin.metadata.model.util.scd2.SCD2CondChecker;
 import org.apache.kylin.metadata.model.util.scd2.SCD2Exception;
 import org.apache.kylin.metadata.model.util.scd2.SCD2NonEquiCondSimplification;
+import org.apache.kylin.metadata.model.util.scd2.SCD2SqlConverter;
 import org.apache.kylin.metadata.model.util.scd2.SimplifiedJoinDesc;
 import org.apache.kylin.metadata.model.util.scd2.SimplifiedJoinTableDesc;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.recommendation.ref.OptRecManagerV2;
+import org.apache.kylin.query.engine.QueryExec;
+import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.util.PushDownUtil;
 import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.rest.request.ModelRequest;
@@ -106,18 +116,10 @@ import org.apache.kylin.rest.util.SpringContext;
 import org.apache.kylin.source.SourceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import org.apache.kylin.guava30.shaded.common.base.Throwables;
-import org.apache.kylin.guava30.shaded.common.collect.ImmutableBiMap;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
-
 import io.kyligence.kap.secondstorage.SecondStorageUpdater;
 import io.kyligence.kap.secondstorage.SecondStorageUtil;
-import lombok.Setter;
 import lombok.val;
 import lombok.var;
 import lombok.extern.slf4j.Slf4j;
@@ -126,10 +128,6 @@ import lombok.extern.slf4j.Slf4j;
 @Service
 public class ModelSemanticHelper extends BasicService {
 
-    @Setter
-    @Autowired(required = false)
-    private ModelSmartSupporter modelSmartSupporter;
-
     private static final Logger logger = 
LoggerFactory.getLogger(ModelSemanticHelper.class);
     private final ExpandableMeasureUtil expandableMeasureUtil = new 
ExpandableMeasureUtil((model, ccDesc) -> {
         String ccExpression = PushDownUtil.massageComputedColumn(model, 
model.getProject(), ccDesc,
@@ -257,11 +255,13 @@ public class ModelSemanticHelper extends BasicService {
 
         HashSet<JoinDescNonEquiCompBean> scd2NonEquiCondSets = new HashSet<>();
 
-        val projectKylinConfig = 
getManager(NProjectManager.class).getProject(dataModel.getProject()).getConfig();
+        String project = dataModel.getProject();
+        val projectKylinConfig = NProjectManager.getProjectConfig(project);
         boolean isScd2Enabled = 
projectKylinConfig.isQueryNonEquiJoinModelEnabled();
-
+        
QueryContext.current().setAclInfo(AclPermissionUtil.createAclInfo(project, 
getCurrentUserGroups()));
+        QueryExec queryExec = new QueryExec(project, projectKylinConfig, 
false);
         for (int i = 0; i < requestJoinTableDescs.size(); i++) {
-            final JoinDesc modelJoinDesc = 
dataModel.getJoinTables().get(i).getJoin();
+            final JoinDesc joinWithoutNonEquivInfo = 
dataModel.getJoinTables().get(i).getJoin();
             final SimplifiedJoinDesc requestJoinDesc = 
requestJoinTableDescs.get(i).getSimplifiedJoinDesc();
 
             if 
(CollectionUtils.isEmpty(requestJoinDesc.getSimplifiedNonEquiJoinConditions())) 
{
@@ -277,41 +277,42 @@ public class ModelSemanticHelper extends BasicService {
             checkRequestNonEquiJoinConds(requestJoinDesc);
 
             //3. suggest nonEquiModel
-            final JoinDesc suggModelJoin = 
modelSmartSupporter.suggNonEquiJoinModel(projectKylinConfig,
-                    dataModel.getProject(), modelJoinDesc, requestJoinDesc);
+            String scd2Sql = 
SCD2SqlConverter.INSTANCE.genSCD2SqlStr(joinWithoutNonEquivInfo,
+                    requestJoinDesc.getSimplifiedNonEquiJoinConditions());
+            final JoinDesc analyzedJoin = deriveJoins(queryExec, scd2Sql);
             // restore table alias in non-equi conditions
             final NonEquiJoinCondition nonEquiCondWithAliasRestored = new 
NonEquiJoinConditionVisitor() {
                 @Override
                 public NonEquiJoinCondition visitColumn(NonEquiJoinCondition 
cond) {
                     TableRef originalTableRef;
                     if (cond.getColRef().getTableRef().getTableIdentity()
-                            
.equals(modelJoinDesc.getPKSide().getTableIdentity())) {
-                        originalTableRef = modelJoinDesc.getPKSide();
+                            
.equals(joinWithoutNonEquivInfo.getPKSide().getTableIdentity())) {
+                        originalTableRef = joinWithoutNonEquivInfo.getPKSide();
                     } else {
-                        originalTableRef = modelJoinDesc.getFKSide();
+                        originalTableRef = joinWithoutNonEquivInfo.getFKSide();
                     }
 
                     return new 
NonEquiJoinCondition(originalTableRef.getColumn(cond.getColRef().getName()),
                             cond.getDataType());
                 }
-            }.visit(suggModelJoin.getNonEquiJoinCondition());
-            
suggModelJoin.setNonEquiJoinCondition(nonEquiCondWithAliasRestored);
-            String expr = suggModelJoin.getNonEquiJoinCondition().getExpr();
-            expr = expr.replaceAll(suggModelJoin.getPKSide().getAlias(), 
modelJoinDesc.getPKSide().getAlias());
-            expr = expr.replaceAll(suggModelJoin.getFKSide().getAlias(), 
modelJoinDesc.getFKSide().getAlias());
-            suggModelJoin.getNonEquiJoinCondition().setExpr(expr);
-            suggModelJoin.setPrimaryTableRef(modelJoinDesc.getPKSide());
-            suggModelJoin.setPrimaryTable(modelJoinDesc.getPrimaryTable());
-            suggModelJoin.setForeignTableRef(modelJoinDesc.getFKSide());
-            suggModelJoin.setForeignTable(modelJoinDesc.getForeignTable());
+            }.visit(analyzedJoin.getNonEquiJoinCondition());
+            analyzedJoin.setNonEquiJoinCondition(nonEquiCondWithAliasRestored);
+            String expr = analyzedJoin.getNonEquiJoinCondition().getExpr();
+            expr = expr.replaceAll(analyzedJoin.getPKSide().getAlias(), 
joinWithoutNonEquivInfo.getPKSide().getAlias());
+            expr = expr.replaceAll(analyzedJoin.getFKSide().getAlias(), 
joinWithoutNonEquivInfo.getFKSide().getAlias());
+            analyzedJoin.getNonEquiJoinCondition().setExpr(expr);
+            
analyzedJoin.setPrimaryTableRef(joinWithoutNonEquivInfo.getPKSide());
+            
analyzedJoin.setPrimaryTable(joinWithoutNonEquivInfo.getPrimaryTable());
+            
analyzedJoin.setForeignTableRef(joinWithoutNonEquivInfo.getFKSide());
+            
analyzedJoin.setForeignTable(joinWithoutNonEquivInfo.getForeignTable());
 
             //4. update dataModel
             try {
 
-                
SCD2NonEquiCondSimplification.INSTANCE.convertToSimplifiedSCD2Cond(suggModelJoin);
-                
modelJoinDesc.setNonEquiJoinCondition(suggModelJoin.getNonEquiJoinCondition());
-                modelJoinDesc.setForeignTable(suggModelJoin.getForeignTable());
-                modelJoinDesc.setPrimaryTable(suggModelJoin.getPrimaryTable());
+                
SCD2NonEquiCondSimplification.INSTANCE.convertToSimplifiedSCD2Cond(analyzedJoin);
+                
joinWithoutNonEquivInfo.setNonEquiJoinCondition(analyzedJoin.getNonEquiJoinCondition());
+                
joinWithoutNonEquivInfo.setForeignTable(analyzedJoin.getForeignTable());
+                
joinWithoutNonEquivInfo.setPrimaryTable(analyzedJoin.getPrimaryTable());
             } catch (SCD2Exception e) {
                 logger.error("Update datamodel failed...", e);
                 throw new KylinException(QueryErrorCode.SCD2_COMMON_ERROR, 
Throwables.getRootCause(e).getMessage());
@@ -328,6 +329,23 @@ public class ModelSemanticHelper extends BasicService {
 
     }
 
+    /**
+     * Returns the single join that {@code sql} is expected to produce.
+     * @throws SCD2Exception if the join was split, or if no join could be derived
+     */
+    private JoinDesc deriveJoins(QueryExec queryExec, String sql) {
+        List<OLAPContext> contexts = queryExec.deriveOlapContexts(sql);
+        boolean split = contexts.size() > 1 || (contexts.size() == 1 && contexts.get(0).joins.size() > 1);
+        if (split) {
+            throw new SCD2Exception("Non-equiv-join conditions were split. the input sql is: " + sql);
+        }
+        // deriveOlapContexts returns an empty list when parsing/optimizing the sql throws
+        if (contexts.isEmpty() || contexts.get(0).joins.isEmpty()) {
+            throw new SCD2Exception("Failed to extract joins from the input sql: " + sql);
+        }
+        return contexts.get(0).joins.get(0);
+    }
+
     private void checkRequestNonEquiJoinConds(final SimplifiedJoinDesc 
requestJoinDesc) {
 
         if 
(!SCD2CondChecker.INSTANCE.checkSCD2EquiJoinCond(requestJoinDesc.getForeignKey(),
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSmartSupporter.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSmartSupporter.java
deleted file mode 100644
index 19bf5061ac..0000000000
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSmartSupporter.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.rest.service;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.util.scd2.SimplifiedJoinDesc;
-
-public interface ModelSmartSupporter {
-    JoinDesc suggNonEquiJoinModel(final KylinConfig kylinConfig, final String 
project, final JoinDesc modelJoinDesc,
-            final SimplifiedJoinDesc requestJoinDesc);
-}
diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java b/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java
index dd0f66fcc9..80295c22e8 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java
@@ -45,11 +45,14 @@ import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexExecutorImpl;
 import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.QueryTrace;
 import org.apache.kylin.common.ReadFsSwitch;
+import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.query.StructField;
@@ -68,15 +71,13 @@ import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.util.AsyncQueryUtil;
 import org.apache.kylin.query.util.CalcitePlanRouterVisitor;
 import org.apache.kylin.query.util.HepUtils;
+import org.apache.kylin.query.util.QueryContextCutter;
 import org.apache.kylin.query.util.QueryInterruptChecker;
 import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.query.util.RelAggPushDownUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-
 import lombok.Getter;
 import lombok.Setter;
 
@@ -211,6 +212,24 @@ public class QueryExec {
         }
     }
 
+    /** Parses, optimizes and cuts {@code sql} into OLAPContexts; logs and returns an empty list on exception. */
+    public List<OLAPContext> deriveOlapContexts(String sql) {
+        List<OLAPContext> derived = Lists.newArrayList();
+        try {
+            OLAPContext.clearThreadLocalContexts();
+            QueryContextCutter.analyzeOlapContext(parseAndOptimize(sql));
+            Collection<OLAPContext> cut = OLAPContext.getThreadLocalContexts();
+            if (!CollectionUtils.isEmpty(cut)) {
+                derived.addAll(cut);
+            }
+        } catch (Exception e) {
+            logger.error("Sql Parsing error.", e);
+        } finally {
+            OLAPContext.clearThreadLocalContexts();
+        }
+        return derived;
+    }
+
     private void magicDirts(String sql) {
         if (sql.contains("ReadFsSwitch.turnOnBackupFsWhile")) {
             ReadFsSwitch.turnOnBackupFsWhile();
diff --git a/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java b/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java
index 55006842b4..5d4e133447 100644
--- a/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java
+++ b/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java
@@ -46,6 +46,15 @@ public class QueryContextCutter {
     private QueryContextCutter() {
     }
 
+    /**
+     * Analyze a RelNode tree to get olapContexts.
+     * @param root The root relNode of a query statement
+     */
+    public static void analyzeOlapContext(RelNode root) {
+        cutContext(new ContextInitialCutStrategy(), (KapRel) root.getInput(0), 
root);
+        fillOlapContextPropertiesWithRelTree(root);
+    }
+
     /**
      * For each query parse tree, the following steps are used for generating 
OlapContexts 
      * and matching the precomputed indexes.

Reply via email to