This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c1676f03cae [enhance](mtmv) Mv refresh on commit (#34548)
c1676f03cae is described below
commit c1676f03cae57c943323845fc424a1182d085039
Author: zhangdong <[email protected]>
AuthorDate: Wed May 29 17:08:38 2024 +0800
[enhance](mtmv) Mv refresh on commit (#34548)
Support refreshing an MTMV when base table data changes, or when a base table
partition is dropped or replaced. Example:
CREATE MATERIALIZED VIEW mv1
REFRESH ON COMMIT
AS
SELECT xxx;
---
.../antlr4/org/apache/doris/nereids/DorisParser.g4 | 1 +
.../main/java/org/apache/doris/catalog/Env.java | 20 ++++
.../transaction/CloudGlobalTransactionMgr.java | 19 +++
.../apache/doris/datasource/InternalCatalog.java | 14 ++-
.../DataChangeEvent.java} | 50 +-------
.../DropPartitionEvent.java} | 50 +-------
.../main/java/org/apache/doris/event/Event.java | 60 ++++++++++
.../EventException.java} | 48 ++------
.../EventListener.java} | 49 +-------
.../org/apache/doris/event/EventProcessor.java | 57 +++++++++
.../MTMVRefreshEnum.java => event/EventType.java} | 52 +--------
.../ReplacePartitionEvent.java} | 50 +-------
.../MTMVRefreshEnum.java => event/TableEvent.java} | 60 ++++------
.../apache/doris/job/extensions/mtmv/MTMVJob.java | 29 ++++-
.../apache/doris/job/extensions/mtmv/MTMVTask.java | 1 +
.../java/org/apache/doris/mtmv/BaseTableInfo.java | 20 ++--
.../java/org/apache/doris/mtmv/MTMVJobManager.java | 19 ++-
.../org/apache/doris/mtmv/MTMVRefreshEnum.java | 1 +
.../java/org/apache/doris/mtmv/MTMVService.java | 31 ++++-
.../doris/nereids/parser/LogicalPlanBuilder.java | 3 +
.../doris/transaction/DatabaseTransactionMgr.java | 32 +++++
regression-test/data/mtmv_p0/test_commit_mtmv.out | 40 +++++++
.../suites/mtmv_p0/test_commit_mtmv.groovy | 130 +++++++++++++++++++++
23 files changed, 509 insertions(+), 327 deletions(-)
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 2848736f172..04d63492a77 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -215,6 +215,7 @@ buildMode
refreshTrigger
: ON MANUAL
| ON SCHEDULE refreshSchedule
+ | ON COMMIT
;
refreshSchedule
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index fb6926725c7..ee0cb60ed80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -141,6 +141,8 @@ import org.apache.doris.deploy.DeployManager;
import org.apache.doris.deploy.impl.AmbariDeployManager;
import org.apache.doris.deploy.impl.K8sDeployManager;
import org.apache.doris.deploy.impl.LocalFileDeployManager;
+import org.apache.doris.event.EventProcessor;
+import org.apache.doris.event.ReplacePartitionEvent;
import org.apache.doris.ha.BDBHA;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.ha.HAProtocol;
@@ -537,6 +539,7 @@ public class Env {
private TopicPublisherThread topicPublisherThread;
private MTMVService mtmvService;
+ private EventProcessor eventProcessor;
private InsertOverwriteManager insertOverwriteManager;
@@ -782,6 +785,7 @@ public class Env {
this.topicPublisherThread = new TopicPublisherThread(
"TopicPublisher", Config.publish_topic_info_interval_ms,
systemInfo);
this.mtmvService = new MTMVService();
+ this.eventProcessor = new EventProcessor(mtmvService);
this.insertOverwriteManager = new InsertOverwriteManager();
this.dnsCache = new DNSCache();
this.sqlCacheManager = new NereidsSqlCacheManager();
@@ -853,6 +857,10 @@ public class Env {
return mtmvService;
}
+ public EventProcessor getEventProcessor() {
+ return eventProcessor;
+ }
+
public InsertOverwriteManager getInsertOverwriteManager() {
return insertOverwriteManager;
}
@@ -5679,6 +5687,18 @@ public class Env {
} else {
version = olapTable.getVisibleVersion();
}
+ // Here, we only wait for the EventProcessor to finish processing the event,
+ // but regardless of the success or failure of the result,
+ // it does not affect the logic of replacing the partition
+ try {
+ Env.getCurrentEnv().getEventProcessor().processEvent(
+ new ReplacePartitionEvent(db.getCatalog().getId(),
db.getId(),
+ olapTable.getId()));
+ } catch (Throwable t) {
+ // According to normal logic, no exceptions will be thrown,
+ // but in order to avoid bugs affecting the original logic, all
exceptions are caught
+ LOG.warn("produceEvent failed: ", t);
+ }
// write log
ReplacePartitionOperationLog info =
new ReplacePartitionOperationLog(db.getId(), db.getFullName(),
olapTable.getId(), olapTable.getName(),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 40c04efedbb..f680f9457d1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -70,6 +70,8 @@ import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.InternalDatabaseUtil;
import org.apache.doris.common.util.MetaLockUtils;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.event.DataChangeEvent;
import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment;
import org.apache.doris.metric.MetricRepo;
@@ -512,6 +514,23 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
MetricRepo.HISTO_TXN_EXEC_LATENCY.update(txnState.getCommitTime()
- txnState.getPrepareTime());
}
afterCommitTxnResp(commitTxnResponse);
+ // Here, we only wait for the EventProcessor to finish processing the event,
+ // but regardless of the success or failure of the result,
+ // it does not affect the logic of the transaction
+ try {
+ produceEvent(dbId, tableList);
+ } catch (Throwable t) {
+ // According to normal logic, no exceptions will be thrown,
+ // but in order to avoid bugs affecting the original logic, all
exceptions are caught
+ LOG.warn("produceEvent failed: ", t);
+ }
+ }
+
+ private void produceEvent(long dbId, List<Table> tableList) {
+ for (Table table : tableList) {
+ Env.getCurrentEnv().getEventProcessor().processEvent(
+ new DataChangeEvent(InternalCatalog.INTERNAL_CATALOG_ID,
dbId, table.getId()));
+ }
}
private List<OlapTable> getMowTableList(List<Table> tableList) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index d562fd62a1b..a0b869ae32a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -141,6 +141,7 @@ import org.apache.doris.datasource.es.EsRepository;
import org.apache.doris.datasource.hive.HMSCachedClient;
import org.apache.doris.datasource.hive.HiveMetadataOps;
import org.apache.doris.datasource.property.constants.HMSProperties;
+import org.apache.doris.event.DropPartitionEvent;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.commands.info.DropMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
@@ -1865,11 +1866,22 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
}
+ // Here, we only wait for the EventProcessor to finish processing the
event,
+ // but regardless of the success or failure of the result,
+ // it does not affect the logic of deleting the partition
+ try {
+ Env.getCurrentEnv().getEventProcessor().processEvent(
+ new DropPartitionEvent(db.getCatalog().getId(), db.getId(),
+ olapTable.getId()));
+ } catch (Throwable t) {
+ // According to normal logic, no exceptions will be thrown,
+ // but in order to avoid bugs affecting the original logic, all
exceptions are caught
+ LOG.warn("produceEvent failed: ", t);
+ }
// log
DropPartitionInfo info = new DropPartitionInfo(db.getId(),
olapTable.getId(), partitionName, isTempPartition,
clause.isForceDrop(), recycleTime, version, versionTime);
Env.getCurrentEnv().getEditLog().logDropPartition(info);
-
LOG.info("succeed in dropping partition[{}], table : [{}-{}], is temp
: {}, is force : {}",
partitionName, olapTable.getId(), olapTable.getName(),
isTempPartition, clause.isForceDrop());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
b/fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java
similarity index 51%
copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
copy to fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java
index 0f4f904c573..d58e62bfdde 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java
@@ -15,52 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.mtmv;
+package org.apache.doris.event;
-/**
- * refresh enum
- */
-public class MTMVRefreshEnum {
-
- /**
- * RefreshMethod
- */
- public enum RefreshMethod {
- COMPLETE, //complete
- AUTO //try to update incrementally, if not possible, update in full
- }
-
- /**
- * BuildMode
- */
- public enum BuildMode {
- IMMEDIATE, //right now
- DEFERRED // deferred
- }
-
- /**
- * RefreshTrigger
- */
- public enum RefreshTrigger {
- MANUAL, //manual
- SCHEDULE // schedule
- }
-
- /**
- * MTMVState
- */
- public enum MTMVState {
- INIT,
- NORMAL,
- SCHEMA_CHANGE
- }
-
- /**
- * MTMVRefreshState
- */
- public enum MTMVRefreshState {
- INIT,
- FAIL,
- SUCCESS
+public class DataChangeEvent extends TableEvent {
+ public DataChangeEvent(long ctlId, long dbId, long tableId) {
+ super(EventType.DATA_CHANGE, ctlId, dbId, tableId);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
b/fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java
similarity index 51%
copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
copy to fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java
index 0f4f904c573..67339ebd05a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java
@@ -15,52 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.mtmv;
+package org.apache.doris.event;
-/**
- * refresh enum
- */
-public class MTMVRefreshEnum {
-
- /**
- * RefreshMethod
- */
- public enum RefreshMethod {
- COMPLETE, //complete
- AUTO //try to update incrementally, if not possible, update in full
- }
-
- /**
- * BuildMode
- */
- public enum BuildMode {
- IMMEDIATE, //right now
- DEFERRED // deferred
- }
-
- /**
- * RefreshTrigger
- */
- public enum RefreshTrigger {
- MANUAL, //manual
- SCHEDULE // schedule
- }
-
- /**
- * MTMVState
- */
- public enum MTMVState {
- INIT,
- NORMAL,
- SCHEMA_CHANGE
- }
-
- /**
- * MTMVRefreshState
- */
- public enum MTMVRefreshState {
- INIT,
- FAIL,
- SUCCESS
+public class DropPartitionEvent extends TableEvent {
+ public DropPartitionEvent(long ctlId, long dbId, long tableId) {
+ super(EventType.DROP_PARTITION, ctlId, dbId, tableId);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/Event.java
b/fe/fe-core/src/main/java/org/apache/doris/event/Event.java
new file mode 100644
index 00000000000..e049a1aeb8c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/event/Event.java
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.event;
+
+import org.apache.doris.catalog.Env;
+
+import java.util.Objects;
+
+public abstract class Event {
+ protected final long eventId;
+
+ // eventTime of the event. Used instead of calling getter on event every time
+ protected final long eventTime;
+
+ // type of this event, supplied by the subclass constructor
+ protected final EventType eventType;
+
+ protected Event(EventType eventType) {
+ Objects.requireNonNull(eventType, "require eventType");
+ this.eventId = Env.getCurrentEnv().getNextId();
+ this.eventTime = System.currentTimeMillis();
+ this.eventType = eventType;
+ }
+
+ public long getEventId() {
+ return eventId;
+ }
+
+ public long getEventTime() {
+ return eventTime;
+ }
+
+ public EventType getEventType() {
+ return eventType;
+ }
+
+ @Override
+ public String toString() {
+ return "Event{"
+ + "eventId=" + eventId
+ + ", eventTime=" + eventTime
+ + ", eventType=" + eventType
+ + '}';
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
b/fe/fe-core/src/main/java/org/apache/doris/event/EventException.java
similarity index 52%
copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
copy to fe/fe-core/src/main/java/org/apache/doris/event/EventException.java
index 0f4f904c573..425ca03d65f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/event/EventException.java
@@ -15,52 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.mtmv;
-/**
- * refresh enum
- */
-public class MTMVRefreshEnum {
+package org.apache.doris.event;
- /**
- * RefreshMethod
- */
- public enum RefreshMethod {
- COMPLETE, //complete
- AUTO //try to update incrementally, if not possible, update in full
- }
-
- /**
- * BuildMode
- */
- public enum BuildMode {
- IMMEDIATE, //right now
- DEFERRED // deferred
- }
+public class EventException extends Exception {
- /**
- * RefreshTrigger
- */
- public enum RefreshTrigger {
- MANUAL, //manual
- SCHEDULE // schedule
+ public EventException(String msg, Throwable cause) {
+ super(msg, cause);
}
- /**
- * MTMVState
- */
- public enum MTMVState {
- INIT,
- NORMAL,
- SCHEMA_CHANGE
+ public EventException(String msg) {
+ super(msg);
}
- /**
- * MTMVRefreshState
- */
- public enum MTMVRefreshState {
- INIT,
- FAIL,
- SUCCESS
+ public EventException(Exception e) {
+ super(e);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
b/fe/fe-core/src/main/java/org/apache/doris/event/EventListener.java
similarity index 51%
copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
copy to fe/fe-core/src/main/java/org/apache/doris/event/EventListener.java
index 0f4f904c573..d5c142bf934 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/event/EventListener.java
@@ -15,52 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.mtmv;
+package org.apache.doris.event;
-/**
- * refresh enum
- */
-public class MTMVRefreshEnum {
+public interface EventListener {
- /**
- * RefreshMethod
- */
- public enum RefreshMethod {
- COMPLETE, //complete
- AUTO //try to update incrementally, if not possible, update in full
- }
-
- /**
- * BuildMode
- */
- public enum BuildMode {
- IMMEDIATE, //right now
- DEFERRED // deferred
- }
-
- /**
- * RefreshTrigger
- */
- public enum RefreshTrigger {
- MANUAL, //manual
- SCHEDULE // schedule
- }
-
- /**
- * MTMVState
- */
- public enum MTMVState {
- INIT,
- NORMAL,
- SCHEMA_CHANGE
- }
-
- /**
- * MTMVRefreshState
- */
- public enum MTMVRefreshState {
- INIT,
- FAIL,
- SUCCESS
- }
+ void processEvent(Event event) throws EventException;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/event/EventProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/event/EventProcessor.java
new file mode 100644
index 00000000000..4731a17a372
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/event/EventProcessor.java
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.event;
+
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class EventProcessor {
+
+ private static final Logger LOG =
LogManager.getLogger(EventProcessor.class);
+
+ private Set<EventListener> listeners = Sets.newHashSet();
+
+ public EventProcessor(EventListener... args) {
+ for (EventListener listener : args) {
+ this.listeners.add(listener);
+ }
+ }
+
+ public boolean processEvent(Event event) {
+ Objects.requireNonNull(event);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("processEvent: {}", event);
+ }
+ boolean result = true;
+ for (EventListener listener : listeners) {
+ try {
+ listener.processEvent(event);
+ } catch (EventException e) {
+ // A listener processing failure does not affect other
listeners
+ LOG.warn("[{}] process event failed, event: {}, errMsg: {}",
listener.getClass().getName(), event,
+ e.getMessage());
+ result = false;
+ }
+ }
+ return result;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
b/fe/fe-core/src/main/java/org/apache/doris/event/EventType.java
similarity index 51%
copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
copy to fe/fe-core/src/main/java/org/apache/doris/event/EventType.java
index 0f4f904c573..be942141fd3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/event/EventType.java
@@ -15,52 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.mtmv;
+package org.apache.doris.event;
-/**
- * refresh enum
- */
-public class MTMVRefreshEnum {
-
- /**
- * RefreshMethod
- */
- public enum RefreshMethod {
- COMPLETE, //complete
- AUTO //try to update incrementally, if not possible, update in full
- }
-
- /**
- * BuildMode
- */
- public enum BuildMode {
- IMMEDIATE, //right now
- DEFERRED // deferred
- }
-
- /**
- * RefreshTrigger
- */
- public enum RefreshTrigger {
- MANUAL, //manual
- SCHEDULE // schedule
- }
-
- /**
- * MTMVState
- */
- public enum MTMVState {
- INIT,
- NORMAL,
- SCHEMA_CHANGE
- }
-
- /**
- * MTMVRefreshState
- */
- public enum MTMVRefreshState {
- INIT,
- FAIL,
- SUCCESS
- }
+public enum EventType {
+ DATA_CHANGE,
+ REPLACE_PARTITION,
+ DROP_PARTITION
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
b/fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java
similarity index 51%
copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
copy to
fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java
index 0f4f904c573..371d5cd553c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java
@@ -15,52 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.mtmv;
+package org.apache.doris.event;
-/**
- * refresh enum
- */
-public class MTMVRefreshEnum {
-
- /**
- * RefreshMethod
- */
- public enum RefreshMethod {
- COMPLETE, //complete
- AUTO //try to update incrementally, if not possible, update in full
- }
-
- /**
- * BuildMode
- */
- public enum BuildMode {
- IMMEDIATE, //right now
- DEFERRED // deferred
- }
-
- /**
- * RefreshTrigger
- */
- public enum RefreshTrigger {
- MANUAL, //manual
- SCHEDULE // schedule
- }
-
- /**
- * MTMVState
- */
- public enum MTMVState {
- INIT,
- NORMAL,
- SCHEMA_CHANGE
- }
-
- /**
- * MTMVRefreshState
- */
- public enum MTMVRefreshState {
- INIT,
- FAIL,
- SUCCESS
+public class ReplacePartitionEvent extends TableEvent {
+ public ReplacePartitionEvent(long ctlId, long dbId, long tableId) {
+ super(EventType.REPLACE_PARTITION, ctlId, dbId, tableId);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
b/fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java
similarity index 52%
copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
copy to fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java
index 0f4f904c573..210ad2df40f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java
@@ -15,52 +15,38 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.mtmv;
+package org.apache.doris.event;
-/**
- * refresh enum
- */
-public class MTMVRefreshEnum {
+public abstract class TableEvent extends Event {
+ protected final long ctlId;
+ protected final long dbId;
+ protected final long tableId;
- /**
- * RefreshMethod
- */
- public enum RefreshMethod {
- COMPLETE, //complete
- AUTO //try to update incrementally, if not possible, update in full
+ public TableEvent(EventType eventType, long ctlId, long dbId, long
tableId) {
+ super(eventType);
+ this.ctlId = ctlId;
+ this.dbId = dbId;
+ this.tableId = tableId;
}
- /**
- * BuildMode
- */
- public enum BuildMode {
- IMMEDIATE, //right now
- DEFERRED // deferred
+ public long getCtlId() {
+ return ctlId;
}
- /**
- * RefreshTrigger
- */
- public enum RefreshTrigger {
- MANUAL, //manual
- SCHEDULE // schedule
+ public long getDbId() {
+ return dbId;
}
- /**
- * MTMVState
- */
- public enum MTMVState {
- INIT,
- NORMAL,
- SCHEMA_CHANGE
+ public long getTableId() {
+ return tableId;
}
- /**
- * MTMVRefreshState
- */
- public enum MTMVRefreshState {
- INIT,
- FAIL,
- SUCCESS
+ @Override
+ public String toString() {
+ return "TableEvent{"
+ + "ctlId=" + ctlId
+ + ", dbId=" + dbId
+ + ", tableId=" + tableId
+ + "} " + super.toString();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
index 4f44b2e14b9..5d7cf4435b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
@@ -140,27 +140,44 @@ public class MTMVJob extends AbstractJob<MTMVTask,
MTMVTaskContext> {
/**
* if user trigger, return true
- * if system trigger, Check if there are any system triggered tasks, and
if so, return false
+ * otherwise, at most 2 non-manual tasks may exist, because every task can refresh all data.
*
* @param taskContext
* @return
*/
@Override
public boolean isReadyForScheduling(MTMVTaskContext taskContext) {
- if (taskContext != null) {
+ if (isManual(taskContext)) {
return true;
}
List<MTMVTask> runningTasks = getRunningTasks();
+ int runningNum = 0;
for (MTMVTask task : runningTasks) {
- if (task.getTaskContext() == null ||
task.getTaskContext().getTriggerMode() == MTMVTaskTriggerMode.SYSTEM) {
- LOG.warn("isReadyForScheduling return false, because current
taskContext is null, exist task: {}",
- task);
- return false;
+ if (!isManual(task.getTaskContext())) {
+ runningNum++;
+ // Prerequisite: each refresh calculates which partitions to refresh
+ //
+ // For example, there is currently a running task that is refreshing partition p1.
+ // If the data of p2 changes at this time and triggers a refresh task t2,
+ // then with the threshold (>=1), t2 would be lost
+ //
+ // With the threshold >=2, t2 will wait for the lock of MTMVJob.
+ // If the p3 data changes again and triggers the refresh task t3,
+ // then t3 will be discarded. However, when t2 runs, both p2 and p3 data will be refreshed.
+ if (runningNum >= 2) {
+ LOG.warn("isReadyForScheduling return false, because
current taskContext is null, exist task: {}",
+ task);
+ return false;
+ }
}
}
return true;
}
+ private boolean isManual(MTMVTaskContext taskContext) {
+ return taskContext != null && taskContext.getTriggerMode() ==
MTMVTaskTriggerMode.MANUAL;
+ }
+
@Override
public ShowResultSetMetaData getJobMetaData() {
return JOB_META_DATA;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 240c7de6a71..517909f5e1f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -107,6 +107,7 @@ public class MTMVTask extends AbstractTask {
public enum MTMVTaskTriggerMode {
MANUAL,
+ COMMIT,
SYSTEM
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java
index 9b3b6be04f1..bc9a3fdd205 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java
@@ -32,18 +32,24 @@ public class BaseTableInfo {
private static final Logger LOG =
LogManager.getLogger(BaseTableInfo.class);
@SerializedName("ti")
- private Long tableId;
+ private long tableId;
@SerializedName("di")
- private Long dbId;
+ private long dbId;
@SerializedName("ci")
- private Long ctlId;
+ private long ctlId;
- public BaseTableInfo(Long tableId, Long dbId) {
+ public BaseTableInfo(long tableId, long dbId) {
this.tableId = java.util.Objects.requireNonNull(tableId, "tableId is
null");
this.dbId = java.util.Objects.requireNonNull(dbId, "dbId is null");
this.ctlId = InternalCatalog.INTERNAL_CATALOG_ID;
}
+ public BaseTableInfo(long tableId, long dbId, long ctlId) {
+ this.tableId = java.util.Objects.requireNonNull(tableId, "tableId is
null");
+ this.dbId = java.util.Objects.requireNonNull(dbId, "dbId is null");
+ this.ctlId = java.util.Objects.requireNonNull(ctlId, "ctlId is null");
+ }
+
public BaseTableInfo(TableIf table) {
DatabaseIf database = table.getDatabase();
java.util.Objects.requireNonNull(database, "database is null");
@@ -54,15 +60,15 @@ public class BaseTableInfo {
this.ctlId = catalog.getId();
}
- public Long getTableId() {
+ public long getTableId() {
return tableId;
}
- public Long getDbId() {
+ public long getDbId() {
return dbId;
}
- public Long getCtlId() {
+ public long getCtlId() {
return ctlId;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index f53a7b60868..bed44e8d37d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -45,6 +45,7 @@ import
org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.persist.AlterMTMV;
import org.apache.doris.qe.ConnectContext;
+import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -79,11 +80,10 @@ public class MTMVJobManager implements MTMVHookService {
private JobExecutionConfiguration getJobConfig(MTMV mtmv) {
JobExecutionConfiguration jobExecutionConfiguration = new
JobExecutionConfiguration();
- if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger()
- .equals(RefreshTrigger.SCHEDULE)) {
+ RefreshTrigger refreshTrigger =
mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger();
+ if (refreshTrigger.equals(RefreshTrigger.SCHEDULE)) {
setScheduleJobConfig(jobExecutionConfiguration, mtmv);
- } else if
(mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger()
- .equals(RefreshTrigger.MANUAL)) {
+ } else if (refreshTrigger.equals(RefreshTrigger.MANUAL) ||
refreshTrigger.equals(RefreshTrigger.COMMIT)) {
setManualJobConfig(jobExecutionConfiguration, mtmv);
}
return jobExecutionConfiguration;
@@ -210,9 +210,20 @@ public class MTMVJobManager implements MTMVHookService {
job.cancelTaskById(info.getTaskId());
}
+ public void onCommit(MTMV mtmv) throws DdlException, JobException {
+ MTMVJob job = getJobByMTMV(mtmv);
+ MTMVTaskContext mtmvTaskContext = new
MTMVTaskContext(MTMVTaskTriggerMode.COMMIT, Lists.newArrayList(),
+ false);
+ Env.getCurrentEnv().getJobManager().triggerJob(job.getJobId(),
mtmvTaskContext);
+ }
+
private MTMVJob getJobByTableNameInfo(TableNameInfo info) throws
DdlException, MetaNotFoundException {
Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(info.getDb());
MTMV mtmv = (MTMV) db.getTableOrMetaException(info.getTbl(),
TableType.MATERIALIZED_VIEW);
+ return getJobByMTMV(mtmv);
+ }
+
+ private MTMVJob getJobByMTMV(MTMV mtmv) throws DdlException {
List<MTMVJob> jobs = Env.getCurrentEnv().getJobManager()
.queryJobs(JobType.MV, mtmv.getJobInfo().getJobName());
if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
index 0f4f904c573..b9d27db9c22 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
@@ -43,6 +43,7 @@ public class MTMVRefreshEnum {
*/
public enum RefreshTrigger {
MANUAL, //manual
+ COMMIT, //commit
SCHEDULE // schedule
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
index cbbaef6b917..d5d86b7eeda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
@@ -22,8 +22,13 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.event.Event;
+import org.apache.doris.event.EventException;
+import org.apache.doris.event.EventListener;
+import org.apache.doris.event.TableEvent;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
+import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
@@ -36,8 +41,9 @@ import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
-public class MTMVService {
+public class MTMVService implements EventListener {
private static final Logger LOG = LogManager.getLogger(MTMVService.class);
private Map<String, MTMVHookService> hooks = Maps.newConcurrentMap();
@@ -162,4 +168,27 @@ public class MTMVService {
mtmvHookService.cancelMTMVTask(info);
}
}
+
+ @Override
+ public void processEvent(Event event) throws EventException {
+ Objects.requireNonNull(event);
+ if (!(event instanceof TableEvent)) {
+ return;
+ }
+ TableEvent tableEvent = (TableEvent) event;
+ LOG.info("processEvent, Event: {}", event);
+ Set<BaseTableInfo> mtmvs = relationManager.getMtmvsByBaseTableOneLevel(
+ new BaseTableInfo(tableEvent.getTableId(),
tableEvent.getDbId(), tableEvent.getCtlId()));
+ for (BaseTableInfo baseTableInfo : mtmvs) {
+ try {
+ // check if mtmv should trigger by event
+ MTMV mtmv = MTMVUtil.getMTMV(baseTableInfo.getDbId(),
baseTableInfo.getTableId());
+ if
(mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT))
{
+ jobManager.onCommit(mtmv);
+ }
+ } catch (Exception e) {
+ throw new EventException(e);
+ }
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 0e244d794b0..ee649f331ff 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -704,6 +704,9 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
if (ctx.MANUAL() != null) {
return new MTMVRefreshTriggerInfo(RefreshTrigger.MANUAL);
}
+ if (ctx.COMMIT() != null) {
+ return new MTMVRefreshTriggerInfo(RefreshTrigger.COMMIT);
+ }
if (ctx.SCHEDULE() != null) {
return new MTMVRefreshTriggerInfo(RefreshTrigger.SCHEDULE,
visitRefreshSchedule(ctx.refreshSchedule()));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index e271146b1e7..686a769dcc9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -50,6 +50,7 @@ import org.apache.doris.common.util.InternalDatabaseUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.event.DataChangeEvent;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
@@ -1145,6 +1146,17 @@ public class DatabaseTransactionMgr {
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
+ // We wait here for the EventProcessor to finish processing the event,
+ // but whether that processing succeeds or fails,
+ // the result does not affect the transaction logic.
+ try {
+ produceEvent(transactionState, db);
+ } catch (Throwable t) {
+ // Normally no exception is thrown here, but every Throwable is caught
+ // so that a bug in event handling cannot break the original logic.
+ LOG.warn("produceEvent failed: ", t);
+ }
+
// The visible latch should only be counted down after all things are
done
// (finish transaction, write edit log, etc).
// Otherwise, there is no way for stream load to query the result
right after loading finished,
@@ -1174,6 +1186,26 @@ public class DatabaseTransactionMgr {
}
}
+ private void produceEvent(TransactionState transactionState, Database db) {
+ Collection<TableCommitInfo> tableCommitInfos;
+ if (!transactionState.getSubTxnIdToTableCommitInfo().isEmpty()) {
+ tableCommitInfos = transactionState.getSubTxnTableCommitInfos();
+ } else {
+ tableCommitInfos =
transactionState.getIdToTableCommitInfos().values();
+ }
+ for (TableCommitInfo tableCommitInfo : tableCommitInfos) {
+ long tableId = tableCommitInfo.getTableId();
+ OlapTable table = (OlapTable) db.getTableNullable(tableId);
+ if (table == null) {
+ LOG.warn("table {} does not exist when produceEvent.
transaction: {}, db: {}",
+ tableId, transactionState.getTransactionId(),
db.getId());
+ continue;
+ }
+ Env.getCurrentEnv().getEventProcessor().processEvent(
+ new DataChangeEvent(db.getCatalog().getId(), db.getId(),
tableId));
+ }
+ }
+
private boolean finishCheckPartitionVersion(TransactionState
transactionState, Database db,
List<Pair<OlapTable, Partition>> relatedTblPartitions) {
Iterator<TableCommitInfo> tableCommitInfoIterator
diff --git a/regression-test/data/mtmv_p0/test_commit_mtmv.out
b/regression-test/data/mtmv_p0/test_commit_mtmv.out
new file mode 100644
index 00000000000..fafb8f883a4
--- /dev/null
+++ b/regression-test/data/mtmv_p0/test_commit_mtmv.out
@@ -0,0 +1,40 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !mv1 --
+1 2017-01-15 1
+2 2017-02-15 2
+3 2017-03-15 3
+
+-- !task1 --
+{"triggerMode":"COMMIT","partitions":[],"isComplete":false}
+
+-- !mv2 --
+1 2017-01-15 1
+2 2017-02-15 2
+3 2017-03-15 3
+
+-- !task2 --
+{"triggerMode":"COMMIT","partitions":[],"isComplete":false}
+
+-- !mv1_2 --
+1 2017-01-15 1
+1 2017-01-15 1
+2 2017-02-15 2
+3 2017-03-15 3
+
+-- !mv2_2 --
+1 2017-01-15 1
+2 2017-02-15 2
+3 2017-03-15 3
+
+-- !mv1_init --
+1 2017-01-15 1
+2 2017-02-15 2
+3 2017-03-15 3
+
+-- !mv1_drop --
+2 2017-02-15 2
+3 2017-03-15 3
+
+-- !mv1_replace --
+3 2017-03-15 3
+
diff --git a/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy
new file mode 100644
index 00000000000..cd02dcd57d7
--- /dev/null
+++ b/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_commit_mtmv") {
+ def tableName = "test_commit_mtmv_table"
+ def mvName1 = "test_commit_mtmv1"
+ def mvName2 = "test_commit_mtmv2"
+ def dbName = "regression_test_mtmv_p0"
+ sql """drop materialized view if exists ${mvName1};"""
+ sql """drop materialized view if exists ${mvName2};"""
+ sql """drop table if exists `${tableName}`"""
+ sql """
+ CREATE TABLE IF NOT EXISTS `${tableName}` (
+ `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"',
+ `date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"',
+ `num` SMALLINT NOT NULL COMMENT '\"数量\"'
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`user_id`, `date`, `num`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`user_id`) BUCKETS 2
+ PROPERTIES ('replication_num' = '1') ;
+ """
+
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName1}
+ BUILD DEFERRED REFRESH AUTO ON COMMIT
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' = '1')
+ AS
+ SELECT * FROM ${tableName};
+ """
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName2}
+ BUILD DEFERRED REFRESH AUTO ON COMMIT
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' = '1')
+ AS
+ SELECT * FROM ${mvName1};
+ """
+ sql """
+ insert into ${tableName}
values(1,"2017-01-15",1),(2,"2017-02-15",2),(3,"2017-03-15",3);;
+ """
+ def jobName1 = getJobName(dbName, mvName1);
+ waitingMTMVTaskFinished(jobName1)
+ order_qt_mv1 "SELECT * FROM ${mvName1}"
+ order_qt_task1 "SELECT TaskContext from tasks('type'='mv') where
MvName='${mvName1}' order by CreateTime desc limit 1"
+
+ def jobName2 = getJobName(dbName, mvName2);
+ waitingMTMVTaskFinished(jobName2)
+ order_qt_mv2 "SELECT * FROM ${mvName2}"
+ order_qt_task2 "SELECT TaskContext from tasks('type'='mv') where
MvName='${mvName2}' order by CreateTime desc limit 1"
+
+ // an MTMV altered to REFRESH ON MANUAL must not be triggered by commit
+ sql """
+ alter MATERIALIZED VIEW ${mvName2} REFRESH ON MANUAL;
+ """
+
+ sql """
+ insert into ${tableName} values(1,"2017-01-15",1);;
+ """
+ waitingMTMVTaskFinished(jobName1)
+ order_qt_mv1_2 "SELECT * FROM ${mvName1}"
+ waitingMTMVTaskFinished(jobName2)
+ order_qt_mv2_2 "SELECT * FROM ${mvName2}"
+
+ sql """drop materialized view if exists ${mvName1};"""
+ sql """drop materialized view if exists ${mvName2};"""
+ sql """drop table if exists `${tableName}`"""
+
+ // test drop partition
+ sql """
+ CREATE TABLE IF NOT EXISTS `${tableName}` (
+ `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"',
+ `date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"',
+ `num` SMALLINT NOT NULL COMMENT '\"数量\"'
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`user_id`, `date`, `num`)
+ COMMENT 'OLAP'
+ PARTITION BY RANGE(`date`)
+ (PARTITION p201701 VALUES [('0000-01-01'), ('2017-02-01')),
+ PARTITION p201702 VALUES [('2017-02-01'), ('2017-03-01')),
+ PARTITION p201703 VALUES [('2017-03-01'), ('2017-04-01')))
+ DISTRIBUTED BY HASH(`user_id`) BUCKETS 2
+ PROPERTIES ('replication_num' = '1') ;
+ """
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName1}
+ BUILD DEFERRED REFRESH AUTO ON COMMIT
+ PARTITION BY (`date`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' = '1')
+ AS
+ SELECT * FROM ${tableName};
+ """
+ sql """
+ insert into ${tableName}
values(1,"2017-01-15",1),(2,"2017-02-15",2),(3,"2017-03-15",3);;
+ """
+ jobName1 = getJobName(dbName, mvName1);
+ waitingMTMVTaskFinished(jobName1)
+ order_qt_mv1_init "SELECT * FROM ${mvName1}"
+
+ sql """alter table ${tableName} drop PARTITION p201701"""
+ waitingMTMVTaskFinished(jobName1)
+ order_qt_mv1_drop "SELECT * FROM ${mvName1}"
+
+ // test replace partition
+ sql """ALTER TABLE ${tableName} ADD TEMPORARY PARTITION p201702_t VALUES
[('2017-02-01'), ('2017-03-01'));"""
+ sql """ALTER TABLE ${tableName} REPLACE PARTITION (p201702) WITH TEMPORARY
PARTITION (p201702_t);"""
+ waitingMTMVTaskFinished(jobName1)
+ order_qt_mv1_replace "SELECT * FROM ${mvName1}"
+
+ sql """drop materialized view if exists ${mvName1};"""
+ sql """drop materialized view if exists ${mvName2};"""
+ sql """drop table if exists `${tableName}`"""
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]