This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5275202c176 [enhance](mtmv) mtmv disable hive auto refresh (#30775)
5275202c176 is described below
commit 5275202c1760e6587dde5ff27e0faf24123388f9
Author: zhangdong <[email protected]>
AuthorDate: Mon Feb 5 11:40:10 2024 +0800
[enhance](mtmv) mtmv disable hive auto refresh (#30775)
- If the `related table` is a `hive` table, do not refresh automatically.
- If the `related table` is a `hive` table, the partition column is allowed to be
`null`; otherwise, it must be `not null`.
- Add more unit tests (`ut`).
---
.../java/org/apache/doris/catalog/OlapTable.java | 9 +
.../doris/catalog/external/HMSExternalTable.java | 10 +
.../doris/common/proc/PartitionsProcDir.java | 4 +-
.../apache/doris/common/proc/TablesProcDir.java | 3 +-
.../apache/doris/job/extensions/mtmv/MTMVTask.java | 36 +-
.../mtmv/{MTMVUtil.java => MTMVPartitionUtil.java} | 105 +----
.../org/apache/doris/mtmv/MTMVRelatedTableIf.java | 15 +
.../org/apache/doris/mtmv/MTMVRelationManager.java | 3 +-
.../org/apache/doris/mtmv/MTMVRewriteUtil.java | 87 ++++
.../java/org/apache/doris/mtmv/MTMVService.java | 4 +-
.../main/java/org/apache/doris/mtmv/MTMVUtil.java | 452 +--------------------
.../mv/AbstractMaterializedViewRule.java | 6 +-
.../exploration/mv/MaterializedViewUtils.java | 2 +-
.../trees/plans/commands/info/RefreshMTMVInfo.java | 4 +-
.../doris/tablefunction/MetadataGenerator.java | 4 +-
.../apache/doris/mtmv/MTMVPartitionUtilTest.java | 187 +++++++++
.../apache/doris/mtmv/MTMVRefreshSnapshotTest.java | 96 +++++
.../org/apache/doris/mtmv/MTMVRewriteUtilTest.java | 254 ++++++++++++
.../java/org/apache/doris/mtmv/MTMVTaskTest.java | 163 ++++++++
.../exploration/mv/MaterializedViewUtilsTest.java | 2 -
regression-test/data/mtmv_p0/test_hive_mtmv.out | 5 +
.../suites/mtmv_p0/test_hive_mtmv.groovy | 8 +
22 files changed, 892 insertions(+), 567 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 2e01f37639b..0c0ce320f11 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2632,4 +2632,13 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
return getPartitionOrAnalysisException(partitionId).getName();
}
+ @Override
+ public boolean needAutoRefresh() {
+ return true;
+ }
+
+ @Override
+ public boolean isPartitionColumnAllowNull() {
+ return false;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index aa4258baadb..c31ba11a0fd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -885,6 +885,16 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
((ListPartitionItem)
item).getItems().get(0).getPartitionValuesAsStringListForHive());
return partitionValuesList;
}
+
+ @Override
+ public boolean needAutoRefresh() {
+ return false;
+ }
+
+ @Override
+ public boolean isPartitionColumnAllowNull() {
+ return true;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
index 01feaf23683..4703429fa18 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
@@ -45,7 +45,7 @@ import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.mtmv.MTMVUtil;
+import org.apache.doris.mtmv.MTMVPartitionUtil;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -309,7 +309,7 @@ public class PartitionsProcDir implements ProcDirInterface {
partitionInfo.add(tblPartitionInfo.getIsMutable(partitionId));
if (olapTable instanceof MTMV) {
try {
- List<String> partitionUnSyncTables = MTMVUtil
+ List<String> partitionUnSyncTables = MTMVPartitionUtil
.getPartitionUnSyncTables((MTMV) olapTable,
partitionId);
partitionInfo.add(CollectionUtils.isEmpty(partitionUnSyncTables));
partitionInfo.add(partitionUnSyncTables.toString());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java
index b3ce9be35c9..c2926c829ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java
@@ -23,7 +23,6 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.ListComparator;
@@ -91,7 +90,7 @@ public class TablesProcDir implements ProcDirInterface {
String partitionKey = FeConstants.null_string;
table.readLock();
try {
- if (table.getType() == TableType.OLAP) {
+ if (table instanceof OlapTable) {
OlapTable olapTable = (OlapTable) table;
if (olapTable.getPartitionInfo().getType() ==
PartitionType.RANGE) {
partitionNum = olapTable.getPartitions().size();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 194172a6732..6a861200e69 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -18,17 +18,14 @@
package org.apache.doris.job.extensions.mtmv;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
@@ -36,6 +33,7 @@ import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
+import org.apache.doris.mtmv.MTMVPartitionUtil;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
@@ -145,6 +143,13 @@ public class MTMVTask extends AbstractTask {
this.taskContext = Objects.requireNonNull(taskContext);
}
+ // only for test
+ public MTMVTask(MTMV mtmv, MTMVRelation relation, MTMVTaskContext
taskContext) {
+ this.mtmv = mtmv;
+ this.relation = relation;
+ this.taskContext = taskContext;
+ }
+
@Override
public void run() throws JobException {
LOG.info("mtmv task run, taskId: {}", super.getTaskId());
@@ -161,10 +166,10 @@ public class MTMVTask extends AbstractTask {
// To be completely consistent with hive, you need to manually
refresh the cache
// refreshHmsTable();
if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
- MTMVUtil.alignMvPartition(mtmv,
mtmv.getMvPartitionInfo().getRelatedTable());
+ MTMVPartitionUtil.alignMvPartition(mtmv,
mtmv.getMvPartitionInfo().getRelatedTable());
}
List<Long> needRefreshPartitionIds =
calculateNeedRefreshPartitions();
- this.needRefreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv,
needRefreshPartitionIds);
+ this.needRefreshPartitions =
MTMVPartitionUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds);
this.refreshMode = generateRefreshMode(needRefreshPartitionIds);
if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
return;
@@ -181,8 +186,8 @@ public class MTMVTask extends AbstractTask {
Set<Long> execPartitionIds =
Sets.newHashSet(needRefreshPartitionIds
.subList(start, end > needRefreshPartitionIds.size() ?
needRefreshPartitionIds.size() : end));
// need get names before exec
- List<String> execPartitionNames =
MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds);
- Map<String, MTMVRefreshPartitionSnapshot>
execPartitionSnapshots = MTMVUtil
+ List<String> execPartitionNames =
MTMVPartitionUtil.getPartitionNamesByIds(mtmv, execPartitionIds);
+ Map<String, MTMVRefreshPartitionSnapshot>
execPartitionSnapshots = MTMVPartitionUtil
.generatePartitionSnapshots(mtmv,
relation.getBaseTables(), execPartitionIds);
exec(ctx, execPartitionIds, tableWithPartKey);
completedPartitions.addAll(execPartitionNames);
@@ -241,7 +246,7 @@ public class MTMVTask extends AbstractTask {
LOG.info("mtmv task before, taskId: {}", super.getTaskId());
super.before();
try {
- mtmv = getMTMV();
+ mtmv = MTMVUtil.getMTMV(dbId, mtmvId);
} catch (UserException e) {
LOG.warn("before task failed:", e);
throw new JobException(e);
@@ -267,11 +272,6 @@ public class MTMVTask extends AbstractTask {
}
}
- private MTMV getMTMV() throws DdlException, MetaNotFoundException {
- Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
- return (MTMV) db.getTableOrMetaException(mtmvId,
TableType.MATERIALIZED_VIEW);
- }
-
@Override
public void runTask() throws JobException {
LOG.info("mtmv task runTask, taskId: {}", super.getTaskId());
@@ -296,7 +296,7 @@ public class MTMVTask extends AbstractTask {
String dbName = "";
String mvName = "";
try {
- MTMV mtmv = getMTMV();
+ MTMV mtmv = MTMVUtil.getMTMV(dbId, mtmvId);
dbName = mtmv.getQualifiedDbName();
mvName = mtmv.getName();
} catch (UserException e) {
@@ -386,20 +386,20 @@ public class MTMVTask extends AbstractTask {
}
}
- private List<Long> calculateNeedRefreshPartitions() throws
AnalysisException {
+ public List<Long> calculateNeedRefreshPartitions() throws
AnalysisException {
// check whether the user manually triggers it
if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
if (taskContext.isComplete()) {
return mtmv.getPartitionIds();
} else if (!CollectionUtils
.isEmpty(taskContext.getPartitions())) {
- return MTMVUtil.getPartitionsIdsByNames(mtmv,
taskContext.getPartitions());
+ return MTMVPartitionUtil.getPartitionsIdsByNames(mtmv,
taskContext.getPartitions());
}
}
// check if data is fresh
// We need to use a newly generated relationship and cannot retrieve
it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the
tableId
- boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(),
mtmv.getExcludedTriggerTables());
+ boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv,
relation.getBaseTables(), mtmv.getExcludedTriggerTables());
if (fresh) {
return Lists.newArrayList();
}
@@ -413,7 +413,7 @@ public class MTMVTask extends AbstractTask {
}
// We need to use a newly generated relationship and cannot retrieve
it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the
tableId
- return MTMVUtil.getMTMVNeedRefreshPartitions(mtmv,
relation.getBaseTables());
+ return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv,
relation.getBaseTables());
}
public MTMVTaskContext getTaskContext() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
similarity index 83%
copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
copy to fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
index 74593e5def5..a8e3e11869d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
@@ -29,11 +29,7 @@ import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
-import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
-import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
-import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
-import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -48,23 +44,8 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
-public class MTMVUtil {
- private static final Logger LOG = LogManager.getLogger(MTMVUtil.class);
-
- /**
- * get Table by BaseTableInfo
- *
- * @param baseTableInfo
- * @return
- * @throws AnalysisException
- */
- public static TableIf getTable(BaseTableInfo baseTableInfo) throws
AnalysisException {
- TableIf table = Env.getCurrentEnv().getCatalogMgr()
- .getCatalogOrAnalysisException(baseTableInfo.getCtlId())
- .getDbOrAnalysisException(baseTableInfo.getDbId())
- .getTableOrAnalysisException(baseTableInfo.getTableId());
- return table;
- }
+public class MTMVPartitionUtil {
+ private static final Logger LOG =
LogManager.getLogger(MTMVPartitionUtil.class);
/**
* Determine whether the partition is sync with retated partition and
other baseTables
@@ -76,7 +57,7 @@ public class MTMVUtil {
* @return
* @throws AnalysisException
*/
- private static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId,
Set<BaseTableInfo> tables,
+ public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId,
Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables) throws AnalysisException {
boolean isSyncWithPartition = true;
if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
@@ -192,11 +173,14 @@ public class MTMVUtil {
public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long
partitionId) throws AnalysisException {
List<String> res = Lists.newArrayList();
for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables())
{
- TableIf table = getTable(baseTableInfo);
+ TableIf table = MTMVUtil.getTable(baseTableInfo);
if (!(table instanceof MTMVRelatedTableIf)) {
continue;
}
MTMVRelatedTableIf mtmvRelatedTableIf = (MTMVRelatedTableIf) table;
+ if (!mtmvRelatedTableIf.needAutoRefresh()) {
+ continue;
+ }
if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
PartitionItem item =
mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
@@ -220,55 +204,6 @@ public class MTMVUtil {
return res;
}
- /**
- * Determine which partition of mtmv can be rewritten
- *
- * @param mtmv
- * @param ctx
- * @return
- */
- public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv,
ConnectContext ctx) {
- List<Partition> res = Lists.newArrayList();
- Collection<Partition> allPartitions = mtmv.getPartitions();
- // check session variable if enable rewrite
- if (!ctx.getSessionVariable().isEnableMaterializedViewRewrite()) {
- return res;
- }
- if (mtmvContainsExternalTable(mtmv) && !ctx.getSessionVariable()
- .isMaterializedViewRewriteEnableContainExternalTable()) {
- return res;
- }
-
- MTMVRelation mtmvRelation = mtmv.getRelation();
- if (mtmvRelation == null) {
- return res;
- }
- // check mv is normal
- if (!(mtmv.getStatus().getState() == MTMVState.NORMAL
- && mtmv.getStatus().getRefreshState() ==
MTMVRefreshState.SUCCESS)) {
- return res;
- }
- // check gracePeriod
- long gracePeriodMills = mtmv.getGracePeriod();
- long currentTimeMills = System.currentTimeMillis();
- for (Partition partition : allPartitions) {
- if (gracePeriodMills > 0 && currentTimeMills <=
(partition.getVisibleVersionTime()
- + gracePeriodMills)) {
- res.add(partition);
- continue;
- }
- try {
- if (isMTMVPartitionSync(mtmv, partition.getId(),
mtmvRelation.getBaseTables(), Sets.newHashSet())) {
- res.add(partition);
- }
- } catch (AnalysisException e) {
- // ignore it
- LOG.warn("check isMTMVPartitionSync failed", e);
- }
- }
- return res;
- }
-
/**
* Get the partitions that need to be refreshed
*
@@ -303,9 +238,12 @@ public class MTMVUtil {
* @return
* @throws AnalysisException
*/
- private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId,
+ public static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId,
MTMVRelatedTableIf relatedTable,
Long relatedPartitionId) throws AnalysisException {
+ if (!relatedTable.needAutoRefresh()) {
+ return true;
+ }
MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
.getPartitionSnapshot(relatedPartitionId);
String relatedPartitionName =
relatedTable.getPartitionName(relatedPartitionId);
@@ -320,7 +258,7 @@ public class MTMVUtil {
* @param desc
* @return
*/
- private static String generatePartitionName(PartitionKeyDesc desc) {
+ public static String generatePartitionName(PartitionKeyDesc desc) {
String partitionName = "p_";
partitionName +=
desc.toSql().trim().replaceAll("\\(|\\)|\\-|\\[|\\]|'|\\s+", "")
.replaceAll("\\(|\\)|\\,|\\[|\\]", "_");
@@ -401,7 +339,7 @@ public class MTMVUtil {
for (BaseTableInfo baseTableInfo : tables) {
TableIf table = null;
try {
- table = getTable(baseTableInfo);
+ table = MTMVUtil.getTable(baseTableInfo);
} catch (AnalysisException e) {
LOG.warn("get table failed, {}", baseTableInfo, e);
return false;
@@ -421,7 +359,7 @@ public class MTMVUtil {
throws AnalysisException {
TableIf table = null;
try {
- table = getTable(baseTableInfo);
+ table = MTMVUtil.getTable(baseTableInfo);
} catch (AnalysisException e) {
LOG.warn("get table failed, {}", baseTableInfo, e);
return false;
@@ -433,22 +371,15 @@ public class MTMVUtil {
return true;
}
MTMVRelatedTableIf baseTable = (MTMVRelatedTableIf) table;
+ if (!baseTable.needAutoRefresh()) {
+ return true;
+ }
MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot();
String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
return mtmv.getRefreshSnapshot()
.equalsWithBaseTable(mtmvPartitionName, baseTable.getId(),
baseTableCurrentSnapshot);
}
- private static boolean mtmvContainsExternalTable(MTMV mtmv) {
- Set<BaseTableInfo> baseTables = mtmv.getRelation().getBaseTables();
- for (BaseTableInfo baseTableInfo : baseTables) {
- if (baseTableInfo.getCtlId() !=
InternalCatalog.INTERNAL_CATALOG_ID) {
- return true;
- }
- }
- return false;
- }
-
public static Map<String, MTMVRefreshPartitionSnapshot>
generatePartitionSnapshots(MTMV mtmv,
Set<BaseTableInfo> baseTables, Set<Long> partitionIds)
throws AnalysisException {
@@ -482,7 +413,7 @@ public class MTMVUtil {
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
continue;
}
- TableIf table = getTable(baseTableInfo);
+ TableIf table = MTMVUtil.getTable(baseTableInfo);
if (!(table instanceof MTMVRelatedTableIf)) {
continue;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
index 51773db0df1..46454679b56 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
@@ -87,4 +87,19 @@ public interface MTMVRelatedTableIf extends TableIf {
* @throws AnalysisException
*/
String getPartitionName(long partitionId) throws AnalysisException;
+
+ /**
+ * Whether the current type of table allows timed (automatic) triggering of a refresh.
+ *
+ * @return if false, the methods that check whether the mtmv is in sync
will directly return true;
+ * otherwise the snapshot information will be compared
+ */
+ boolean needAutoRefresh();
+
+ /**
+ * Whether the partition column is allowed to be nullable (`isAllowNull`).
+ *
+ * @return
+ */
+ boolean isPartitionColumnAllowNull();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
index 77414e4fc88..aa7ffd2426d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
@@ -71,7 +71,8 @@ public class MTMVRelationManager implements MTMVHookService {
for (BaseTableInfo tableInfo : mvInfos) {
try {
MTMV mtmv = (MTMV) MTMVUtil.getTable(tableInfo);
- if
(!CollectionUtils.isEmpty(MTMVUtil.getMTMVCanRewritePartitions(mtmv, ctx))) {
+ if (!CollectionUtils
+
.isEmpty(MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, ctx,
System.currentTimeMillis()))) {
res.add(mtmv);
}
} catch (AnalysisException e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java
new file mode 100644
index 00000000000..666a79eba97
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
+import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+
+public class MTMVRewriteUtil {
+ private static final Logger LOG =
LogManager.getLogger(MTMVRewriteUtil.class);
+
+ /**
+ * Determine which partition of mtmv can be rewritten
+ *
+ * @param mtmv
+ * @param ctx
+ * @return
+ */
+ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv,
ConnectContext ctx,
+ long currentTimeMills) {
+ List<Partition> res = Lists.newArrayList();
+ Collection<Partition> allPartitions = mtmv.getPartitions();
+ // check session variable if enable rewrite
+ if (!ctx.getSessionVariable().isEnableMaterializedViewRewrite()) {
+ return res;
+ }
+ if (MTMVUtil.mtmvContainsExternalTable(mtmv) &&
!ctx.getSessionVariable()
+ .isMaterializedViewRewriteEnableContainExternalTable()) {
+ return res;
+ }
+
+ MTMVRelation mtmvRelation = mtmv.getRelation();
+ if (mtmvRelation == null) {
+ return res;
+ }
+ // check mv is normal
+ if (!(mtmv.getStatus().getState() == MTMVState.NORMAL
+ && mtmv.getStatus().getRefreshState() ==
MTMVRefreshState.SUCCESS)) {
+ return res;
+ }
+ // check gracePeriod
+ long gracePeriodMills = mtmv.getGracePeriod();
+ for (Partition partition : allPartitions) {
+ if (gracePeriodMills > 0 && currentTimeMills <=
(partition.getVisibleVersionTime()
+ + gracePeriodMills)) {
+ res.add(partition);
+ continue;
+ }
+ try {
+ if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv,
partition.getId(), mtmvRelation.getBaseTables(),
+ Sets.newHashSet())) {
+ res.add(partition);
+ }
+ } catch (AnalysisException e) {
+ // ignore it
+ LOG.warn("check isMTMVPartitionSync failed", e);
+ }
+ }
+ return res;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
index 6abb22f3e5f..227166e56d3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
@@ -84,10 +84,10 @@ public class MTMVService {
public void createMTMV(MTMV mtmv) throws DdlException, AnalysisException {
Objects.requireNonNull(mtmv);
+ LOG.info("createMTMV: " + mtmv.getName());
if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
- MTMVUtil.alignMvPartition(mtmv,
mtmv.getMvPartitionInfo().getRelatedTable());
+ MTMVPartitionUtil.alignMvPartition(mtmv,
mtmv.getMvPartitionInfo().getRelatedTable());
}
- LOG.info("createMTMV: " + mtmv.getName());
for (MTMVHookService mtmvHookService : hooks.values()) {
mtmvHookService.createMTMV(mtmv);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
index 74593e5def5..3b97e35141d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
@@ -17,39 +17,19 @@
package org.apache.doris.mtmv;
-import org.apache.doris.analysis.AddPartitionClause;
-import org.apache.doris.analysis.DropPartitionClause;
-import org.apache.doris.analysis.PartitionKeyDesc;
-import org.apache.doris.analysis.SinglePartitionDesc;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
-import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
-import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
-import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
-import org.apache.doris.qe.ConnectContext;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
import java.util.Set;
public class MTMVUtil {
- private static final Logger LOG = LogManager.getLogger(MTMVUtil.class);
/**
* get Table by BaseTableInfo
@@ -66,380 +46,18 @@ public class MTMVUtil {
return table;
}
- /**
- * Determine whether the partition is sync with related partition and
other baseTables
- *
- * @param mtmv
- * @param partitionId
- * @param tables
- * @param excludedTriggerTables
- * @return
- * @throws AnalysisException
- */
- private static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId,
Set<BaseTableInfo> tables,
- Set<String> excludedTriggerTables) throws AnalysisException {
- boolean isSyncWithPartition = true;
- if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
- MTMVRelatedTableIf relatedTable =
mtmv.getMvPartitionInfo().getRelatedTable();
- // if follow base table, not need compare with related table, only
should compare with related partition
- excludedTriggerTables.add(relatedTable.getName());
- PartitionItem item =
mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
- Map<Long, PartitionItem> relatedPartitionItems =
relatedTable.getPartitionItems();
- long relatedPartitionId = getExistPartitionId(item,
- relatedPartitionItems);
- if (relatedPartitionId == -1L) {
- LOG.warn("can not found related partition: " + partitionId);
- return false;
- }
- isSyncWithPartition = isSyncWithPartition(mtmv, partitionId,
relatedTable, relatedPartitionId);
- }
- return isSyncWithPartition && isSyncWithAllBaseTables(mtmv,
partitionId, tables, excludedTriggerTables);
-
- }
-
- /**
- * Align the partitions of mtmv and related tables, delete more and add
less
- *
- * @param mtmv
- * @param relatedTable
- * @throws DdlException
- * @throws AnalysisException
- */
- public static void alignMvPartition(MTMV mtmv, MTMVRelatedTableIf
relatedTable)
- throws DdlException, AnalysisException {
- Map<Long, PartitionItem> relatedTableItems =
relatedTable.getPartitionItems();
- Map<Long, PartitionItem> mtmvItems = mtmv.getPartitionItems();
- // drop partition of mtmv
- for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
- long partitionId = getExistPartitionId(entry.getValue(),
relatedTableItems);
- if (partitionId == -1L) {
- dropPartition(mtmv, entry.getKey());
- }
- }
- // add partition for mtmv
- for (Entry<Long, PartitionItem> entry : relatedTableItems.entrySet()) {
- long partitionId = getExistPartitionId(entry.getValue(),
mtmvItems);
- if (partitionId == -1L) {
- addPartition(mtmv, entry.getValue());
- }
- }
- }
-
- public static List<String> getPartitionNamesByIds(MTMV mtmv,
Collection<Long> ids) throws AnalysisException {
- List<String> res = Lists.newArrayList();
- for (Long partitionId : ids) {
-
res.add(mtmv.getPartitionOrAnalysisException(partitionId).getName());
- }
- return res;
- }
-
- public static List<Long> getPartitionsIdsByNames(MTMV mtmv, List<String>
partitions) throws AnalysisException {
- List<Long> res = Lists.newArrayList();
- for (String partitionName : partitions) {
- Partition partition =
mtmv.getPartitionOrAnalysisException(partitionName);
- res.add(partition.getId());
- }
- return res;
- }
-
- /**
- * check if table is sync with all baseTables
- *
- * @param mtmv
- * @return
- */
- public static boolean isMTMVSync(MTMV mtmv) {
- MTMVRelation mtmvRelation = mtmv.getRelation();
- if (mtmvRelation == null) {
- return false;
- }
- try {
- return isMTMVSync(mtmv, mtmvRelation.getBaseTables(),
Sets.newHashSet());
- } catch (AnalysisException e) {
- LOG.warn("isMTMVSync failed: ", e);
- return false;
- }
- }
-
- /**
- * Determine whether the mtmv is sync with tables
- *
- * @param mtmv
- * @param tables
- * @param excludeTables
- * @return
- * @throws AnalysisException
- */
- public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables,
Set<String> excludeTables)
- throws AnalysisException {
- Collection<Partition> partitions = mtmv.getPartitions();
- for (Partition partition : partitions) {
- if (!isMTMVPartitionSync(mtmv, partition.getId(), tables,
excludeTables)) {
- return false;
- }
- }
- return true;
+ public static MTMV getMTMV(long dbId, long mtmvId) throws DdlException,
MetaNotFoundException {
+ Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
+ return (MTMV) db.getTableOrMetaException(mtmvId,
TableType.MATERIALIZED_VIEW);
}
/**
- * get not sync tables
+ * if base tables of mtmv contains external table
*
* @param mtmv
- * @param partitionId
* @return
- * @throws AnalysisException
*/
- public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long
partitionId) throws AnalysisException {
- List<String> res = Lists.newArrayList();
- for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables())
{
- TableIf table = getTable(baseTableInfo);
- if (!(table instanceof MTMVRelatedTableIf)) {
- continue;
- }
- MTMVRelatedTableIf mtmvRelatedTableIf = (MTMVRelatedTableIf) table;
- if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
-
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
- PartitionItem item =
mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
- Map<Long, PartitionItem> relatedPartitionItems =
mtmvRelatedTableIf.getPartitionItems();
- long relatedPartitionId = getExistPartitionId(item,
- relatedPartitionItems);
- if (relatedPartitionId == -1L) {
- throw new AnalysisException("can not found related
partition");
- }
- boolean isSyncWithPartition = isSyncWithPartition(mtmv,
partitionId, mtmvRelatedTableIf,
- relatedPartitionId);
- if (!isSyncWithPartition) {
- res.add(mtmvRelatedTableIf.getName());
- }
- } else {
- if (!isSyncWithBaseTable(mtmv, partitionId, baseTableInfo)) {
- res.add(table.getName());
- }
- }
- }
- return res;
- }
-
- /**
- * Determine which partition of mtmv can be rewritten
- *
- * @param mtmv
- * @param ctx
- * @return
- */
- public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv,
ConnectContext ctx) {
- List<Partition> res = Lists.newArrayList();
- Collection<Partition> allPartitions = mtmv.getPartitions();
- // check session variable if enable rewrite
- if (!ctx.getSessionVariable().isEnableMaterializedViewRewrite()) {
- return res;
- }
- if (mtmvContainsExternalTable(mtmv) && !ctx.getSessionVariable()
- .isMaterializedViewRewriteEnableContainExternalTable()) {
- return res;
- }
-
- MTMVRelation mtmvRelation = mtmv.getRelation();
- if (mtmvRelation == null) {
- return res;
- }
- // check mv is normal
- if (!(mtmv.getStatus().getState() == MTMVState.NORMAL
- && mtmv.getStatus().getRefreshState() ==
MTMVRefreshState.SUCCESS)) {
- return res;
- }
- // check gracePeriod
- long gracePeriodMills = mtmv.getGracePeriod();
- long currentTimeMills = System.currentTimeMillis();
- for (Partition partition : allPartitions) {
- if (gracePeriodMills > 0 && currentTimeMills <=
(partition.getVisibleVersionTime()
- + gracePeriodMills)) {
- res.add(partition);
- continue;
- }
- try {
- if (isMTMVPartitionSync(mtmv, partition.getId(),
mtmvRelation.getBaseTables(), Sets.newHashSet())) {
- res.add(partition);
- }
- } catch (AnalysisException e) {
- // ignore it
- LOG.warn("check isMTMVPartitionSync failed", e);
- }
- }
- return res;
- }
-
- /**
- * Get the partitions that need to be refreshed
- *
- * @param mtmv
- * @param baseTables
- * @return
- */
- public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv,
Set<BaseTableInfo> baseTables) {
- Collection<Partition> allPartitions = mtmv.getPartitions();
- List<Long> res = Lists.newArrayList();
- for (Partition partition : allPartitions) {
- try {
- if (!isMTMVPartitionSync(mtmv, partition.getId(), baseTables,
- mtmv.getExcludedTriggerTables())) {
- res.add(partition.getId());
- }
- } catch (AnalysisException e) {
- res.add(partition.getId());
- LOG.warn("check isMTMVPartitionSync failed", e);
- }
- }
- return res;
- }
-
- /**
- * compare last update time of mtmvPartition and tablePartition
- *
- * @param mtmv
- * @param mtmvPartitionId
- * @param relatedTable
- * @param relatedPartitionId
- * @return
- * @throws AnalysisException
- */
- private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId,
- MTMVRelatedTableIf relatedTable,
- Long relatedPartitionId) throws AnalysisException {
- MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
- .getPartitionSnapshot(relatedPartitionId);
- String relatedPartitionName =
relatedTable.getPartitionName(relatedPartitionId);
- String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
- return mtmv.getRefreshSnapshot()
- .equalsWithRelatedPartition(mtmvPartitionName,
relatedPartitionName, relatedPartitionCurrentSnapshot);
- }
-
- /**
- * like p_00000101_20170201
- *
- * @param desc
- * @return
- */
- private static String generatePartitionName(PartitionKeyDesc desc) {
- String partitionName = "p_";
- partitionName +=
desc.toSql().trim().replaceAll("\\(|\\)|\\-|\\[|\\]|'|\\s+", "")
- .replaceAll("\\(|\\)|\\,|\\[|\\]", "_");
- if (partitionName.length() > 50) {
- partitionName = partitionName.substring(0, 30) +
Math.abs(Objects.hash(partitionName))
- + "_" + System.currentTimeMillis();
- }
- return partitionName;
- }
-
- /**
- * drop partition of mtmv
- *
- * @param mtmv
- * @param partitionId
- */
- private static void dropPartition(MTMV mtmv, Long partitionId) throws
AnalysisException, DdlException {
- if (!mtmv.writeLockIfExist()) {
- return;
- }
- try {
- Partition partition =
mtmv.getPartitionOrAnalysisException(partitionId);
- DropPartitionClause dropPartitionClause = new
DropPartitionClause(false, partition.getName(), false, false);
- Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(),
mtmv, dropPartitionClause);
- } finally {
- mtmv.writeUnlock();
- }
-
- }
-
- /**
- * add partition for mtmv like relatedPartitionId of relatedTable
- *
- * @param mtmv
- * @param partitionItem
- * @throws DdlException
- */
- private static void addPartition(MTMV mtmv, PartitionItem partitionItem)
- throws DdlException {
- PartitionKeyDesc oldPartitionKeyDesc =
partitionItem.toPartitionKeyDesc();
- Map<String, String> partitionProperties = Maps.newHashMap();
- SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true,
- generatePartitionName(oldPartitionKeyDesc),
- oldPartitionKeyDesc, partitionProperties);
-
- AddPartitionClause addPartitionClause = new
AddPartitionClause(singlePartitionDesc,
- mtmv.getDefaultDistributionInfo().toDistributionDesc(),
partitionProperties, false);
- Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(),
mtmv.getName(), addPartitionClause);
- }
-
- /**
- * compare PartitionItem and return equals partitionId
- * if not found, return -1L
- *
- * @param target
- * @param sources
- * @return
- */
- private static long getExistPartitionId(PartitionItem target, Map<Long,
PartitionItem> sources) {
- for (Entry<Long, PartitionItem> entry : sources.entrySet()) {
- if (target.equals(entry.getValue())) {
- return entry.getKey();
- }
- }
- return -1L;
- }
-
- /**
- * Determine is sync, ignoring excludedTriggerTables and non OlapTable
- *
- * @param mtmvPartitionId
- * @param tables
- * @param excludedTriggerTables
- * @return
- */
- private static boolean isSyncWithAllBaseTables(MTMV mtmv, long
mtmvPartitionId, Set<BaseTableInfo> tables,
- Set<String> excludedTriggerTables) throws AnalysisException {
- for (BaseTableInfo baseTableInfo : tables) {
- TableIf table = null;
- try {
- table = getTable(baseTableInfo);
- } catch (AnalysisException e) {
- LOG.warn("get table failed, {}", baseTableInfo, e);
- return false;
- }
- if (excludedTriggerTables.contains(table.getName())) {
- continue;
- }
- boolean syncWithBaseTable = isSyncWithBaseTable(mtmv,
mtmvPartitionId, baseTableInfo);
- if (!syncWithBaseTable) {
- return false;
- }
- }
- return true;
- }
-
- private static boolean isSyncWithBaseTable(MTMV mtmv, long
mtmvPartitionId, BaseTableInfo baseTableInfo)
- throws AnalysisException {
- TableIf table = null;
- try {
- table = getTable(baseTableInfo);
- } catch (AnalysisException e) {
- LOG.warn("get table failed, {}", baseTableInfo, e);
- return false;
- }
-
- if (!(table instanceof MTMVRelatedTableIf)) {
- // if not MTMVRelatedTableIf, we can not get snapshot from it,
- // Currently, it is believed to be synchronous
- return true;
- }
- MTMVRelatedTableIf baseTable = (MTMVRelatedTableIf) table;
- MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot();
- String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
- return mtmv.getRefreshSnapshot()
- .equalsWithBaseTable(mtmvPartitionName, baseTable.getId(),
baseTableCurrentSnapshot);
- }
-
- private static boolean mtmvContainsExternalTable(MTMV mtmv) {
+ public static boolean mtmvContainsExternalTable(MTMV mtmv) {
Set<BaseTableInfo> baseTables = mtmv.getRelation().getBaseTables();
for (BaseTableInfo baseTableInfo : baseTables) {
if (baseTableInfo.getCtlId() !=
InternalCatalog.INTERNAL_CATALOG_ID) {
@@ -448,60 +66,4 @@ public class MTMVUtil {
}
return false;
}
-
- public static Map<String, MTMVRefreshPartitionSnapshot>
generatePartitionSnapshots(MTMV mtmv,
- Set<BaseTableInfo> baseTables, Set<Long> partitionIds)
- throws AnalysisException {
- Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap();
- for (Long partitionId : partitionIds) {
- res.put(mtmv.getPartition(partitionId).getName(),
generatePartitionSnapshot(mtmv, baseTables, partitionId));
- }
- return res;
- }
-
-
- private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV
mtmv,
- Set<BaseTableInfo> baseTables, Long partitionId)
- throws AnalysisException {
- MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new
MTMVRefreshPartitionSnapshot();
- if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
- MTMVRelatedTableIf relatedTable =
mtmv.getMvPartitionInfo().getRelatedTable();
- List<Long> relatedPartitionIds = getMTMVPartitionRelatedPartitions(
- mtmv.getPartitionItems().get(partitionId),
- relatedTable);
-
- for (Long relatedPartitionId : relatedPartitionIds) {
- MTMVSnapshotIf partitionSnapshot = relatedTable
- .getPartitionSnapshot(relatedPartitionId);
- refreshPartitionSnapshot.getPartitions()
-
.put(relatedTable.getPartitionName(relatedPartitionId), partitionSnapshot);
- }
- }
- for (BaseTableInfo baseTableInfo : baseTables) {
- if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
-
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
- continue;
- }
- TableIf table = getTable(baseTableInfo);
- if (!(table instanceof MTMVRelatedTableIf)) {
- continue;
- }
- refreshPartitionSnapshot.getTables().put(table.getId(),
((MTMVRelatedTableIf) table).getTableSnapshot());
- }
- return refreshPartitionSnapshot;
- }
-
- private static List<Long> getMTMVPartitionRelatedPartitions(PartitionItem
mtmvPartitionItem,
- MTMVRelatedTableIf relatedTable) {
- List<Long> res = Lists.newArrayList();
- Map<Long, PartitionItem> relatedPartitionItems =
relatedTable.getPartitionItems();
- for (Entry<Long, PartitionItem> entry :
relatedPartitionItems.entrySet()) {
- if (mtmvPartitionItem.equals(entry.getValue())) {
- res.add(entry.getKey());
- // current, the partitioning of MTMV corresponds one-to-one
with the partitioning of related table
- return res;
- }
- }
- return res;
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
index bac4059c162..12f409a5946 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
@@ -25,7 +25,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
-import org.apache.doris.mtmv.MTMVUtil;
+import org.apache.doris.mtmv.MTMVRewriteUtil;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.executor.Rewriter;
import org.apache.doris.nereids.memo.GroupExpression;
@@ -315,8 +315,8 @@ public abstract class AbstractMaterializedViewRule
implements ExplorationRuleFac
return ImmutableSet.of();
}
// get mv valid partitions
- Set<Long> mvDataValidPartitionIdSet =
MTMVUtil.getMTMVCanRewritePartitions(mtmv,
- cascadesContext.getConnectContext()).stream()
+ Set<Long> mvDataValidPartitionIdSet =
MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv,
+ cascadesContext.getConnectContext(),
System.currentTimeMillis()).stream()
.map(Partition::getId)
.collect(Collectors.toSet());
Set<Long> queryUsedPartitionIdSet = rewrittenPlan.collectToList(node
-> node instanceof LogicalOlapScan
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
index e7b11b5fd17..baf72cc278e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
@@ -255,7 +255,7 @@ public class MaterializedViewUtils {
Column mvReferenceColumn =
context.getMvPartitionColumn().getColumn().get();
if (partitionColumnSet.contains(mvReferenceColumn)) {
context.addTableColumn(table, mvReferenceColumn);
- context.setPctPossible(true);
+ context.setPctPossible(!mvReferenceColumn.isAllowNull() ||
relatedTable.isPartitionColumnAllowNull());
}
return visit(relation, context);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java
index f280e867819..5598c812594 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java
@@ -24,7 +24,7 @@ import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.mtmv.MTMVUtil;
+import org.apache.doris.mtmv.MTMVPartitionUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.util.Utils;
@@ -67,7 +67,7 @@ public class RefreshMTMVInfo {
Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb());
MTMV mtmv = (MTMV) db.getTableOrMetaException(mvName.getTbl(),
TableType.MATERIALIZED_VIEW);
if (!CollectionUtils.isEmpty(partitions)) {
- MTMVUtil.getPartitionsIdsByNames(mtmv, partitions);
+ MTMVPartitionUtil.getPartitionsIdsByNames(mtmv, partitions);
}
} catch (org.apache.doris.common.AnalysisException |
MetaNotFoundException | DdlException e) {
throw new AnalysisException(e.getMessage());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 21505586491..2e0de09d29b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -33,7 +33,7 @@ import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.extensions.mtmv.MTMVJob;
import org.apache.doris.job.task.AbstractTask;
-import org.apache.doris.mtmv.MTMVUtil;
+import org.apache.doris.mtmv.MTMVPartitionUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.external.iceberg.IcebergMetadataCache;
import org.apache.doris.qe.ConnectContext;
@@ -634,7 +634,7 @@ public class MetadataGenerator {
trow.addToColumnValue(new
TCell().setStringVal(mv.getEnvInfo().toString()));
trow.addToColumnValue(new
TCell().setStringVal(mv.getMvProperties().toString()));
trow.addToColumnValue(new
TCell().setStringVal(mv.getMvPartitionInfo().toNameString()));
- trow.addToColumnValue(new
TCell().setBoolVal(MTMVUtil.isMTMVSync(mv)));
+ trow.addToColumnValue(new
TCell().setBoolVal(MTMVPartitionUtil.isMTMVSync(mv)));
dataBatch.add(trow);
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java
new file mode 100644
index 00000000000..4bb74bfd448
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.analysis.PartitionKeyDesc;
+import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+
+public class MTMVPartitionUtilTest {
+ @Mocked
+ private MTMV mtmv;
+ @Mocked
+ private Partition p1;
+ @Mocked
+ private MTMVRelation relation;
+ @Mocked
+ private BaseTableInfo baseTableInfo;
+ @Mocked
+ private MTMVPartitionInfo mtmvPartitionInfo;
+ @Mocked
+ private OlapTable baseOlapTable;
+ @Mocked
+ private MTMVSnapshotIf baseSnapshotIf;
+ @Mocked
+ private MTMVRefreshSnapshot refreshSnapshot;
+ @Mocked
+ private MTMVUtil mtmvUtil;
+
+ private Set<BaseTableInfo> baseTables = Sets.newHashSet();
+
+ @Before
+ public void setUp() throws NoSuchMethodException, SecurityException,
AnalysisException {
+ baseTables.add(baseTableInfo);
+ new Expectations() {
+ {
+ mtmv.getRelation();
+ minTimes = 0;
+ result = relation;
+
+ mtmv.getPartitions();
+ minTimes = 0;
+ result = Lists.newArrayList(p1);
+
+ p1.getId();
+ minTimes = 0;
+ result = 1L;
+
+ mtmv.getMvPartitionInfo();
+ minTimes = 0;
+ result = mtmvPartitionInfo;
+
+ mtmvPartitionInfo.getPartitionType();
+ minTimes = 0;
+ result = MTMVPartitionType.SELF_MANAGE;
+
+ mtmvUtil.getTable(baseTableInfo);
+ minTimes = 0;
+ result = baseOlapTable;
+
+ baseOlapTable.needAutoRefresh();
+ minTimes = 0;
+ result = true;
+
+ baseOlapTable.getTableSnapshot();
+ minTimes = 0;
+ result = baseSnapshotIf;
+
+ mtmv.getPartitionName(anyLong);
+ minTimes = 0;
+ result = "p1";
+
+ mtmv.getRefreshSnapshot();
+ minTimes = 0;
+ result = refreshSnapshot;
+
+ refreshSnapshot.equalsWithBaseTable(anyString, anyLong,
(MTMVSnapshotIf) any);
+ minTimes = 0;
+ result = true;
+
+ relation.getBaseTables();
+ minTimes = 0;
+ result = baseTables;
+
+ baseOlapTable.needAutoRefresh();
+ minTimes = 0;
+ result = true;
+
+ baseOlapTable.getPartitionSnapshot(anyLong);
+ minTimes = 0;
+ result = baseSnapshotIf;
+
+ baseOlapTable.getPartitionName(anyLong);
+ minTimes = 0;
+ result = "p1";
+
+ refreshSnapshot.equalsWithRelatedPartition(anyString,
anyString, (MTMVSnapshotIf) any);
+ minTimes = 0;
+ result = true;
+ }
+ };
+ }
+
+ @Test
+ public void testIsMTMVSyncNormal() {
+ boolean mtmvSync = MTMVPartitionUtil.isMTMVSync(mtmv);
+ Assert.assertTrue(mtmvSync);
+ }
+
+ @Test
+ public void testIsMTMVSyncNotSync() {
+ new Expectations() {
+ {
+ refreshSnapshot.equalsWithBaseTable(anyString, anyLong,
(MTMVSnapshotIf) any);
+ minTimes = 0;
+ result = false;
+ }
+ };
+ boolean mtmvSync = MTMVPartitionUtil.isMTMVSync(mtmv);
+ Assert.assertFalse(mtmvSync);
+ }
+
+ @Test
+ public void testIsSyncWithPartition() throws AnalysisException {
+ boolean isSyncWithPartition =
MTMVPartitionUtil.isSyncWithPartition(mtmv, 1L, baseOlapTable, 2L);
+ Assert.assertTrue(isSyncWithPartition);
+ }
+
+ @Test
+ public void testIsSyncWithPartitionNotSync() throws AnalysisException {
+ new Expectations() {
+ {
+ refreshSnapshot.equalsWithRelatedPartition(anyString,
anyString, (MTMVSnapshotIf) any);
+ minTimes = 0;
+ result = false;
+ }
+ };
+ boolean isSyncWithPartition =
MTMVPartitionUtil.isSyncWithPartition(mtmv, 1L, baseOlapTable, 2L);
+ Assert.assertFalse(isSyncWithPartition);
+ }
+
+ @Test
+ public void testGeneratePartitionName() {
+ List<List<PartitionValue>> inValues = Lists.newArrayList();
+ inValues.add(Lists.newArrayList(new PartitionValue("value11"), new
PartitionValue("value12")));
+ inValues.add(Lists.newArrayList(new PartitionValue("value21"), new
PartitionValue("value22")));
+ PartitionKeyDesc inDesc = PartitionKeyDesc.createIn(inValues);
+ String inName = MTMVPartitionUtil.generatePartitionName(inDesc);
+ Assert.assertEquals("p_value11_value12_value21_value22", inName);
+
+ PartitionKeyDesc rangeDesc = PartitionKeyDesc.createFixed(
+ Lists.newArrayList(new PartitionValue(1L)),
+ Lists.newArrayList(new PartitionValue(2L))
+ );
+ String rangeName = MTMVPartitionUtil.generatePartitionName(rangeDesc);
+ Assert.assertEquals("p_1_2", rangeName);
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java
new file mode 100644
index 00000000000..42b5b783841
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.common.AnalysisException;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class MTMVRefreshSnapshotTest {
+ private String mvExistPartitionName = "mvp1";
+ private String relatedExistPartitionName = "p1";
+ private long baseExistTableId = 1L;
+ private long correctVersion = 1L;
+ private MTMVRefreshSnapshot refreshSnapshot = new MTMVRefreshSnapshot();
+ private MTMVVersionSnapshot p1Snapshot = new
MTMVVersionSnapshot(correctVersion);
+ private MTMVVersionSnapshot t1Snapshot = new
MTMVVersionSnapshot(correctVersion);
+
+ @Before
+ public void setUp() throws NoSuchMethodException, SecurityException,
AnalysisException {
+ Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots =
Maps.newHashMap();
+ MTMVRefreshPartitionSnapshot mvp1PartitionSnapshot = new
MTMVRefreshPartitionSnapshot();
+ partitionSnapshots.put(mvExistPartitionName, mvp1PartitionSnapshot);
+ mvp1PartitionSnapshot.getPartitions().put(relatedExistPartitionName,
p1Snapshot);
+ mvp1PartitionSnapshot.getTables().put(baseExistTableId, t1Snapshot);
+ refreshSnapshot.updateSnapshots(partitionSnapshots,
Sets.newHashSet(mvExistPartitionName));
+ }
+
+ @Test
+ public void testPartitionSync() {
+ // normal
+ boolean sync =
refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName,
relatedExistPartitionName,
+ new MTMVVersionSnapshot(correctVersion));
+ Assert.assertTrue(sync);
+ // non exist mv partition
+ sync = refreshSnapshot.equalsWithRelatedPartition("mvp2",
relatedExistPartitionName,
+ new MTMVVersionSnapshot(correctVersion));
+ Assert.assertFalse(sync);
+ // non exist related partition
+ sync = refreshSnapshot
+ .equalsWithRelatedPartition(mvExistPartitionName, "p2", new
MTMVVersionSnapshot(correctVersion));
+ Assert.assertFalse(sync);
+ // snapshot value not equal
+ sync =
refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName,
relatedExistPartitionName,
+ new MTMVVersionSnapshot(2L));
+ Assert.assertFalse(sync);
+ // snapshot type not equal
+ sync =
refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName,
relatedExistPartitionName,
+ new MTMVTimestampSnapshot(correctVersion));
+ Assert.assertFalse(sync);
+ }
+
+ @Test
+ public void testTableSync() {
+ // normal
+ boolean sync =
refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, baseExistTableId,
+ new MTMVVersionSnapshot(correctVersion));
+ Assert.assertTrue(sync);
+ // non exist mv partition
+ sync = refreshSnapshot
+ .equalsWithBaseTable("mvp2", baseExistTableId, new
MTMVVersionSnapshot(correctVersion));
+ Assert.assertFalse(sync);
+ // non exist base table
+ sync = refreshSnapshot
+ .equalsWithBaseTable(mvExistPartitionName, 2L, new
MTMVVersionSnapshot(correctVersion));
+ Assert.assertFalse(sync);
+ // snapshot value not equal
+ sync = refreshSnapshot
+ .equalsWithBaseTable(mvExistPartitionName, baseExistTableId,
new MTMVVersionSnapshot(2L));
+ Assert.assertFalse(sync);
+ // snapshot type not equal
+ sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName,
baseExistTableId,
+ new MTMVTimestampSnapshot(correctVersion));
+ Assert.assertFalse(sync);
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java
new file mode 100644
index 00000000000..55394897e42
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
+import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
+
+import com.google.common.collect.Lists;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Set;
+
+public class MTMVRewriteUtilTest {
+ @Mocked
+ private MTMV mtmv;
+ @Mocked
+ private ConnectContext ctx;
+ @Mocked
+ private SessionVariable sessionVariable;
+ @Mocked
+ private Partition p1;
+ @Mocked
+ private MTMVRelation relation;
+ @Mocked
+ private MTMVStatus status;
+ @Mocked
+ private MTMVPartitionUtil mtmvPartitionUtil;
+ @Mocked
+ private MTMVUtil mtmvUtil;
+ private long currentTimeMills = 3L;
+
+ @Before
+ public void setUp() throws NoSuchMethodException, SecurityException,
AnalysisException {
+
+ new Expectations() {
+ {
+ mtmv.getPartitions();
+ minTimes = 0;
+ result = Lists.newArrayList(p1);
+
+ p1.getVisibleVersionTime();
+ minTimes = 0;
+ result = 1L;
+
+ mtmv.getGracePeriod();
+ minTimes = 0;
+ result = 0L;
+
+ mtmv.getRelation();
+ minTimes = 0;
+ result = relation;
+
+ mtmv.getStatus();
+ minTimes = 0;
+ result = status;
+
+ mtmv.getGracePeriod();
+ minTimes = 0;
+ result = 0L;
+
+ status.getState();
+ minTimes = 0;
+ result = MTMVState.NORMAL;
+
+ status.getRefreshState();
+ minTimes = 0;
+ result = MTMVRefreshState.SUCCESS;
+
+ ctx.getSessionVariable();
+ minTimes = 0;
+ result = sessionVariable;
+
+ sessionVariable.isEnableMaterializedViewRewrite();
+ minTimes = 0;
+ result = true;
+
+
sessionVariable.isMaterializedViewRewriteEnableContainExternalTable();
+ minTimes = 0;
+ result = true;
+
+ MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong,
(Set<BaseTableInfo>) any, (Set<String>) any);
+ minTimes = 0;
+ result = true;
+
+ MTMVUtil.mtmvContainsExternalTable((MTMV) any);
+ minTimes = 0;
+ result = false;
+ }
+ };
+ }
+
+ @Test
+ public void testGetMTMVCanRewritePartitionsNormal() {
+ Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
+ Assert.assertEquals(1, mtmvCanRewritePartitions.size());
+ }
+
+ @Test
+ public void testGetMTMVCanRewritePartitionsInGracePeriod() throws
AnalysisException {
+ new Expectations() {
+ {
+ mtmv.getGracePeriod();
+ minTimes = 0;
+ result = 2L;
+
+ MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong,
(Set<BaseTableInfo>) any, (Set<String>) any);
+ minTimes = 0;
+ result = false;
+ }
+ };
+
+ Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
+ Assert.assertEquals(1, mtmvCanRewritePartitions.size());
+ }
+
+ @Test
+ public void testGetMTMVCanRewritePartitionsNotInGracePeriod() throws
AnalysisException {
+ new Expectations() {
+ {
+ mtmv.getGracePeriod();
+ minTimes = 0;
+ result = 1L;
+
+ MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong,
(Set<BaseTableInfo>) any, (Set<String>) any);
+ minTimes = 0;
+ result = false;
+ }
+ };
+
+ Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
+ Assert.assertEquals(0, mtmvCanRewritePartitions.size());
+ }
+
+ @Test
+ public void
testGetMTMVCanRewritePartitionsDisableMaterializedViewRewrite() {
+ new Expectations() {
+ {
+ sessionVariable.isEnableMaterializedViewRewrite();
+ minTimes = 0;
+ result = false;
+ }
+ };
+ Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
+ Assert.assertEquals(0, mtmvCanRewritePartitions.size());
+ }
+
+ @Test
+ public void testGetMTMVCanRewritePartitionsNotSync() throws
AnalysisException {
+ new Expectations() {
+ {
+ MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong,
(Set<BaseTableInfo>) any, (Set<String>) any);
+ minTimes = 0;
+ result = false;
+ }
+ };
+ Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
+ Assert.assertEquals(0, mtmvCanRewritePartitions.size());
+ }
+
+ @Test
+ public void testGetMTMVCanRewritePartitionsEnableContainExternalTable() {
+ new Expectations() {
+ {
+ MTMVUtil.mtmvContainsExternalTable((MTMV) any);
+ minTimes = 0;
+ result = true;
+
+
sessionVariable.isMaterializedViewRewriteEnableContainExternalTable();
+ minTimes = 0;
+ result = true;
+ }
+ };
+ Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
+ Assert.assertEquals(1, mtmvCanRewritePartitions.size());
+ }
+
+ @Test
+ public void testGetMTMVCanRewritePartitionsDisableContainExternalTable() {
+ new Expectations() {
+ {
+ MTMVUtil.mtmvContainsExternalTable((MTMV) any);
+ minTimes = 0;
+ result = true;
+
+
sessionVariable.isMaterializedViewRewriteEnableContainExternalTable();
+ minTimes = 0;
+ result = false;
+ }
+ };
+ Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
+ Assert.assertEquals(0, mtmvCanRewritePartitions.size());
+ }
+
+ @Test
+ public void testGetMTMVCanRewritePartitionsStateAbnormal() {
+ new Expectations() {
+ {
+ status.getState();
+ minTimes = 0;
+ result = MTMVState.SCHEMA_CHANGE;
+ }
+ };
+ Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
+ Assert.assertEquals(0, mtmvCanRewritePartitions.size());
+ }
+
+ @Test
+ public void testGetMTMVCanRewritePartitionsRefreshStateAbnormal() {
+ new Expectations() {
+ {
+ status.getRefreshState();
+ minTimes = 0;
+ result = MTMVRefreshState.FAIL;
+ }
+ };
+ Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
+ Assert.assertEquals(0, mtmvCanRewritePartitions.size());
+ }
+
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java
new file mode 100644
index 00000000000..b1fc52dad46
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.job.extensions.mtmv.MTMVTask;
+import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
+import org.apache.doris.job.extensions.mtmv.MTMVTaskContext;
+import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
+import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.apache.commons.collections.CollectionUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+
+public class MTMVTaskTest {
+ private long poneId = 1L;
+ private String poneName = "p1";
+ private long ptwoId = 2L;
+ private String ptwoName = "p2";
+ private List<Long> allPartitionIds = Lists.newArrayList(poneId, ptwoId);
+ private MTMVRelation relation = new MTMVRelation(Sets.newHashSet(),
Sets.newHashSet());
+
+ @Mocked
+ private MTMV mtmv;
+ @Mocked
+ private MTMVUtil mtmvUtil;
+ @Mocked
+ private MTMVPartitionUtil mtmvPartitionUtil;
+ @Mocked
+ private MTMVPartitionInfo mtmvPartitionInfo;
+ @Mocked
+ private MTMVRefreshInfo mtmvRefreshInfo;
+
+ @Before
+ public void setUp()
+ throws NoSuchMethodException, SecurityException,
AnalysisException, DdlException, MetaNotFoundException {
+
+ new Expectations() {
+ {
+ mtmvUtil.getMTMV(anyLong, anyLong);
+ minTimes = 0;
+ result = mtmv;
+
+ mtmv.getPartitionIds();
+ minTimes = 0;
+ result = allPartitionIds;
+
+ mtmv.getMvPartitionInfo();
+ minTimes = 0;
+ result = mtmvPartitionInfo;
+
+ mtmvPartitionInfo.getPartitionType();
+ minTimes = 0;
+ result = MTMVPartitionType.FOLLOW_BASE_TABLE;
+
+ mtmvPartitionUtil.getPartitionsIdsByNames(mtmv,
Lists.newArrayList(poneName));
+ minTimes = 0;
+ result = poneId;
+
+ mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any,
(Set<String>) any);
+ minTimes = 0;
+ result = true;
+
+ mtmv.getRefreshInfo();
+ minTimes = 0;
+ result = mtmvRefreshInfo;
+
+ mtmvRefreshInfo.getRefreshMethod();
+ minTimes = 0;
+ result = RefreshMethod.COMPLETE;
+ }
+ };
+ }
+
+ @Test
+ public void testCalculateNeedRefreshPartitionsManualComplete() throws
AnalysisException {
+ MTMVTaskContext context = new
MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(poneName), true);
+ MTMVTask task = new MTMVTask(mtmv, relation, context);
+ List<Long> result = task.calculateNeedRefreshPartitions();
+ Assert.assertEquals(allPartitionIds, result);
+ }
+
+ @Test
+ public void testCalculateNeedRefreshPartitionsManualPartitions() throws
AnalysisException {
+ MTMVTaskContext context = new
MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(poneName),
false);
+ MTMVTask task = new MTMVTask(mtmv, relation, context);
+ List<Long> result = task.calculateNeedRefreshPartitions();
+ Assert.assertEquals(Lists.newArrayList(poneId), result);
+ }
+
+ @Test
+ public void testCalculateNeedRefreshPartitionsSystem() throws
AnalysisException {
+ MTMVTaskContext context = new
MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
+ MTMVTask task = new MTMVTask(mtmv, relation, context);
+ List<Long> result = task.calculateNeedRefreshPartitions();
+ Assert.assertTrue(CollectionUtils.isEmpty(result));
+ }
+
+ @Test
+ public void testCalculateNeedRefreshPartitionsSystemNotSyncComplete()
throws AnalysisException {
+ new Expectations() {
+ {
+ mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any,
(Set<String>) any);
+ minTimes = 0;
+ result = false;
+ }
+ };
+ MTMVTaskContext context = new
MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
+ MTMVTask task = new MTMVTask(mtmv, relation, context);
+ List<Long> result = task.calculateNeedRefreshPartitions();
+ Assert.assertEquals(allPartitionIds, result);
+ }
+
+ @Test
+ public void testCalculateNeedRefreshPartitionsSystemNotSyncAuto() throws
AnalysisException {
+ new Expectations() {
+ {
+ mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any,
(Set<String>) any);
+ minTimes = 0;
+ result = false;
+
+ mtmvRefreshInfo.getRefreshMethod();
+ minTimes = 0;
+ result = RefreshMethod.AUTO;
+
+ mtmvPartitionUtil.getMTMVNeedRefreshPartitions(mtmv,
(Set<BaseTableInfo>) any);
+ minTimes = 0;
+ result = Lists.newArrayList(ptwoId);
+ }
+ };
+ MTMVTaskContext context = new
MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
+ MTMVTask task = new MTMVTask(mtmv, relation, context);
+ List<Long> result = task.calculateNeedRefreshPartitions();
+ Assert.assertEquals(Lists.newArrayList(ptwoId), result);
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java
index 4204bbe0221..02fb18edbf7 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java
@@ -26,7 +26,6 @@ import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.utframe.TestWithFeService;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.Optional;
@@ -252,7 +251,6 @@ public class MaterializedViewUtilsTest extends
TestWithFeService {
}
@Test
- @Disabled
public void getRelatedTableInfoTestWithoutGroupNullTest() {
PlanChecker.from(connectContext)
.checkExplain("SELECT (o.c1_abs + ps.c2_abs) as add_alias,
l.L_SHIPDATE, l.L_ORDERKEY, o.O_ORDERDATE, "
diff --git a/regression-test/data/mtmv_p0/test_hive_mtmv.out
b/regression-test/data/mtmv_p0/test_hive_mtmv.out
index 9ee89dd033d..26e34af7b5d 100644
--- a/regression-test/data/mtmv_p0/test_hive_mtmv.out
+++ b/regression-test/data/mtmv_p0/test_hive_mtmv.out
@@ -8,6 +8,11 @@
1 A 20230101
2 B 20230101
3 C 20230101
+
+-- !refresh_complete --
+1 A 20230101
+2 B 20230101
+3 C 20230101
4 D 20230102
5 E 20230102
6 F 20230102
diff --git a/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy
index 573f1f84d5d..cf34cfb616a 100644
--- a/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy
@@ -57,12 +57,20 @@ suite("test_hive_mtmv",
"p0,external,hive,external_docker,external_docker_hive")
order_qt_refresh_one_partition "SELECT * FROM ${mvName} order by
id"
//refresh other partitions
+ // currently, for hive, an auto refresh will not change the data
sql """
REFRESH MATERIALIZED VIEW ${mvName}
"""
waitingMTMVTaskFinished(jobName)
order_qt_refresh_other_partition "SELECT * FROM ${mvName} order by
id"
+ //refresh complete
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} complete
+ """
+ waitingMTMVTaskFinished(jobName)
+ order_qt_refresh_complete "SELECT * FROM ${mvName} order by id"
+
sql """drop materialized view if exists ${mvName};"""
sql """drop catalog if exists ${catalog_name}"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]