This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit c3b676012acaf2a5f5e4063dcee0519a5f29641f Author: Zhimin Wu <[email protected]> AuthorDate: Fri Sep 27 15:05:55 2024 +0800 KYLIN-5973 fix streaming function --- .../metadata/mapper/FusionModelDynamicSqlSupport.java | 2 -- .../persistence/metadata/mapper/FusionModelMapper.java | 11 ----------- .../metadata/mapper/StreamingJobMapper.java | 6 ++++++ .../persistence/resources/FusionModelRawResource.java | 6 ++++-- .../src/main/resources/metadata-jdbc-h2.properties | 1 - .../src/main/resources/metadata-jdbc-mysql.properties | 1 - .../main/resources/metadata-jdbc-postgresql.properties | 1 - .../src/test/resources/ut_big_sqls/sqls1.expect | 18 ++++++++++++++++++ .../kylin/query/runtime/plan/TableScanPlan.scala | 10 +++++----- 9 files changed, 33 insertions(+), 23 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelDynamicSqlSupport.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelDynamicSqlSupport.java index 521608cbd6..76c30d62e1 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelDynamicSqlSupport.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelDynamicSqlSupport.java @@ -30,8 +30,6 @@ public final class FusionModelDynamicSqlSupport { public static final class FusionModel extends BasicSqlTable<FusionModel> { - public final SqlColumn<String> modelUuid = column("model_uuid", JDBCType.CHAR); - public FusionModel() { super("fusion_model", FusionModel::new); } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelMapper.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelMapper.java index ed36d80473..69c39af557 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelMapper.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelMapper.java @@ -51,7 +51,6 @@ public interface FusionModelMapper extends BasicMapper<FusionModelRawResource> { @Result(column = "meta_key", property = "metaKey", jdbcType = JdbcType.VARCHAR), @Result(column = "project", property = "project", jdbcType = JdbcType.VARCHAR), @Result(column = "uuid", property = "uuid", jdbcType = JdbcType.CHAR), - @Result(column = "model_uuid", property = "modelUuid", jdbcType = JdbcType.CHAR), @Result(column = "mvcc", property = "mvcc", jdbcType = JdbcType.BIGINT), @Result(column = "ts", property = "ts", jdbcType = JdbcType.BIGINT), @Result(column = "reserved_filed_1", property = "reservedFiled1", jdbcType = JdbcType.VARCHAR), @@ -60,21 +59,11 @@ public interface FusionModelMapper extends BasicMapper<FusionModelRawResource> { @Result(column = "reserved_filed_3", property = "reservedFiled3", jdbcType = JdbcType.LONGVARBINARY) }) List<FusionModelRawResource> selectMany(SelectStatementProvider selectStatement); - @Override - default UpdateDSL<UpdateModel> updateAllColumns(FusionModelRawResource record, UpdateDSL<UpdateModel> dsl) { - dsl = BasicMapper.super.updateAllColumns(record, dsl); - return dsl.set(sqlTable.modelUuid).equalTo(record::getModelUuid); - } - @Override default BasicSqlTable getSqlTable() { return sqlTable; } - @Override - default BasicColumn[] getSelectList() { - return getSelectListWithAdditions(sqlTable.modelUuid); - } @Override @SelectProvider(type = SqlWithRecordLockProviderAdapter.class, method = "select") diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobMapper.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobMapper.java index 9b342c050a..fc797e1029 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobMapper.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobMapper.java @@ -31,6 +31,7 @@ import org.apache.ibatis.type.JdbcType; import org.apache.kylin.common.persistence.metadata.jdbc.ContentTypeHandler; import org.apache.kylin.common.persistence.metadata.jdbc.SqlWithRecordLockProviderAdapter; import org.apache.kylin.common.persistence.resources.StreamingJobRawResource; +import org.mybatis.dynamic.sql.BasicColumn; import org.mybatis.dynamic.sql.select.SelectDSLCompleter; import org.mybatis.dynamic.sql.select.render.SelectStatementProvider; import org.mybatis.dynamic.sql.util.SqlProviderAdapter; @@ -42,6 +43,11 @@ public interface StreamingJobMapper extends BasicMapper<StreamingJobRawResource> return sqlTable; } + @Override + default BasicColumn[] getSelectList() { + return getSelectListWithAdditions(sqlTable.modelUuid); + } + @Override @SelectProvider(type = SqlWithRecordLockProviderAdapter.class, method = "select") @ResultMap("StreamingJobResult") diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/FusionModelRawResource.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/FusionModelRawResource.java index 705183dd27..9568c6d4f6 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/FusionModelRawResource.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/FusionModelRawResource.java @@ -32,6 +32,8 @@ public class FusionModelRawResource extends RawResource { @JsonProperty("project") private String project; - @JsonProperty("model_uuid") - private String modelUuid; + @Override + public String getModelUuid() { + return this.getUuid(); + } } diff --git a/src/core-common/src/main/resources/metadata-jdbc-h2.properties b/src/core-common/src/main/resources/metadata-jdbc-h2.properties index 1b5f558840..84be416a25 100644 --- a/src/core-common/src/main/resources/metadata-jdbc-h2.properties +++ b/src/core-common/src/main/resources/metadata-jdbc-h2.properties @@ -439,7 +439,6 @@ id bigint AUTO_INCREMENT NOT NULL,\ meta_key varchar(255) NOT NULL,\ project varchar(255) NOT NULL,\ uuid CHAR(36) NOT NULL,\ -model_uuid CHAR(36) NOT NULL,\ mvcc bigint NOT NULL,\ ts bigint NOT NULL,\ content bytea NOT NULL,\ diff --git a/src/core-common/src/main/resources/metadata-jdbc-mysql.properties b/src/core-common/src/main/resources/metadata-jdbc-mysql.properties index 3b7dd85199..2754feb704 100644 --- a/src/core-common/src/main/resources/metadata-jdbc-mysql.properties +++ b/src/core-common/src/main/resources/metadata-jdbc-mysql.properties @@ -488,7 +488,6 @@ CREATE TABLE IF NOT EXISTS `%s_fusion_model` \ `meta_key` varchar(255) NOT NULL, \ `project` varchar(255) NOT NULL, \ `uuid` CHAR(36) NOT NULL COLLATE utf8_bin, \ -`model_uuid` CHAR(36) NOT NULL COLLATE utf8_bin, \ `mvcc` bigint NOT NULL, \ `ts` bigint NOT NULL, \ `content` longblob NOT NULL, \ diff --git a/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties b/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties index 734720d886..944a469eac 100644 --- a/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties +++ b/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties @@ -439,7 +439,6 @@ id bigserial NOT NULL,\ meta_key varchar(255) NOT NULL,\ project varchar(255) NOT NULL,\ uuid CHAR(36) COLLATE "C" NOT NULL,\ -model_uuid CHAR(36) NOT NULL,\ mvcc bigint NOT NULL,\ ts bigint NOT NULL,\ content bytea NOT NULL,\ diff --git a/src/rec-service/src/test/resources/ut_big_sqls/sqls1.expect b/src/rec-service/src/test/resources/ut_big_sqls/sqls1.expect index ce48a1b0d3..9701a578e5 100644 --- a/src/rec-service/src/test/resources/ut_big_sqls/sqls1.expect +++ b/src/rec-service/src/test/resources/ut_big_sqls/sqls1.expect @@ -1,3 +1,21 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + SELECT "db_3"."F_NAME" AS "F_NAME_3", "db_3"."P_NAME" AS "P_NAME_3", "db_3"."VERSION" AS "VERSION_3", "db_3"."COUNT" AS "COUNT_3", "db_3"."COST" AS "COST_3" FROM (SELECT COUNT(*) AS count_all, SUM(cost) AS cost, test_d_factory.F_NAME AS F_NAME , test_d_product.P_NAME AS P_NAME, descrip_c AS version FROM t_demo_.t_demo_data t_demo_data diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala index d3e1084157..0b8c030b23 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala @@ -262,17 +262,17 @@ object TableScanPlan extends LogEx { val otherDims = Sets.newHashSet(context.getDimensions) otherDims.removeAll(groups) // expand derived (xxxD means contains host columns only, derived columns were translated) - val groupsD = expandDerived(context.getBatchCandidate, groups) + val groupsD = expandDerived(context.getCandidate, groups) val otherDimsD: util.Set[TblColRef] = - expandDerived(context.getBatchCandidate, otherDims) + expandDerived(context.getCandidate, otherDims) otherDimsD.removeAll(groupsD) // identify cuboid val dimensionsD = new util.LinkedHashSet[TblColRef] dimensionsD.addAll(groupsD) dimensionsD.addAll(otherDimsD) - val model = context.getBatchCandidate.getLayoutEntity.getModel - context.getBatchCandidate.getDerivedToHostMap.asScala.toList.foreach(m => { + val model = context.getCandidate.getLayoutEntity.getModel + context.getCandidate.getDerivedToHostMap.asScala.toList.foreach(m => { if (m._2.`type` == DeriveInfo.DeriveType.LOOKUP && !m._2.isOneToOne) { m._2.columns.asScala.foreach(derivedId => { if (mapping.getIndexOf(model.getColRef(derivedId)) != -1) { @@ -288,7 +288,7 @@ object TableScanPlan extends LogEx { dataflow.getLatestReadySegment, gtColIdx, olapContext.getReturnTupleInfo, - context.getBatchCandidate) + context.getCandidate) if (derived.hasDerived) { newPlan = derived.joinDerived(newPlan) }
