This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 12350ca8672 branch-2.1: [Improve](mtmv) skip the generation of invalid task for refresh mtmv #46280 (#46392)
12350ca8672 is described below
commit 12350ca8672fbede2f3b6083db4b264704a52dac
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Jan 5 08:59:11 2025 +0800
branch-2.1: [Improve](mtmv) skip the generation of invalid task for refresh mtmv #46280 (#46392)
Cherry-picked from #46280
Co-authored-by: shee <[email protected]>
Co-authored-by: garenshi <[email protected]>
---
.../java/org/apache/doris/mtmv/MTMVService.java | 11 +++-
regression-test/data/mtmv_p0/test_commit_mtmv.out | 20 ++++++
.../suites/mtmv_p0/test_commit_mtmv.groovy | 72 ++++++++++++++++++++++
3 files changed, 102 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
index 278811d3a99..26c6bfb10e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
@@ -198,7 +198,7 @@ public class MTMVService implements EventListener {
try {
// check if mtmv should trigger by event
MTMV mtmv = (MTMV) MTMVUtil.getTable(baseTableInfo);
- if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT)) {
+ if (canRefresh(mtmv, table)) {
jobManager.onCommit(mtmv);
}
} catch (Exception e) {
@@ -206,4 +206,13 @@ public class MTMVService implements EventListener {
}
}
}
+
+ private boolean canRefresh(MTMV mtmv, TableIf table) {
+ if (mtmv.getExcludedTriggerTables().contains(table.getName())) {
+ LOG.info("skip refresh mtmv: {}, because exclude trigger table: {}",
+ mtmv.getName(), table.getName());
+ return false;
+ }
+ return mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT);
+ }
}
diff --git a/regression-test/data/mtmv_p0/test_commit_mtmv.out b/regression-test/data/mtmv_p0/test_commit_mtmv.out
index 208638b4c10..433d55ef4b8 100644
--- a/regression-test/data/mtmv_p0/test_commit_mtmv.out
+++ b/regression-test/data/mtmv_p0/test_commit_mtmv.out
@@ -61,3 +61,23 @@
-- !mv1_replace --
3 2017-03-15 3
+-- !mv_sag --
+1 1 60
+
+-- !task_sag --
+{"triggerMode":"COMMIT","partitions":[],"isComplete":false}
+
+-- !mv_sag1 --
+1 1 60
+
+-- !task_sag1 --
+{"triggerMode":"COMMIT","partitions":[],"isComplete":false}
+
+-- !mv_sag2 --
+1 1 60
+1 2 70
+2 1 70
+
+-- !task_sag2 --
+{"triggerMode":"COMMIT","partitions":[],"isComplete":false}
+
diff --git a/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy b/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy
index d8161a3fc92..bb4c3f8f7ce 100644
--- a/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy
@@ -149,4 +149,76 @@ suite("test_commit_mtmv") {
sql """drop materialized view if exists ${mvName2};"""
sql """drop table if exists `${tableName}`"""
+ //===========test excluded_trigger_tables===========
+ def tblStu = "test_commit_mtmv_tbl_stu"
+ def tblGrade = "test_commit_mtmv_tbl_grade"
+ def mvSag = "test_commit_mv_sag"
+ sql """drop materialized view if exists ${mvSag};"""
+ sql """drop table if exists `${tblStu}`"""
+ sql """drop table if exists `${tblGrade}`"""
+ sql """
+ CREATE TABLE `${tblStu}` (
+ `sid` int(32) NULL,
+ `sname` varchar(32) NULL,
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`sid`)
+ DISTRIBUTED BY HASH(`sid`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """
+ CREATE TABLE `${tblGrade}` (
+ `sid` int(32) NULL,
+ `cid` int(32) NULL,
+ `score` int NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`sid`)
+ DISTRIBUTED BY HASH(`sid`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """
+ CREATE MATERIALIZED VIEW ${mvSag}
+ BUILD DEFERRED
+ REFRESH COMPLETE ON commit
+ DISTRIBUTED BY HASH(`sid`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "excluded_trigger_tables" = "${tblGrade}"
+ )
+ AS select a.sid,b.cid,b.score from ${tblStu} a join ${tblGrade} b on a.sid = b.sid;
+ """
+
+ sql """
+ insert into ${tblGrade} values(1, 1, 60);
+ insert into ${tblStu} values(1, 'sam');
+ """
+ def sagJobName = getJobName(dbName, mvSag);
+ waitingMTMVTaskFinished(sagJobName)
+ order_qt_mv_sag "SELECT * FROM ${mvSag} order by sid,cid"
+ order_qt_task_sag "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1"
+
+ sql """
+ insert into ${tblGrade} values(1, 2, 70);
+ """
+ waitingMTMVTaskFinished(sagJobName)
+ order_qt_mv_sag1 "SELECT * FROM ${mvSag} order by sid,cid"
+ order_qt_task_sag1 "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1"
+
+ sql """
+ insert into ${tblGrade} values(2, 1, 70);
+ insert into ${tblStu} values(2, 'jack');
+ """
+
+ waitingMTMVTaskFinished(sagJobName)
+ order_qt_mv_sag2 "SELECT * FROM ${mvSag} order by sid,cid"
+ order_qt_task_sag2 "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1"
+
+ sql """drop materialized view if exists ${mvSag};"""
+ sql """drop table if exists `${tblStu}`"""
+ sql """drop table if exists `${tblGrade}`"""
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]