This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new e03517e4e58 [enhance](mtmv)External partition prune #44415 #44567 #44673 (#44767) e03517e4e58 is described below commit e03517e4e583d862475e92daed6a431dddee05d3 Author: zhangdong <zhangd...@selectdb.com> AuthorDate: Fri Nov 29 20:53:01 2024 +0800 [enhance](mtmv)External partition prune #44415 #44567 #44673 (#44767) pick: https://github.com/apache/doris/pull/44673 https://github.com/apache/doris/pull/44415 https://github.com/apache/doris/pull/44567 --- .../java/org/apache/doris/catalog/OlapTable.java | 23 +++++- .../org/apache/doris/datasource/ExternalTable.java | 53 ++++++++++++++ .../doris/datasource/hive/HMSExternalTable.java | 62 ++++++++++------ .../mvcc/MvccSnapshot.java} | 17 ++--- .../mvcc/MvccTable.java} | 23 +++--- .../doris/datasource/mvcc/MvccTableInfo.java | 84 ++++++++++++++++++++++ .../doris/mtmv/MTMVPartitionExprDateTrunc.java | 2 +- .../org/apache/doris/mtmv/MTMVPartitionInfo.java | 3 +- .../org/apache/doris/mtmv/MTMVPartitionUtil.java | 11 +-- .../MTMVRelatedPartitionDescInitGenerator.java | 3 +- .../MTMVRelatedPartitionDescRollUpGenerator.java | 3 +- .../org/apache/doris/mtmv/MTMVRelatedTableIf.java | 22 ++++-- .../org/apache/doris/nereids/CascadesContext.java | 8 +++ .../org/apache/doris/nereids/NereidsPlanner.java | 2 +- .../org/apache/doris/nereids/StatementContext.java | 31 ++++++++ .../doris/nereids/rules/analysis/BindRelation.java | 1 - .../exploration/mv/MaterializedViewUtils.java | 4 +- .../rules/OneListPartitionEvaluator.java | 14 ++-- .../expression/rules/OnePartitionEvaluator.java | 4 +- .../rules/OneRangePartitionEvaluator.java | 14 ++-- .../rules/expression/rules/PartitionPruner.java | 16 ++--- .../rules/UnknownPartitionEvaluator.java | 12 ++-- .../rules/rewrite/PruneFileScanPartition.java | 32 ++++----- .../trees/plans/commands/info/CreateMTMVInfo.java | 2 +- .../commands/info/MTMVPartitionDefinition.java | 3 +- .../trees/plans/logical/LogicalFileScan.java | 16 ++--- .../apache/doris/mtmv/MTMVPartitionUtilTest.java | 5 +- 27 files changed, 340 insertions(+), 130 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index f9404e21fa2..0c6cbc828e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -47,6 +47,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVSnapshotIf; @@ -983,6 +984,10 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { } @Override + public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException { + return getPartitionColumnNames(); + } + public Set<String> getPartitionColumnNames() throws DdlException { Set<String> partitionColumnNames = Sets.newHashSet(); if (partitionInfo instanceof SinglePartitionInfo) { @@ -3001,11 +3006,20 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { } @Override + public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) { + return getPartitionType(); + } + public PartitionType getPartitionType() { return partitionInfo.getType(); } @Override + public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) + throws AnalysisException { + return getAndCopyPartitionItems(); + } + public Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisException { if (!tryReadLock(1, TimeUnit.MINUTES)) { throw new AnalysisException( @@ -3026,12 +3040,17 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { } @Override + public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) { + return getPartitionColumns(); + } + public List<Column> getPartitionColumns() { return getPartitionInfo().getPartitionColumns(); } @Override - public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional<MvccSnapshot> snapshot) throws AnalysisException { Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions(); long partitionId = getPartitionOrAnalysisException(partitionName).getId(); @@ -3041,7 +3060,7 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { } @Override - public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) { + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) { Map<Long, Long> tableVersions = context.getBaseVersions().getTableVersions(); long visibleVersion = tableVersions.containsKey(id) ? tableVersions.get(id) : getVisibleVersion(); return new MTMVVersionSnapshot(visibleVersion, id); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 590a4cbe046..041f7e35c16 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -20,6 +20,7 @@ package org.apache.doris.datasource; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.TableAttributes; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.constraint.Constraint; @@ -28,6 +29,8 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.statistics.AnalysisInfo; @@ -40,6 +43,7 @@ import org.apache.doris.thrift.TTableDescriptor; import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; import lombok.Getter; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.NotImplementedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -47,6 +51,7 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -371,4 +376,52 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); return cache.getSchemaValue(dbName, name); } + + /** + * Retrieve all partitions and initialize SelectedPartitions + * + * @param snapshot if not support mvcc, ignore this + * @return + */ + public SelectedPartitions initSelectedPartitions(Optional<MvccSnapshot> snapshot) { + if (!supportPartitionPruned()) { + return SelectedPartitions.NOT_PRUNED; + } + if (CollectionUtils.isEmpty(this.getPartitionColumns(snapshot))) { + return SelectedPartitions.NOT_PRUNED; + } + Map<String, PartitionItem> nameToPartitionItems = getNameToPartitionItems(snapshot); + return new SelectedPartitions(nameToPartitionItems.size(), nameToPartitionItems, false); + } + + /** + * get partition map + * If partition related operations are supported, this method needs to be implemented in the subclass + * + * @param snapshot if not support mvcc, ignore this + * @return partitionName ==> PartitionItem + */ + public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) { + return Collections.emptyMap(); + } + + /** + * get partition column list + * If partition related operations are supported, this method needs to be implemented in the subclass + * + * @param snapshot if not support mvcc, ignore this + * @return + */ + public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) { + return Collections.emptyList(); + } + + /** + * Does it support partition cpruned, If so, this method needs to be overridden in subclasses + * + * @return + */ + public boolean supportPartitionPruned() { + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 5df44fda476..9254d68a4ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -33,6 +33,7 @@ import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.iceberg.IcebergUtils; +import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.mtmv.MTMVBaseTableIf; import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; import org.apache.doris.mtmv.MTMVRefreshContext; @@ -40,7 +41,6 @@ import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVSnapshotIf; import org.apache.doris.mtmv.MTMVTimestampSnapshot; import org.apache.doris.nereids.exceptions.NotSupportedException; -import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.qe.GlobalVariable; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; @@ -288,7 +288,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI .orElse(Collections.emptyList()); } - @Override public List<Column> getPartitionColumns() { makeSureInitialized(); Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); @@ -296,19 +295,38 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI .orElse(Collections.emptyList()); } - public SelectedPartitions getAllPartitions() { + @Override + public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) { + return getPartitionColumns(); + } + + @Override + public boolean supportPartitionPruned() { + return getDlaType() == DLAType.HIVE; + } + + @Override + public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) { + return getNameToPartitionItems(); + } + + public Map<String, PartitionItem> getNameToPartitionItems() { if (CollectionUtils.isEmpty(this.getPartitionColumns())) { - return SelectedPartitions.NOT_PRUNED; + return Collections.emptyMap(); } - HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) this.getCatalog()); List<Type> partitionColumnTypes = this.getPartitionColumnTypes(); HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( this.getDbName(), this.getName(), partitionColumnTypes); Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem(); - - return new SelectedPartitions(idToPartitionItem.size(), idToPartitionItem, false); + // transfer id to name + BiMap<Long, String> idToName = hivePartitionValues.getPartitionNameToIdMap().inverse(); + Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMapWithExpectedSize(idToPartitionItem.size()); + for (Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) { + nameToPartitionItem.put(idToName.get(entry.getKey()), entry.getValue()); + } + return nameToPartitionItem; } public boolean isHiveTransactionalTable() { @@ -739,34 +757,33 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } @Override + public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) { + return getPartitionType(); + } + public PartitionType getPartitionType() { return getPartitionColumns().size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED; } @Override + public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) { + return getPartitionColumnNames(); + } + public Set<String> getPartitionColumnNames() { return getPartitionColumns().stream() .map(c -> c.getName().toLowerCase()).collect(Collectors.toSet()); } @Override - public Map<String, PartitionItem> getAndCopyPartitionItems() { - HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() - .getMetaStoreCache((HMSExternalCatalog) getCatalog()); - HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( - getDbName(), getName(), getPartitionColumnTypes()); - Map<String, PartitionItem> res = Maps.newHashMap(); - Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem(); - BiMap<Long, String> idToName = hivePartitionValues.getPartitionNameToIdMap().inverse(); - for (Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) { - res.put(idToName.get(entry.getKey()), entry.getValue()); - } - return res; + + public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) { + return getNameToPartitionItems(); } @Override - public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) - throws AnalysisException { + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional<MvccSnapshot> snapshot) throws AnalysisException { HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) getCatalog()); HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( @@ -778,7 +795,8 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } @Override - public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException { + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) + throws AnalysisException { if (getPartitionType() == PartitionType.UNPARTITIONED) { return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccSnapshot.java similarity index 60% copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java copy to fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccSnapshot.java index 13b58239376..d7826b0a5de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccSnapshot.java @@ -15,20 +15,11 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.mtmv; - -import org.apache.doris.common.AnalysisException; - -import java.util.Map; +package org.apache.doris.datasource.mvcc; /** - * get all related partition descs + * The snapshot information of mvcc is defined by each table, + * but it should be ensured that the table information queried through this snapshot remains unchanged */ -public class MTMVRelatedPartitionDescInitGenerator implements MTMVRelatedPartitionDescGeneratorService { - - @Override - public void apply(MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties, - RelatedPartitionDescResult lastResult) throws AnalysisException { - lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems()); - } +public interface MvccSnapshot { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java similarity index 60% copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java copy to fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java index 13b58239376..d69e0f3114d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java @@ -15,20 +15,19 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.mtmv; +package org.apache.doris.datasource.mvcc; -import org.apache.doris.common.AnalysisException; - -import java.util.Map; +import org.apache.doris.catalog.TableIf; /** - * get all related partition descs + * The table that needs to query data based on the version needs to implement this interface. */ -public class MTMVRelatedPartitionDescInitGenerator implements MTMVRelatedPartitionDescGeneratorService { - - @Override - public void apply(MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties, - RelatedPartitionDescResult lastResult) throws AnalysisException { - lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems()); - } +public interface MvccTable extends TableIf { + /** + * Retrieve the current snapshot information of the table, + * and the returned result will be used for the entire process of this query + * + * @return MvccSnapshot + */ + MvccSnapshot loadSnapshot(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTableInfo.java new file mode 100644 index 00000000000..0d865f837c8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTableInfo.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.mvcc; + +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.datasource.CatalogIf; + +import com.google.common.base.Objects; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class MvccTableInfo { + private static final Logger LOG = LogManager.getLogger(MvccTableInfo.class); + + private String tableName; + private String dbName; + private String ctlName; + + public MvccTableInfo(TableIf table) { + java.util.Objects.requireNonNull(table, "table is null"); + DatabaseIf database = table.getDatabase(); + java.util.Objects.requireNonNull(database, "database is null"); + CatalogIf catalog = database.getCatalog(); + java.util.Objects.requireNonNull(database, "catalog is null"); + this.tableName = table.getName(); + this.dbName = database.getFullName(); + this.ctlName = catalog.getName(); + } + + public String getTableName() { + return tableName; + } + + public String getDbName() { + return dbName; + } + + public String getCtlName() { + return ctlName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MvccTableInfo that = (MvccTableInfo) o; + return Objects.equal(tableName, that.tableName) && Objects.equal( + dbName, that.dbName) && Objects.equal(ctlName, that.ctlName); + } + + @Override + public int hashCode() { + return Objects.hashCode(tableName, dbName, ctlName); + } + + @Override + public String toString() { + return "MvccTableInfo{" + + "tableName='" + tableName + '\'' + + ", dbName='" + dbName + '\'' + + ", ctlName='" + ctlName + '\'' + + '}'; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java index ea15c84d1b9..95a8717e01c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java @@ -69,7 +69,7 @@ public class MTMVPartitionExprDateTrunc implements MTMVPartitionExprService { String.format("timeUnit not support: %s, only support: %s", this.timeUnit, timeUnits)); } MTMVRelatedTableIf relatedTable = mvPartitionInfo.getRelatedTable(); - PartitionType partitionType = relatedTable.getPartitionType(); + PartitionType partitionType = relatedTable.getPartitionType(Optional.empty()); if (partitionType == PartitionType.RANGE) { Type partitionColumnType = MTMVPartitionUtil .getPartitionColumnType(mvPartitionInfo.getRelatedTable(), mvPartitionInfo.getRelatedCol()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java index b3cd239269a..7eae44db0af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java @@ -25,6 +25,7 @@ import org.apache.doris.datasource.CatalogMgr; import com.google.gson.annotations.SerializedName; import java.util.List; +import java.util.Optional; /** * MTMVPartitionInfo @@ -115,7 +116,7 @@ public class MTMVPartitionInfo { if (partitionType == MTMVPartitionType.SELF_MANAGE) { throw new AnalysisException("partitionType is: " + partitionType); } - List<Column> partitionColumns = getRelatedTable().getPartitionColumns(); + List<Column> partitionColumns = getRelatedTable().getPartitionColumns(Optional.empty()); for (int i = 0; i < partitionColumns.size(); i++) { if (partitionColumns.get(i).getName().equalsIgnoreCase(relatedCol)) { return i; diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index 14b0f89ac76..836a5f08bff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -50,6 +50,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -329,7 +330,7 @@ public class MTMVPartitionUtil { } for (String relatedPartitionName : relatedPartitionNames) { MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionName, context); + .getPartitionSnapshot(relatedPartitionName, context, Optional.empty()); if (!mtmv.getRefreshSnapshot() .equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, relatedPartitionCurrentSnapshot)) { @@ -445,7 +446,7 @@ public class MTMVPartitionUtil { if (!baseTable.needAutoRefresh()) { return true; } - MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context); + MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context, Optional.empty()); return mtmv.getRefreshSnapshot() .equalsWithBaseTable(mtmvPartitionName, new BaseTableInfo(baseTable), baseTableCurrentSnapshot); } @@ -481,7 +482,7 @@ public class MTMVPartitionUtil { MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); for (String relatedPartitionName : relatedPartitionNames) { MTMVSnapshotIf partitionSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionName, context); + .getPartitionSnapshot(relatedPartitionName, context, Optional.empty()); refreshPartitionSnapshot.getPartitions() .put(relatedPartitionName, partitionSnapshot); } @@ -496,13 +497,13 @@ public class MTMVPartitionUtil { continue; } refreshPartitionSnapshot.addTableSnapshot(baseTableInfo, - ((MTMVRelatedTableIf) table).getTableSnapshot(context)); + ((MTMVRelatedTableIf) table).getTableSnapshot(context, Optional.empty())); } return refreshPartitionSnapshot; } public static Type getPartitionColumnType(MTMVRelatedTableIf relatedTable, String col) throws AnalysisException { - List<Column> partitionColumns = relatedTable.getPartitionColumns(); + List<Column> partitionColumns = relatedTable.getPartitionColumns(Optional.empty()); for (Column column : partitionColumns) { if (column.getName().equals(col)) { return column.getType(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java index 13b58239376..c6b4e331184 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java @@ -20,6 +20,7 @@ package org.apache.doris.mtmv; import org.apache.doris.common.AnalysisException; import java.util.Map; +import java.util.Optional; /** * get all related partition descs @@ -29,6 +30,6 @@ public class MTMVRelatedPartitionDescInitGenerator implements MTMVRelatedPartiti @Override public void apply(MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties, RelatedPartitionDescResult lastResult) throws AnalysisException { - lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems()); + lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems(Optional.empty())); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java index 76e20ef70f5..325fab819d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java @@ -31,6 +31,7 @@ import com.google.common.collect.Sets; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; /** @@ -45,7 +46,7 @@ public class MTMVRelatedPartitionDescRollUpGenerator implements MTMVRelatedParti return; } MTMVRelatedTableIf relatedTable = mvPartitionInfo.getRelatedTable(); - PartitionType partitionType = relatedTable.getPartitionType(); + PartitionType partitionType = relatedTable.getPartitionType(Optional.empty()); if (partitionType == PartitionType.RANGE) { lastResult.setDescs(rollUpRange(lastResult.getDescs(), mvPartitionInfo)); } else if (partitionType == PartitionType.LIST) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index 4a8b14603ce..c4261aa78f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -23,9 +23,11 @@ import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.mvcc.MvccSnapshot; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; /** @@ -38,31 +40,35 @@ public interface MTMVRelatedTableIf extends TableIf { * Note: This method is called every time there is a refresh and transparent rewrite, * so if this method is slow, it will significantly reduce query performance * + * @param snapshot * @return partitionName->PartitionItem */ - Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisException; + Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) throws AnalysisException; /** * getPartitionType LIST/RANGE/UNPARTITIONED * + * @param snapshot * @return */ - PartitionType getPartitionType(); + PartitionType getPartitionType(Optional<MvccSnapshot> snapshot); /** * getPartitionColumnNames * + * @param snapshot * @return * @throws DdlException */ - Set<String> getPartitionColumnNames() throws DdlException; + Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException; /** * getPartitionColumns * + * @param snapshot * @return */ - List<Column> getPartitionColumns(); + List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot); /** * getPartitionSnapshot @@ -70,12 +76,14 @@ public interface MTMVRelatedTableIf extends TableIf { * If snapshots have already been obtained in bulk in the context, * the results should be obtained directly from the context * + * @param snapshot * @param partitionName * @param context * @return partition snapshot at current time * @throws AnalysisException */ - MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) throws AnalysisException; + MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional<MvccSnapshot> snapshot) throws AnalysisException; /** * getTableSnapshot @@ -83,11 +91,13 @@ public interface MTMVRelatedTableIf extends TableIf { * If snapshots have already been obtained in bulk in the context, * the results should be obtained directly from the context * + * @param snapshot * @param context * @return table snapshot at current time * @throws AnalysisException */ - MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException; + MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) + throws AnalysisException; /** * Does the current type of table allow timed triggering diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java index 6d58089bed8..403d05f8c18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java @@ -72,6 +72,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; +import org.apache.commons.collections.MapUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -478,6 +479,13 @@ public class CascadesContext implements ScheduleContext { return tableNames; } + public Map<Long, TableIf> getOrExtractTables(LogicalPlan logicalPlan) { + if (MapUtils.isEmpty(tables)) { + extractTables(logicalPlan); + } + return tables; + } + private Set<List<String>> extractTableNamesFromHaving(LogicalHaving<?> having) { Set<SubqueryExpr> subqueryExprs = having.getPredicate() .collect(SubqueryExpr.class::isInstance); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 299cd40da17..5cefde11e07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -200,7 +200,7 @@ public class NereidsPlanner extends Planner { plan = preprocess(plan); initCascadesContext(plan, requireProperties); - + statementContext.loadSnapshots(cascadesContext.getOrExtractTables(plan)); try (Lock lock = new Lock(plan, cascadesContext)) { Plan resultPlan = planWithoutLock(plan, explainLevel, showPlanProcess, requireProperties); lockCallback.accept(resultPlan); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 0503637ef95..c6e50df5172 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -24,6 +24,9 @@ import org.apache.doris.common.FormatOptions; import org.apache.doris.common.Id; import org.apache.doris.common.IdGenerator; import org.apache.doris.common.Pair; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.datasource.mvcc.MvccTable; +import org.apache.doris.datasource.mvcc.MvccTableInfo; import org.apache.doris.nereids.hint.Hint; import org.apache.doris.nereids.memo.Group; import org.apache.doris.nereids.rules.analysis.ColumnAliasGenerator; @@ -166,6 +169,8 @@ public class StatementContext implements Closeable { private List<PlannerHook> plannerHooks = new ArrayList<>(); + private final Map<MvccTableInfo, MvccSnapshot> snapshots = Maps.newHashMap(); + public StatementContext() { this(ConnectContext.get(), null, 0); } @@ -500,6 +505,32 @@ public class StatementContext implements Closeable { this.plannerHooks.add(plannerHook); } + /** + * Load snapshot information of mvcc + * + * @param tables Tables used in queries + */ + public void loadSnapshots(Map<Long, TableIf> tables) { + if (tables == null) { + return; + } + for (TableIf tableIf : tables.values()) { + if (tableIf instanceof MvccTable) { + snapshots.put(new MvccTableInfo(tableIf), ((MvccTable) tableIf).loadSnapshot()); + } + } + } + + /** + * Obtain snapshot information of mvcc + * + * @param mvccTable mvccTable + * @return MvccSnapshot + */ + public MvccSnapshot getSnapshot(MvccTable mvccTable) { + return snapshots.get(new MvccTableInfo(mvccTable)); + } + private static class CloseableResource implements Closeable { public final String resourceName; public final String threadName; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java index d06647ee515..e773d2721a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java @@ -429,7 +429,6 @@ public class BindRelation extends OneAnalysisRuleFactory { } else { return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table, qualifierWithoutTableName, - ((HMSExternalTable) table).getAllPartitions(), unboundRelation.getTableSample(), unboundRelation.getTableSnapshot()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java index 52d9afa48ea..e1f9a4a6a6a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java @@ -433,13 +433,13 @@ public class MaterializedViewUtils { return null; } MTMVRelatedTableIf relatedTable = (MTMVRelatedTableIf) table; - PartitionType type = relatedTable.getPartitionType(); + PartitionType type = relatedTable.getPartitionType(Optional.empty()); if (PartitionType.UNPARTITIONED.equals(type)) { context.addFailReason(String.format("related base table is not partition table, the table is %s", table.getName())); return null; } - Set<Column> partitionColumnSet = new HashSet<>(relatedTable.getPartitionColumns()); + Set<Column> partitionColumnSet = new HashSet<>(relatedTable.getPartitionColumns(Optional.empty())); Column mvReferenceColumn = contextPartitionColumn.getColumn().get(); Expr definExpr = mvReferenceColumn.getDefineExpr(); if (definExpr instanceof SlotRef) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneListPartitionEvaluator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneListPartitionEvaluator.java index b9bdf520e3d..ecf8a267241 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneListPartitionEvaluator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneListPartitionEvaluator.java @@ -35,16 +35,16 @@ import java.util.Objects; import java.util.stream.IntStream; /** OneListPartitionInputs */ -public class OneListPartitionEvaluator - extends DefaultExpressionRewriter<Map<Slot, PartitionSlotInput>> implements OnePartitionEvaluator { - private final long partitionId; +public class OneListPartitionEvaluator<K> + extends DefaultExpressionRewriter<Map<Slot, PartitionSlotInput>> implements OnePartitionEvaluator<K> { + private final K partitionIdent; private final List<Slot> partitionSlots; private final ListPartitionItem partitionItem; private final ExpressionRewriteContext expressionRewriteContext; - public OneListPartitionEvaluator(long partitionId, List<Slot> partitionSlots, + public OneListPartitionEvaluator(K partitionIdent, List<Slot> partitionSlots, ListPartitionItem partitionItem, CascadesContext cascadesContext) { - this.partitionId = partitionId; + this.partitionIdent = partitionIdent; this.partitionSlots = Objects.requireNonNull(partitionSlots, "partitionSlots cannot be null"); this.partitionItem = Objects.requireNonNull(partitionItem, "partitionItem cannot be null"); this.expressionRewriteContext = new ExpressionRewriteContext( @@ -52,8 +52,8 @@ public class OneListPartitionEvaluator } @Override - public long getPartitionId() { - return partitionId; + public K getPartitionIdent() { + return partitionIdent; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OnePartitionEvaluator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OnePartitionEvaluator.java index c51252b44a6..8810a04750f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OnePartitionEvaluator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OnePartitionEvaluator.java @@ -25,8 +25,8 @@ import java.util.List; import java.util.Map; /** the evaluator of the partition which represent one partition */ -public interface OnePartitionEvaluator { - long getPartitionId(); +public interface OnePartitionEvaluator<K> { + K getPartitionIdent(); /** * return a slot to expression mapping to replace the input. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneRangePartitionEvaluator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneRangePartitionEvaluator.java index 84a037171f3..1fb8954ab16 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneRangePartitionEvaluator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneRangePartitionEvaluator.java @@ -80,10 +80,10 @@ import java.util.function.BiFunction; * * you can see the process steps in the comment of PartitionSlotInput.columnRanges */ -public class OneRangePartitionEvaluator +public class OneRangePartitionEvaluator<K> extends ExpressionVisitor<EvaluateRangeResult, EvaluateRangeInput> - implements OnePartitionEvaluator { - private final long partitionId; + implements OnePartitionEvaluator<K> { + private final K partitionIdent; private final List<Slot> partitionSlots; private final RangePartitionItem partitionItem; private final ExpressionRewriteContext expressionRewriteContext; @@ -95,9 +95,9 @@ public class OneRangePartitionEvaluator private final Map<Slot, PartitionSlotType> slotToType; /** OneRangePartitionEvaluator */ - public OneRangePartitionEvaluator(long partitionId, List<Slot> partitionSlots, + public OneRangePartitionEvaluator(K partitionIdent, List<Slot> partitionSlots, RangePartitionItem partitionItem, CascadesContext cascadesContext, int expandThreshold) { - this.partitionId = partitionId; + this.partitionIdent = partitionIdent; this.partitionSlots = Objects.requireNonNull(partitionSlots, "partitionSlots cannot be null"); this.partitionItem = Objects.requireNonNull(partitionItem, "partitionItem cannot be null"); this.expressionRewriteContext = new ExpressionRewriteContext( @@ -155,8 +155,8 @@ public class OneRangePartitionEvaluator } @Override - public long getPartitionId() { - return partitionId; + public K getPartitionIdent() { + return partitionIdent; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java index efe12f38cd7..fac1a7f82d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java @@ -102,21 +102,21 @@ public class PartitionPruner extends DefaultExpressionRewriter<Void> { } /** prune */ - public List<Long> prune() { - Builder<Long> scanPartitionIds = ImmutableList.builder(); + public <K> List<K> prune() { + Builder<K> scanPartitionIdents = ImmutableList.builder(); for (OnePartitionEvaluator partition : partitions) { if (!canBePrunedOut(partition)) { - scanPartitionIds.add(partition.getPartitionId()); + scanPartitionIdents.add((K) partition.getPartitionIdent()); } } - return scanPartitionIds.build(); + return scanPartitionIdents.build(); } /** * prune partition with `idToPartitions` as parameter. */ - public static List<Long> prune(List<Slot> partitionSlots, Expression partitionPredicate, - Map<Long, PartitionItem> idToPartitions, CascadesContext cascadesContext, + public static <K> List<K> prune(List<Slot> partitionSlots, Expression partitionPredicate, + Map<K, PartitionItem> idToPartitions, CascadesContext cascadesContext, PartitionTableType partitionTableType) { partitionPredicate = PartitionPruneExpressionExtractor.extract( partitionPredicate, ImmutableSet.copyOf(partitionSlots), cascadesContext); @@ -135,7 +135,7 @@ public class PartitionPruner extends DefaultExpressionRewriter<Void> { } List<OnePartitionEvaluator> evaluators = Lists.newArrayListWithCapacity(idToPartitions.size()); - for (Entry<Long, PartitionItem> kv : idToPartitions.entrySet()) { + for (Entry<K, PartitionItem> kv : idToPartitions.entrySet()) { evaluators.add(toPartitionEvaluator( kv.getKey(), kv.getValue(), partitionSlots, cascadesContext, expandThreshold)); } @@ -147,7 +147,7 @@ public class PartitionPruner extends DefaultExpressionRewriter<Void> { /** * convert partition item to partition evaluator */ - public static final OnePartitionEvaluator toPartitionEvaluator(long id, PartitionItem partitionItem, + public static final <K> OnePartitionEvaluator<K> toPartitionEvaluator(K id, PartitionItem partitionItem, List<Slot> partitionSlots, CascadesContext cascadesContext, int expandThreshold) { if (partitionItem instanceof ListPartitionItem) { return new OneListPartitionEvaluator( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/UnknownPartitionEvaluator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/UnknownPartitionEvaluator.java index ae313ca09de..394182a1311 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/UnknownPartitionEvaluator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/UnknownPartitionEvaluator.java @@ -28,18 +28,18 @@ import java.util.List; import java.util.Map; /** UnknownPartitionEvaluator */ -public class UnknownPartitionEvaluator implements OnePartitionEvaluator { - private final long partitionId; +public class UnknownPartitionEvaluator<K> implements OnePartitionEvaluator<K> { + private final K partitionIdent; private final PartitionItem partitionItem; - public UnknownPartitionEvaluator(long partitionId, PartitionItem partitionItem) { - this.partitionId = partitionId; + public UnknownPartitionEvaluator(K partitionId, PartitionItem partitionItem) { + this.partitionIdent = partitionId; this.partitionItem = partitionItem; } @Override - public long getPartitionId() { - return partitionId; + public K getPartitionIdent() { + return partitionIdent; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java index 2de4efab2ff..4bbb0a8aa76 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java @@ -19,8 +19,6 @@ package org.apache.doris.nereids.rules.rewrite; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.datasource.ExternalTable; -import org.apache.doris.datasource.hive.HMSExternalTable; -import org.apache.doris.datasource.hive.HMSExternalTable.DLAType; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; @@ -38,6 +36,7 @@ import org.apache.commons.collections.CollectionUtils; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; @@ -60,10 +59,8 @@ public class PruneFileScanPartition extends OneRewriteRuleFactory { ExternalTable tbl = scan.getTable(); SelectedPartitions selectedPartitions; - // TODO(cmy): support other external table - if (tbl instanceof HMSExternalTable && ((HMSExternalTable) tbl).getDlaType() == DLAType.HIVE) { - HMSExternalTable hiveTbl = (HMSExternalTable) tbl; - selectedPartitions = pruneHivePartitions(hiveTbl, filter, scan, ctx.cascadesContext); + if (tbl.supportPartitionPruned()) { + selectedPartitions = pruneExternalPartitions(tbl, filter, scan, ctx.cascadesContext); } else { // set isPruned so that it won't go pass the partition prune again selectedPartitions = new SelectedPartitions(0, ImmutableMap.of(), true); @@ -74,10 +71,11 @@ public class PruneFileScanPartition extends OneRewriteRuleFactory { }).toRule(RuleType.FILE_SCAN_PARTITION_PRUNE); } - private SelectedPartitions pruneHivePartitions(HMSExternalTable hiveTbl, + private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, LogicalFilter<LogicalFileScan> filter, LogicalFileScan scan, CascadesContext ctx) { - Map<Long, PartitionItem> selectedPartitionItems = Maps.newHashMap(); - if (CollectionUtils.isEmpty(hiveTbl.getPartitionColumns())) { + Map<String, PartitionItem> selectedPartitionItems = Maps.newHashMap(); + // todo: real snapshotId + if (CollectionUtils.isEmpty(externalTable.getPartitionColumns(Optional.empty()))) { // non partitioned table, return NOT_PRUNED. // non partition table will be handled in HiveScanNode. return SelectedPartitions.NOT_PRUNED; @@ -85,19 +83,19 @@ public class PruneFileScanPartition extends OneRewriteRuleFactory { Map<String, Slot> scanOutput = scan.getOutput() .stream() .collect(Collectors.toMap(slot -> slot.getName().toLowerCase(), Function.identity())); - - List<Slot> partitionSlots = hiveTbl.getPartitionColumns() + // todo: real snapshotId + List<Slot> partitionSlots = externalTable.getPartitionColumns(Optional.empty()) .stream() .map(column -> scanOutput.get(column.getName().toLowerCase())) .collect(Collectors.toList()); - Map<Long, PartitionItem> idToPartitionItem = scan.getSelectedPartitions().selectedPartitions; - List<Long> prunedPartitions = new ArrayList<>(PartitionPruner.prune( - partitionSlots, filter.getPredicate(), idToPartitionItem, ctx, PartitionTableType.HIVE)); + Map<String, PartitionItem> nameToPartitionItem = scan.getSelectedPartitions().selectedPartitions; + List<String> prunedPartitions = new ArrayList<>(PartitionPruner.prune( + partitionSlots, filter.getPredicate(), nameToPartitionItem, ctx, PartitionTableType.HIVE)); - for (Long id : prunedPartitions) { - selectedPartitionItems.put(id, idToPartitionItem.get(id)); + for (String name : prunedPartitions) { + selectedPartitionItems.put(name, nameToPartitionItem.get(name)); } - return new SelectedPartitions(idToPartitionItem.size(), selectedPartitionItems, true); + return new SelectedPartitions(nameToPartitionItem.size(), selectedPartitionItems, true); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java index be87e6c71f2..837d9478c89 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java @@ -351,7 +351,7 @@ public class CreateMTMVInfo { allPartitionDescs.size(), ctx.getSessionVariable().getCreateTablePartitionMaxNum())); } try { - PartitionType type = relatedTable.getPartitionType(); + PartitionType type = relatedTable.getPartitionType(Optional.empty()); if (type == PartitionType.RANGE) { return new RangePartitionDesc(Lists.newArrayList(mvPartitionInfo.getPartitionCol()), allPartitionDescs); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java index 427e2368e7a..c4117e8608e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java @@ -54,6 +54,7 @@ import org.apache.doris.qe.SessionVariable; import com.google.common.collect.Sets; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -147,7 +148,7 @@ public class MTMVPartitionDefinition { MTMVRelatedTableIf mtmvBaseRealtedTable = MTMVUtil.getRelatedTable(relatedTableInfo.getTableInfo()); Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); try { - partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames()); + partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames(Optional.empty())); } catch (DdlException e) { throw new AnalysisException(e.getMessage(), e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java index 0a2c69b68c1..96b8e032d11 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java @@ -59,17 +59,11 @@ public class LogicalFileScan extends LogicalCatalogRelation { this.tableSnapshot = tableSnapshot; } - public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier, - SelectedPartitions selectedPartitions, - Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) { - this(id, table, qualifier, Optional.empty(), Optional.empty(), - selectedPartitions, tableSample, tableSnapshot); - } - public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier, Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) { + // todo: real snapshotId this(id, table, qualifier, Optional.empty(), Optional.empty(), - SelectedPartitions.NOT_PRUNED, tableSample, tableSnapshot); + table.initSelectedPartitions(Optional.empty()), tableSample, tableSnapshot); } public SelectedPartitions getSelectedPartitions() { @@ -147,9 +141,9 @@ public class LogicalFileScan extends LogicalCatalogRelation { */ public final long totalPartitionNum; /** - * partition id -> partition item + * partition name -> partition item */ - public final Map<Long, PartitionItem> selectedPartitions; + public final Map<String, PartitionItem> selectedPartitions; /** * true means the result is after partition pruning * false means the partition pruning is not processed. @@ -159,7 +153,7 @@ public class LogicalFileScan extends LogicalCatalogRelation { /** * Constructor for SelectedPartitions. */ - public SelectedPartitions(long totalPartitionNum, Map<Long, PartitionItem> selectedPartitions, + public SelectedPartitions(long totalPartitionNum, Map<String, PartitionItem> selectedPartitions, boolean isPruned) { this.totalPartitionNum = totalPartitionNum; this.selectedPartitions = ImmutableMap.copyOf(Objects.requireNonNull(selectedPartitions, diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java index 997385742dc..e5d2e21a8db 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java @@ -35,6 +35,7 @@ import org.junit.Before; import org.junit.Test; import java.util.List; +import java.util.Optional; import java.util.Set; public class MTMVPartitionUtilTest { @@ -112,7 +113,7 @@ public class MTMVPartitionUtilTest { minTimes = 0; result = true; - baseOlapTable.getTableSnapshot((MTMVRefreshContext) any); + baseOlapTable.getTableSnapshot((MTMVRefreshContext) any, (Optional) any); minTimes = 0; result = baseSnapshotIf; @@ -132,7 +133,7 @@ public class MTMVPartitionUtilTest { minTimes = 0; result = true; - baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any); + baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any, (Optional) any); minTimes = 0; result = baseSnapshotIf; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org