This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push: new 4093d3f0ac KYLIN-5681 fix scd2 modeling 4093d3f0ac is described below commit 4093d3f0acb4b29788e20248f4d78eff321a7f17 Author: Pengfei Zhan <dethr...@gmail.com> AuthorDate: Wed Aug 9 12:45:25 2023 +0800 KYLIN-5681 fix scd2 modeling --- src/modeling-service/pom.xml | 4 + .../kylin/rest/service/ModelSemanticHelper.java | 86 +++++++++++++--------- .../kylin/rest/service/ModelSmartSupporter.java | 27 ------- .../org/apache/kylin/query/engine/QueryExec.java | 25 ++++++- .../kylin/query/util/QueryContextCutter.java | 9 +++ 5 files changed, 87 insertions(+), 64 deletions(-) diff --git a/src/modeling-service/pom.xml b/src/modeling-service/pom.xml index 838a96531c..f152bdcd3b 100644 --- a/src/modeling-service/pom.xml +++ b/src/modeling-service/pom.xml @@ -37,6 +37,10 @@ <groupId>org.apache.kylin</groupId> <artifactId>kylin-common-service</artifactId> </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-query</artifactId> + </dependency> <dependency> <groupId>org.apache.kylin</groupId> <artifactId>kylin-datasource-service</artifactId> diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java index 0c8435af9a..0badc8532a 100644 --- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java +++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; @@ -43,6 +44,7 @@ import org.apache.calcite.sql.util.SqlVisitor; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.exception.CommonErrorCode; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.exception.QueryErrorCode; @@ -53,6 +55,11 @@ import org.apache.kylin.common.util.ModifyTableNameSqlVisitor; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.engine.spark.utils.ComputedColumnEvalUtil; +import org.apache.kylin.guava30.shaded.common.base.Throwables; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableBiMap; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.job.manager.JobManager; import org.apache.kylin.job.model.JobParam; import org.apache.kylin.metadata.cube.cuboid.NAggregationGroup; @@ -91,10 +98,13 @@ import org.apache.kylin.metadata.model.util.ExpandableMeasureUtil; import org.apache.kylin.metadata.model.util.scd2.SCD2CondChecker; import org.apache.kylin.metadata.model.util.scd2.SCD2Exception; import org.apache.kylin.metadata.model.util.scd2.SCD2NonEquiCondSimplification; +import org.apache.kylin.metadata.model.util.scd2.SCD2SqlConverter; import org.apache.kylin.metadata.model.util.scd2.SimplifiedJoinDesc; import org.apache.kylin.metadata.model.util.scd2.SimplifiedJoinTableDesc; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.recommendation.ref.OptRecManagerV2; +import org.apache.kylin.query.engine.QueryExec; +import org.apache.kylin.query.relnode.OLAPContext; import org.apache.kylin.query.util.PushDownUtil; import org.apache.kylin.query.util.QueryUtil; import org.apache.kylin.rest.request.ModelRequest; @@ -106,18 +116,10 @@ import org.apache.kylin.rest.util.SpringContext; import org.apache.kylin.source.SourceFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.apache.kylin.guava30.shaded.common.base.Throwables; -import org.apache.kylin.guava30.shaded.common.collect.ImmutableBiMap; -import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Maps; -import org.apache.kylin.guava30.shaded.common.collect.Sets; - import io.kyligence.kap.secondstorage.SecondStorageUpdater; import io.kyligence.kap.secondstorage.SecondStorageUtil; -import lombok.Setter; import lombok.val; import lombok.var; import lombok.extern.slf4j.Slf4j; @@ -126,10 +128,6 @@ import lombok.extern.slf4j.Slf4j; @Service public class ModelSemanticHelper extends BasicService { - @Setter - @Autowired(required = false) - private ModelSmartSupporter modelSmartSupporter; - private static final Logger logger = LoggerFactory.getLogger(ModelSemanticHelper.class); private final ExpandableMeasureUtil expandableMeasureUtil = new ExpandableMeasureUtil((model, ccDesc) -> { String ccExpression = PushDownUtil.massageComputedColumn(model, model.getProject(), ccDesc, @@ -257,11 +255,13 @@ public class ModelSemanticHelper extends BasicService { HashSet<JoinDescNonEquiCompBean> scd2NonEquiCondSets = new HashSet<>(); - val projectKylinConfig = getManager(NProjectManager.class).getProject(dataModel.getProject()).getConfig(); + String project = dataModel.getProject(); + val projectKylinConfig = NProjectManager.getProjectConfig(project); boolean isScd2Enabled = projectKylinConfig.isQueryNonEquiJoinModelEnabled(); - + QueryContext.current().setAclInfo(AclPermissionUtil.createAclInfo(project, getCurrentUserGroups())); + QueryExec queryExec = new QueryExec(project, projectKylinConfig, false); for (int i = 0; i < requestJoinTableDescs.size(); i++) { - final JoinDesc modelJoinDesc = dataModel.getJoinTables().get(i).getJoin(); + final JoinDesc joinWithoutNonEquivInfo = dataModel.getJoinTables().get(i).getJoin(); final SimplifiedJoinDesc requestJoinDesc = requestJoinTableDescs.get(i).getSimplifiedJoinDesc(); if (CollectionUtils.isEmpty(requestJoinDesc.getSimplifiedNonEquiJoinConditions())) { @@ -277,41 +277,42 @@ public class ModelSemanticHelper extends BasicService { checkRequestNonEquiJoinConds(requestJoinDesc); //3. suggest nonEquiModel - final JoinDesc suggModelJoin = modelSmartSupporter.suggNonEquiJoinModel(projectKylinConfig, - dataModel.getProject(), modelJoinDesc, requestJoinDesc); + String scd2Sql = SCD2SqlConverter.INSTANCE.genSCD2SqlStr(joinWithoutNonEquivInfo, + requestJoinDesc.getSimplifiedNonEquiJoinConditions()); + final JoinDesc analyzedJoin = deriveJoins(queryExec, scd2Sql); // restore table alias in non-equi conditions final NonEquiJoinCondition nonEquiCondWithAliasRestored = new NonEquiJoinConditionVisitor() { @Override public NonEquiJoinCondition visitColumn(NonEquiJoinCondition cond) { TableRef originalTableRef; if (cond.getColRef().getTableRef().getTableIdentity() - .equals(modelJoinDesc.getPKSide().getTableIdentity())) { - originalTableRef = modelJoinDesc.getPKSide(); + .equals(joinWithoutNonEquivInfo.getPKSide().getTableIdentity())) { + originalTableRef = joinWithoutNonEquivInfo.getPKSide(); } else { - originalTableRef = modelJoinDesc.getFKSide(); + originalTableRef = joinWithoutNonEquivInfo.getFKSide(); } return new NonEquiJoinCondition(originalTableRef.getColumn(cond.getColRef().getName()), cond.getDataType()); } - }.visit(suggModelJoin.getNonEquiJoinCondition()); - suggModelJoin.setNonEquiJoinCondition(nonEquiCondWithAliasRestored); - String expr = suggModelJoin.getNonEquiJoinCondition().getExpr(); - expr = expr.replaceAll(suggModelJoin.getPKSide().getAlias(), modelJoinDesc.getPKSide().getAlias()); - expr = expr.replaceAll(suggModelJoin.getFKSide().getAlias(), modelJoinDesc.getFKSide().getAlias()); - suggModelJoin.getNonEquiJoinCondition().setExpr(expr); - suggModelJoin.setPrimaryTableRef(modelJoinDesc.getPKSide()); - suggModelJoin.setPrimaryTable(modelJoinDesc.getPrimaryTable()); - suggModelJoin.setForeignTableRef(modelJoinDesc.getFKSide()); - suggModelJoin.setForeignTable(modelJoinDesc.getForeignTable()); + }.visit(analyzedJoin.getNonEquiJoinCondition()); + analyzedJoin.setNonEquiJoinCondition(nonEquiCondWithAliasRestored); + String expr = analyzedJoin.getNonEquiJoinCondition().getExpr(); + expr = expr.replaceAll(analyzedJoin.getPKSide().getAlias(), joinWithoutNonEquivInfo.getPKSide().getAlias()); + expr = expr.replaceAll(analyzedJoin.getFKSide().getAlias(), joinWithoutNonEquivInfo.getFKSide().getAlias()); + analyzedJoin.getNonEquiJoinCondition().setExpr(expr); + analyzedJoin.setPrimaryTableRef(joinWithoutNonEquivInfo.getPKSide()); + analyzedJoin.setPrimaryTable(joinWithoutNonEquivInfo.getPrimaryTable()); + analyzedJoin.setForeignTableRef(joinWithoutNonEquivInfo.getFKSide()); + analyzedJoin.setForeignTable(joinWithoutNonEquivInfo.getForeignTable()); //4. update dataModel try { - SCD2NonEquiCondSimplification.INSTANCE.convertToSimplifiedSCD2Cond(suggModelJoin); - modelJoinDesc.setNonEquiJoinCondition(suggModelJoin.getNonEquiJoinCondition()); - modelJoinDesc.setForeignTable(suggModelJoin.getForeignTable()); - modelJoinDesc.setPrimaryTable(suggModelJoin.getPrimaryTable()); + SCD2NonEquiCondSimplification.INSTANCE.convertToSimplifiedSCD2Cond(analyzedJoin); + joinWithoutNonEquivInfo.setNonEquiJoinCondition(analyzedJoin.getNonEquiJoinCondition()); + joinWithoutNonEquivInfo.setForeignTable(analyzedJoin.getForeignTable()); + joinWithoutNonEquivInfo.setPrimaryTable(analyzedJoin.getPrimaryTable()); } catch (SCD2Exception e) { logger.error("Update datamodel failed...", e); throw new KylinException(QueryErrorCode.SCD2_COMMON_ERROR, Throwables.getRootCause(e).getMessage()); @@ -328,6 +329,23 @@ public class ModelSemanticHelper extends BasicService { } + private JoinDesc deriveJoins(QueryExec queryExec, String sql) { + List<OLAPContext> contexts = queryExec.deriveOlapContexts(sql); + Optional<RuntimeException> th; + if (contexts.size() == 0) { + th = Optional.of(new SCD2Exception("Failed to extract joins from the input sql: " + sql)); + } else if (contexts.size() > 1) { + th = Optional.of(new SCD2Exception("Non-equiv-join conditions were split. the input sql is: " + sql)); + } else { + OLAPContext ctx = contexts.get(0); + if (ctx.joins.size() == 1) { + return ctx.joins.get(0); + } + th = Optional.of(new SCD2Exception("Non-equiv-join conditions were split. the input sql is: " + sql)); + } + throw th.get(); + } + private void checkRequestNonEquiJoinConds(final SimplifiedJoinDesc requestJoinDesc) { if (!SCD2CondChecker.INSTANCE.checkSCD2EquiJoinCond(requestJoinDesc.getForeignKey(), diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSmartSupporter.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSmartSupporter.java deleted file mode 100644 index 19bf5061ac..0000000000 --- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSmartSupporter.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kylin.rest.service; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.metadata.model.JoinDesc; -import org.apache.kylin.metadata.model.util.scd2.SimplifiedJoinDesc; - -public interface ModelSmartSupporter { - JoinDesc suggNonEquiJoinModel(final KylinConfig kylinConfig, final String project, final JoinDesc modelJoinDesc, - final SimplifiedJoinDesc requestJoinDesc); -} diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java b/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java index dd0f66fcc9..80295c22e8 100644 --- a/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java +++ b/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java @@ -45,11 +45,14 @@ import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexExecutorImpl; import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.commons.collections.CollectionUtils; import org.apache.kylin.common.KapConfig; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.QueryTrace; import org.apache.kylin.common.ReadFsSwitch; +import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; +import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.query.StructField; @@ -68,15 +71,13 @@ import org.apache.kylin.query.relnode.OLAPContext; import org.apache.kylin.query.util.AsyncQueryUtil; import org.apache.kylin.query.util.CalcitePlanRouterVisitor; import org.apache.kylin.query.util.HepUtils; +import org.apache.kylin.query.util.QueryContextCutter; import org.apache.kylin.query.util.QueryInterruptChecker; import org.apache.kylin.query.util.QueryUtil; import org.apache.kylin.query.util.RelAggPushDownUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; -import org.apache.kylin.guava30.shaded.common.collect.Lists; - import lombok.Getter; import lombok.Setter; @@ -211,6 +212,24 @@ public class QueryExec { } } + public List<OLAPContext> deriveOlapContexts(String sql) { + List<OLAPContext> contexts = Lists.newArrayList(); + try { + OLAPContext.clearThreadLocalContexts(); + RelNode relNode = parseAndOptimize(sql); + QueryContextCutter.analyzeOlapContext(relNode); + Collection<OLAPContext> tmp = OLAPContext.getThreadLocalContexts(); + if (CollectionUtils.isNotEmpty(tmp)) { + contexts.addAll(tmp); + } + } catch (Exception e) { + logger.error("Sql Parsing error.", e); + } finally { + OLAPContext.clearThreadLocalContexts(); + } + return contexts; + } + private void magicDirts(String sql) { if (sql.contains("ReadFsSwitch.turnOnBackupFsWhile")) { ReadFsSwitch.turnOnBackupFsWhile(); diff --git a/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java b/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java index 55006842b4..5d4e133447 100644 --- a/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java +++ b/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java @@ -46,6 +46,15 @@ public class QueryContextCutter { private QueryContextCutter() { } + /** + * Analyze a RelNode tree to get olapContexts. + * @param root The root relNode of a query statement + */ + public static void analyzeOlapContext(RelNode root) { + cutContext(new ContextInitialCutStrategy(), (KapRel) root.getInput(0), root); + fillOlapContextPropertiesWithRelTree(root); + } + /** * For each query parse tree, the following steps are used for generating OlapContexts * and matching the precomputed indexes.