This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a5b10d5df0a [enhance](mtmv)use version instead of timestamp (#30599)
a5b10d5df0a is described below
commit a5b10d5df0ad73c80b9b736be4def2b71c1921a3
Author: zhangdong <[email protected]>
AuthorDate: Thu Feb 1 14:42:47 2024 +0800
[enhance](mtmv)use version instead of timestamp (#30599)
MTMV records snapshot information on every data refresh; the recorded snapshots
are later compared with the current ones to decide whether partitions need to be updated
---
.../main/java/org/apache/doris/alter/Alter.java | 2 +-
.../main/java/org/apache/doris/catalog/Env.java | 5 +-
.../main/java/org/apache/doris/catalog/MTMV.java | 16 +-
.../java/org/apache/doris/catalog/OlapTable.java | 30 ++--
.../doris/catalog/external/HMSExternalTable.java | 91 +++++++++---
.../apache/doris/job/extensions/mtmv/MTMVTask.java | 23 ++-
.../doris/mtmv/MTMVMaxTimestampSnapshot.java | 59 ++++++++
.../doris/mtmv/MTMVRefreshPartitionSnapshot.java | 43 ++++++
.../org/apache/doris/mtmv/MTMVRefreshSnapshot.java | 75 ++++++++++
.../org/apache/doris/mtmv/MTMVRelatedTableIf.java | 38 +++--
.../java/org/apache/doris/mtmv/MTMVSnapshotIf.java | 24 +++
.../apache/doris/mtmv/MTMVTimestampSnapshot.java | 51 +++++++
.../main/java/org/apache/doris/mtmv/MTMVUtil.java | 164 +++++++++++++++------
.../org/apache/doris/mtmv/MTMVVersionSnapshot.java | 47 ++++++
.../java/org/apache/doris/persist/AlterMTMV.java | 12 ++
.../org/apache/doris/persist/gson/GsonUtils.java | 11 ++
16 files changed, 591 insertions(+), 100 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index a5a12860d62..5de7fee4fa0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -893,7 +893,7 @@ public class Alter {
mtmv.alterMvProperties(alterMTMV.getMvProperties());
break;
case ADD_TASK:
- mtmv.addTaskResult(alterMTMV.getTask(),
alterMTMV.getRelation());
+ mtmv.addTaskResult(alterMTMV.getTask(),
alterMTMV.getRelation(), alterMTMV.getPartitionSnapshots());
Env.getCurrentEnv().getMtmvService()
.refreshComplete(mtmv, alterMTMV.getRelation(),
alterMTMV.getTask());
break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 5dd1740aeaa..fefe9cbd110 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -178,6 +178,7 @@ import
org.apache.doris.master.PartitionInMemoryInfoCollector;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mtmv.MTMVAlterOpType;
+import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVService;
import org.apache.doris.mtmv.MTMVStatus;
@@ -5980,10 +5981,12 @@ public class Env {
this.alter.processAlterMTMV(alter, false);
}
- public void addMTMVTaskResult(TableNameInfo mvName, MTMVTask task,
MTMVRelation relation) {
+ public void addMTMVTaskResult(TableNameInfo mvName, MTMVTask task,
MTMVRelation relation,
+ Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) {
AlterMTMV alter = new AlterMTMV(mvName, MTMVAlterOpType.ADD_TASK);
alter.setTask(task);
alter.setRelation(relation);
+ alter.setPartitionSnapshots(partitionSnapshots);
this.alter.processAlterMTMV(alter, false);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
index 9cd5bdac78a..f0730f0d371 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
@@ -32,6 +32,8 @@ import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
import org.apache.doris.mtmv.MTMVRefreshInfo;
+import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
+import org.apache.doris.mtmv.MTMVRefreshSnapshot;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.persist.gson.GsonUtils;
@@ -69,6 +71,8 @@ public class MTMV extends OlapTable {
private MTMVRelation relation;
@SerializedName("mpi")
private MTMVPartitionInfo mvPartitionInfo;
+ @SerializedName("rs")
+ private MTMVRefreshSnapshot refreshSnapshot;
// Should update after every fresh, not persist
private MTMVCache cache;
@@ -96,6 +100,7 @@ public class MTMV extends OlapTable {
this.mvProperties = params.mvProperties;
this.mvPartitionInfo = params.mvPartitionInfo;
this.relation = params.relation;
+ this.refreshSnapshot = new MTMVRefreshSnapshot();
mvRwLock = new ReentrantReadWriteLock(true);
}
@@ -145,7 +150,8 @@ public class MTMV extends OlapTable {
}
}
- public void addTaskResult(MTMVTask task, MTMVRelation relation) {
+ public void addTaskResult(MTMVTask task, MTMVRelation relation,
+ Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) {
try {
writeMvLock();
if (task.getStatus() == TaskStatus.SUCCESS) {
@@ -165,6 +171,7 @@ public class MTMV extends OlapTable {
this.status.setRefreshState(MTMVRefreshState.FAIL);
}
this.jobInfo.addHistoryTask(task);
+ this.refreshSnapshot.updateSnapshots(partitionSnapshots,
getPartitionNames());
} finally {
writeMvUnlock();
}
@@ -177,7 +184,7 @@ public class MTMV extends OlapTable {
public long getGracePeriod() {
if
(mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) {
- return
Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD));
+ return
Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) *
1000;
} else {
return 0L;
}
@@ -222,6 +229,10 @@ public class MTMV extends OlapTable {
return mvPartitionInfo;
}
+ public MTMVRefreshSnapshot getRefreshSnapshot() {
+ return refreshSnapshot;
+ }
+
public void readMvLock() {
this.mvRwLock.readLock().lock();
}
@@ -256,6 +267,7 @@ public class MTMV extends OlapTable {
mvProperties = materializedView.mvProperties;
relation = materializedView.relation;
mvPartitionInfo = materializedView.mvPartitionInfo;
+ refreshSnapshot = materializedView.refreshSnapshot;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 4cdc1bb9945..35df110819f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -48,6 +48,8 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+import org.apache.doris.mtmv.MTMVVersionSnapshot;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.resource.Tag;
@@ -2571,25 +2573,25 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
}
@Override
- public long getPartitionLastModifyTime(long partitionId, PartitionItem
item) throws AnalysisException {
- return
getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit();
+ public List<Column> getPartitionColumns() {
+ return getPartitionInfo().getPartitionColumns();
}
@Override
- public long getLastModifyTime() {
- long result = 0L;
- long visibleVersionTime;
- for (Partition partition : getAllPartitions()) {
- visibleVersionTime = partition.getVisibleVersionTimeIgnoreInit();
- if (visibleVersionTime > result) {
- result = visibleVersionTime;
- }
- }
- return result;
+ public MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws
AnalysisException {
+ long visibleVersion =
getPartitionOrAnalysisException(partitionId).getVisibleVersion();
+ return new MTMVVersionSnapshot(visibleVersion);
}
@Override
- public List<Column> getPartitionColumns() {
- return getPartitionInfo().getPartitionColumns();
+ public MTMVSnapshotIf getTableSnapshot() {
+ long visibleVersion = getVisibleVersion();
+ return new MTMVVersionSnapshot(visibleVersion);
+ }
+
+ @Override
+ public String getPartitionName(long partitionId) throws AnalysisException {
+ return getPartitionOrAnalysisException(partitionId).getName();
}
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 96efdb8de13..aa4258baadb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -33,7 +33,10 @@ import org.apache.doris.datasource.hive.HMSCachedClient;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.external.iceberg.util.IcebergUtils;
+import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+import org.apache.doris.mtmv.MTMVTimestampSnapshot;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
@@ -452,6 +455,15 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
return initSchema();
}
+ public long getLastDdlTime() {
+ makeSureInitialized();
+ Map<String, String> parameters = remoteTable.getParameters();
+ if (parameters == null ||
!parameters.containsKey(TBL_PROP_TRANSIENT_LAST_DDL_TIME)) {
+ return 0L;
+ }
+ return
Long.parseLong(parameters.get(TBL_PROP_TRANSIENT_LAST_DDL_TIME)) * 1000;
+ }
+
@Override
public List<Column> initSchema() {
makeSureInitialized();
@@ -561,7 +573,7 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
public boolean hasColumnStatistics(String colName) {
Map<String, String> parameters = remoteTable.getParameters();
return parameters.keySet().stream()
- .filter(k -> k.startsWith(SPARK_COL_STATS + colName +
".")).findAny().isPresent();
+ .filter(k -> k.startsWith(SPARK_COL_STATS + colName +
".")).findAny().isPresent();
}
public boolean fillColumnStatistics(String colName, Map<StatsType, String>
statsTypes, Map<String, String> stats) {
@@ -772,13 +784,13 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
@Override
public boolean isDistributionColumn(String columnName) {
return
getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase)
- .collect(Collectors.toSet()).contains(columnName.toLowerCase());
+
.collect(Collectors.toSet()).contains(columnName.toLowerCase());
}
@Override
public Set<String> getDistributionColumnNames() {
return
getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase)
- .collect(Collectors.toSet());
+ .collect(Collectors.toSet());
}
@Override
@@ -805,32 +817,73 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
}
@Override
- public long getPartitionLastModifyTime(long partitionId, PartitionItem
item) throws AnalysisException {
- List<List<String>> partitionValuesList =
Lists.newArrayListWithCapacity(1);
- partitionValuesList.add(
- ((ListPartitionItem)
item).getItems().get(0).getPartitionValuesAsStringListForHive());
+ public String getPartitionName(long partitionId) throws AnalysisException {
+ Map<String, Long> partitionNameToIdMap =
getHivePartitionValues().getPartitionNameToIdMap();
+ for (Entry<String, Long> entry : partitionNameToIdMap.entrySet()) {
+ if (entry.getValue().equals(partitionId)) {
+ return entry.getKey();
+ }
+ }
+ throw new AnalysisException("can not find partition, partitionId: " +
partitionId);
+ }
+
+ private HiveMetaStoreCache.HivePartitionValues getHivePartitionValues() {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
- List<HivePartition> resPartitions =
cache.getAllPartitionsWithCache(getDbName(), getName(),
- partitionValuesList);
- if (resPartitions.size() != 1) {
- throw new AnalysisException("partition not normal, size: " +
resPartitions.size());
- }
- return resPartitions.get(0).getLastModifiedTimeIgnoreInit();
+ return cache.getPartitionValues(
+ getDbName(), getName(), getPartitionColumnTypes());
}
@Override
- public long getLastModifyTime() throws AnalysisException {
+ public MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws
AnalysisException {
+ long partitionLastModifyTime = getPartitionLastModifyTime(partitionId);
+ return new MTMVTimestampSnapshot(partitionLastModifyTime);
+ }
- long result = 0L;
+ @Override
+ public MTMVSnapshotIf getTableSnapshot() throws AnalysisException {
+ if (getPartitionType() == PartitionType.UNPARTITIONED) {
+ return new MTMVMaxTimestampSnapshot(-1L, getLastDdlTime());
+ }
+ long partitionId = 0L;
+ long maxVersionTime = 0L;
long visibleVersionTime;
for (Entry<Long, PartitionItem> entry :
getPartitionItems().entrySet()) {
- visibleVersionTime = getPartitionLastModifyTime(entry.getKey(),
entry.getValue());
- if (visibleVersionTime > result) {
- result = visibleVersionTime;
+ visibleVersionTime = getPartitionLastModifyTime(entry.getKey());
+ if (visibleVersionTime > maxVersionTime) {
+ maxVersionTime = visibleVersionTime;
+ partitionId = entry.getKey();
}
}
- return result;
+ return new MTMVMaxTimestampSnapshot(partitionId, maxVersionTime);
+ }
+
+ private long getPartitionLastModifyTime(long partitionId) throws
AnalysisException {
+ return getPartitionById(partitionId).getLastModifiedTime();
+ }
+
+ private HivePartition getPartitionById(long partitionId) throws
AnalysisException {
+ PartitionItem item = getPartitionItems().get(partitionId);
+ List<List<String>> partitionValuesList =
transferPartitionItemToPartitionValues(item);
+ List<HivePartition> partitions =
getPartitionsByPartitionValues(partitionValuesList);
+ if (partitions.size() != 1) {
+ throw new AnalysisException("partition not normal, size: " +
partitions.size());
+ }
+ return partitions.get(0);
+ }
+
+ private List<HivePartition>
getPartitionsByPartitionValues(List<List<String>> partitionValuesList) {
+ HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getMetaStoreCache((HMSExternalCatalog) getCatalog());
+ return cache.getAllPartitionsWithCache(getDbName(), getName(),
+ partitionValuesList);
+ }
+
+ private List<List<String>>
transferPartitionItemToPartitionValues(PartitionItem item) {
+ List<List<String>> partitionValuesList =
Lists.newArrayListWithCapacity(1);
+ partitionValuesList.add(
+ ((ListPartitionItem)
item).getItems().get(0).getPartitionValuesAsStringListForHive());
+ return partitionValuesList;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 9b5added1b9..194172a6732 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -38,6 +38,7 @@ import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
+import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
@@ -133,6 +134,7 @@ public class MTMVTask extends AbstractTask {
private MTMV mtmv;
private MTMVRelation relation;
private StmtExecutor executor;
+ private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots;
public MTMVTask() {
}
@@ -155,8 +157,9 @@ public class MTMVTask extends AbstractTask {
// Every time a task is run, the relation is regenerated because
baseTables and baseViews may change,
// such as deleting a table and creating a view with the same name
this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
- // Before obtaining information from hmsTable, refresh to ensure
that the data is up-to-date
- refreshHmsTable();
+ // For now, the MTMV is kept consistent with the data in the cache.
+ // To be completely consistent with Hive, the cache must be refreshed manually.
+ // refreshHmsTable();
if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
MTMVUtil.alignMvPartition(mtmv,
mtmv.getMvPartitionInfo().getRelatedTable());
}
@@ -171,6 +174,7 @@ public class MTMVTask extends AbstractTask {
int refreshPartitionNum = mtmv.getRefreshPartitionNum();
long execNum = (needRefreshPartitionIds.size() /
refreshPartitionNum) + ((needRefreshPartitionIds.size()
% refreshPartitionNum) > 0 ? 1 : 0);
+ this.partitionSnapshots = Maps.newHashMap();
for (int i = 0; i < execNum; i++) {
int start = i * refreshPartitionNum;
int end = start + refreshPartitionNum;
@@ -178,8 +182,11 @@ public class MTMVTask extends AbstractTask {
.subList(start, end > needRefreshPartitionIds.size() ?
needRefreshPartitionIds.size() : end));
// need get names before exec
List<String> execPartitionNames =
MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds);
+ Map<String, MTMVRefreshPartitionSnapshot>
execPartitionSnapshots = MTMVUtil
+ .generatePartitionSnapshots(mtmv,
relation.getBaseTables(), execPartitionIds);
exec(ctx, execPartitionIds, tableWithPartKey);
completedPartitions.addAll(execPartitionNames);
+ partitionSnapshots.putAll(execPartitionSnapshots);
}
} catch (Throwable e) {
LOG.warn("run task failed: ", e);
@@ -241,6 +248,12 @@ public class MTMVTask extends AbstractTask {
}
}
+ /**
+ * Before obtaining information from hmsTable, refresh to ensure that the data is up-to-date.
+ *
+ * @throws AnalysisException
+ * @throws DdlException
+ */
private void refreshHmsTable() throws AnalysisException, DdlException {
for (BaseTableInfo tableInfo : relation.getBaseTables()) {
TableIf tableIf = MTMVUtil.getTable(tableInfo);
@@ -345,11 +358,13 @@ public class MTMVTask extends AbstractTask {
private void after() {
if (mtmv != null) {
Env.getCurrentEnv()
- .addMTMVTaskResult(new
TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation);
+ .addMTMVTaskResult(new
TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation,
+ partitionSnapshots);
}
mtmv = null;
relation = null;
executor = null;
+ partitionSnapshots = null;
}
private Map<TableIf, String> getIncrementalTableMap() throws
AnalysisException {
@@ -384,7 +399,7 @@ public class MTMVTask extends AbstractTask {
// check if data is fresh
// We need to use a newly generated relationship and cannot retrieve
it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the
tableId
- boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(),
mtmv.getExcludedTriggerTables(), 0L);
+ boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(),
mtmv.getExcludedTriggerTables());
if (fresh) {
return Lists.newArrayList();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVMaxTimestampSnapshot.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVMaxTimestampSnapshot.java
new file mode 100644
index 00000000000..5b551cebef6
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVMaxTimestampSnapshot.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * The version cannot be obtained from the hive table,
+ * so the update time is used instead of the version
+ */
+public class MTMVMaxTimestampSnapshot implements MTMVSnapshotIf {
+ // partitionId corresponding to timestamp
+ // Both timestamp and partitionId are stored so that, presumably, the snapshot
+ // still compares as changed when the partition corresponding to timestamp is deleted
+ @SerializedName("p")
+ private long partitionId;
+ // The maximum modify time in all partitions
+ @SerializedName("t")
+ private long timestamp;
+
+ public MTMVMaxTimestampSnapshot(long partitionId, long timestamp) {
+ this.partitionId = partitionId;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MTMVMaxTimestampSnapshot that = (MTMVMaxTimestampSnapshot) o;
+ return partitionId == that.partitionId
+ && timestamp == that.timestamp;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(partitionId, timestamp);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java
new file mode 100644
index 00000000000..8de2b4cdfed
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
+import java.util.Map;
+
+public class MTMVRefreshPartitionSnapshot {
+ @SerializedName("p")
+ private Map<String, MTMVSnapshotIf> partitions;
+ @SerializedName("t")
+ private Map<Long, MTMVSnapshotIf> tables;
+
+ public MTMVRefreshPartitionSnapshot() {
+ this.partitions = Maps.newConcurrentMap();
+ this.tables = Maps.newConcurrentMap();
+ }
+
+ public Map<String, MTMVSnapshotIf> getPartitions() {
+ return partitions;
+ }
+
+ public Map<Long, MTMVSnapshotIf> getTables() {
+ return tables;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java
new file mode 100644
index 00000000000..5132f06a12e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+import org.apache.commons.collections.MapUtils;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+public class MTMVRefreshSnapshot {
+ @SerializedName("ps")
+ private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots;
+
+ public MTMVRefreshSnapshot() {
+ this.partitionSnapshots = Maps.newConcurrentMap();
+ }
+
+ public boolean equalsWithRelatedPartition(String mtmvPartitionName, String
relatedPartitionName,
+ MTMVSnapshotIf relatedPartitionCurrentSnapshot) {
+ MTMVRefreshPartitionSnapshot partitionSnapshot =
partitionSnapshots.get(mtmvPartitionName);
+ if (partitionSnapshot == null) {
+ return false;
+ }
+ MTMVSnapshotIf relatedPartitionSnapshot =
partitionSnapshot.getPartitions().get(relatedPartitionName);
+ if (relatedPartitionSnapshot == null) {
+ return false;
+ }
+ return
relatedPartitionSnapshot.equals(relatedPartitionCurrentSnapshot);
+ }
+
+ public boolean equalsWithBaseTable(String mtmvPartitionName, long
baseTableId,
+ MTMVSnapshotIf baseTableCurrentSnapshot) {
+ MTMVRefreshPartitionSnapshot partitionSnapshot =
partitionSnapshots.get(mtmvPartitionName);
+ if (partitionSnapshot == null) {
+ return false;
+ }
+ MTMVSnapshotIf relatedPartitionSnapshot =
partitionSnapshot.getTables().get(baseTableId);
+ if (relatedPartitionSnapshot == null) {
+ return false;
+ }
+ return relatedPartitionSnapshot.equals(baseTableCurrentSnapshot);
+ }
+
+ public void updateSnapshots(Map<String, MTMVRefreshPartitionSnapshot>
addPartitionSnapshots,
+ Set<String> mvPartitionNames) {
+ if (!MapUtils.isEmpty(addPartitionSnapshots)) {
+ this.partitionSnapshots.putAll(addPartitionSnapshots);
+ }
+ Iterator<String> iterator = partitionSnapshots.keySet().iterator();
+ while (iterator.hasNext()) {
+ String partitionName = iterator.next();
+ if (!mvPartitionNames.contains(partitionName)) {
+ iterator.remove();
+ }
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
index d4a9cf3aca7..51773db0df1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
@@ -40,16 +40,6 @@ public interface MTMVRelatedTableIf extends TableIf {
*/
Map<Long, PartitionItem> getPartitionItems();
- /**
- * Obtain the latest update time of partition data
- *
- * @param partitionId
- * @param item
- * @return millisecond
- * @throws AnalysisException
- */
- long getPartitionLastModifyTime(long partitionId, PartitionItem item)
throws AnalysisException;
-
/**
* getPartitionType LIST/RANGE/UNPARTITIONED
*
@@ -66,17 +56,35 @@ public interface MTMVRelatedTableIf extends TableIf {
Set<String> getPartitionColumnNames() throws DdlException;
/**
- * Obtain the latest update time of table data
+ * getPartitionColumns
*
* @return
+ */
+ List<Column> getPartitionColumns();
+
+ /**
+ * getPartitionSnapshot
+ *
+ * @param partitionId
+ * @return partition snapshot at current time
* @throws AnalysisException
*/
- long getLastModifyTime() throws AnalysisException;
+ MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws
AnalysisException;
/**
- * getPartitionColumns
+ * getTableSnapshot
*
- * @return
+ * @return table snapshot at current time
+ * @throws AnalysisException
*/
- List<Column> getPartitionColumns();
+ MTMVSnapshotIf getTableSnapshot() throws AnalysisException;
+
+ /**
+ * getPartitionName
+ *
+ * @param partitionId
+ * @return partitionName
+ * @throws AnalysisException
+ */
+ String getPartitionName(long partitionId) throws AnalysisException;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVSnapshotIf.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVSnapshotIf.java
new file mode 100644
index 00000000000..9a15ab7ef90
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVSnapshotIf.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+/**
+ * MTMV refresh snapshot
+ */
+public interface MTMVSnapshotIf {
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTimestampSnapshot.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTimestampSnapshot.java
new file mode 100644
index 00000000000..b3fd88a94de
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTimestampSnapshot.java
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * The version cannot be obtained from the hive table,
+ * so the update time is used instead of the version
+ */
+public class MTMVTimestampSnapshot implements MTMVSnapshotIf {
+ @SerializedName("t")
+ private long timestamp;
+
+ public MTMVTimestampSnapshot(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MTMVTimestampSnapshot that = (MTMVTimestampSnapshot) o;
+ return timestamp == that.timestamp;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(timestamp);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
index 481670d9448..74593e5def5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
@@ -73,12 +73,11 @@ public class MTMVUtil {
* @param partitionId
* @param tables
* @param excludedTriggerTables
- * @param gracePeriod
* @return
* @throws AnalysisException
*/
private static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId,
Set<BaseTableInfo> tables,
- Set<String> excludedTriggerTables, Long gracePeriod) throws
AnalysisException {
+ Set<String> excludedTriggerTables) throws AnalysisException {
boolean isSyncWithPartition = true;
if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
MTMVRelatedTableIf relatedTable =
mtmv.getMvPartitionInfo().getRelatedTable();
@@ -92,12 +91,9 @@ public class MTMVUtil {
LOG.warn("can not found related partition: " + partitionId);
return false;
}
- isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, item,
relatedTable, relatedPartitionId,
- relatedPartitionItems.get(relatedPartitionId));
+ isSyncWithPartition = isSyncWithPartition(mtmv, partitionId,
relatedTable, relatedPartitionId);
}
- return isSyncWithPartition && isFresherThanTables(
-
mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit(),
tables,
- excludedTriggerTables, gracePeriod);
+ return isSyncWithPartition && isSyncWithAllBaseTables(mtmv,
partitionId, tables, excludedTriggerTables);
}
@@ -158,7 +154,7 @@ public class MTMVUtil {
return false;
}
try {
- return isMTMVSync(mtmv, mtmvRelation.getBaseTables(),
Sets.newHashSet(), 0L);
+ return isMTMVSync(mtmv, mtmvRelation.getBaseTables(),
Sets.newHashSet());
} catch (AnalysisException e) {
LOG.warn("isMTMVSync failed: ", e);
return false;
@@ -171,16 +167,14 @@ public class MTMVUtil {
* @param mtmv
* @param tables
* @param excludeTables
- * @param gracePeriod
* @return
* @throws AnalysisException
*/
- public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables,
Set<String> excludeTables, long gracePeriod)
+ public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables,
Set<String> excludeTables)
throws AnalysisException {
Collection<Partition> partitions = mtmv.getPartitions();
for (Partition partition : partitions) {
- if (!isMTMVPartitionSync(mtmv, partition.getId(), tables,
excludeTables,
- gracePeriod)) {
+ if (!isMTMVPartitionSync(mtmv, partition.getId(), tables,
excludeTables)) {
return false;
}
}
@@ -197,7 +191,6 @@ public class MTMVUtil {
*/
public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long
partitionId) throws AnalysisException {
List<String> res = Lists.newArrayList();
- long maxAvailableTime =
mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit();
for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables())
{
TableIf table = getTable(baseTableInfo);
if (!(table instanceof MTMVRelatedTableIf)) {
@@ -213,14 +206,13 @@ public class MTMVUtil {
if (relatedPartitionId == -1L) {
throw new AnalysisException("can not found related
partition");
}
- boolean isSyncWithPartition = isSyncWithPartition(mtmv,
partitionId, item, mtmvRelatedTableIf,
- relatedPartitionId,
relatedPartitionItems.get(relatedPartitionId));
+ boolean isSyncWithPartition = isSyncWithPartition(mtmv,
partitionId, mtmvRelatedTableIf,
+ relatedPartitionId);
if (!isSyncWithPartition) {
res.add(mtmvRelatedTableIf.getName());
}
} else {
- long tableLastVisibleVersionTime =
mtmvRelatedTableIf.getLastModifyTime();
- if (tableLastVisibleVersionTime > maxAvailableTime) {
+ if (!isSyncWithBaseTable(mtmv, partitionId, baseTableInfo)) {
res.add(table.getName());
}
}
@@ -257,16 +249,16 @@ public class MTMVUtil {
return res;
}
// check gracePeriod
- Long gracePeriod = mtmv.getGracePeriod();
- // do not care data is delayed
- if (gracePeriod < 0) {
- return allPartitions;
- }
-
+ long gracePeriodMills = mtmv.getGracePeriod();
+ long currentTimeMills = System.currentTimeMillis();
for (Partition partition : allPartitions) {
+ if (gracePeriodMills > 0 && currentTimeMills <=
(partition.getVisibleVersionTime()
+ + gracePeriodMills)) {
+ res.add(partition);
+ continue;
+ }
try {
- if (isMTMVPartitionSync(mtmv, partition.getId(),
mtmvRelation.getBaseTables(), Sets.newHashSet(),
- gracePeriod)) {
+ if (isMTMVPartitionSync(mtmv, partition.getId(),
mtmvRelation.getBaseTables(), Sets.newHashSet())) {
res.add(partition);
}
} catch (AnalysisException e) {
@@ -290,8 +282,7 @@ public class MTMVUtil {
for (Partition partition : allPartitions) {
try {
if (!isMTMVPartitionSync(mtmv, partition.getId(), baseTables,
- mtmv.getExcludedTriggerTables(),
- 0L)) {
+ mtmv.getExcludedTriggerTables())) {
res.add(partition.getId());
}
} catch (AnalysisException e) {
@@ -312,11 +303,15 @@ public class MTMVUtil {
* @return
* @throws AnalysisException
*/
- private static boolean isSyncWithPartition(MTMV mtmv, Long
mtmvPartitionId, PartitionItem mtmvPartitionItem,
+ private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId,
MTMVRelatedTableIf relatedTable,
- Long relatedPartitionId, PartitionItem relatedPartitionItem)
throws AnalysisException {
- return mtmv.getPartitionLastModifyTime(mtmvPartitionId,
mtmvPartitionItem) >= relatedTable
- .getPartitionLastModifyTime(relatedPartitionId,
relatedPartitionItem);
+ Long relatedPartitionId) throws AnalysisException {
+ MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
+ .getPartitionSnapshot(relatedPartitionId);
+ String relatedPartitionName =
relatedTable.getPartitionName(relatedPartitionId);
+ String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
+ return mtmv.getRefreshSnapshot()
+ .equalsWithRelatedPartition(mtmvPartitionName,
relatedPartitionName, relatedPartitionCurrentSnapshot);
}
/**
@@ -343,9 +338,17 @@ public class MTMVUtil {
* @param partitionId
*/
private static void dropPartition(MTMV mtmv, Long partitionId) throws
AnalysisException, DdlException {
- Partition partition =
mtmv.getPartitionOrAnalysisException(partitionId);
- DropPartitionClause dropPartitionClause = new
DropPartitionClause(false, partition.getName(), false, false);
- Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), mtmv,
dropPartitionClause);
+ if (!mtmv.writeLockIfExist()) {
+ return;
+ }
+ try {
+ Partition partition =
mtmv.getPartitionOrAnalysisException(partitionId);
+ DropPartitionClause dropPartitionClause = new
DropPartitionClause(false, partition.getName(), false, false);
+ Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(),
mtmv, dropPartitionClause);
+ } finally {
+ mtmv.writeUnlock();
+ }
+
}
/**
@@ -388,15 +391,13 @@ public class MTMVUtil {
/**
* Determine is sync, ignoring excludedTriggerTables and non OlapTable
*
- * @param visibleVersionTime
+ * @param mtmvPartitionId
* @param tables
* @param excludedTriggerTables
- * @param gracePeriod
* @return
*/
- private static boolean isFresherThanTables(long visibleVersionTime,
Set<BaseTableInfo> tables,
- Set<String> excludedTriggerTables, Long gracePeriod) throws
AnalysisException {
- long maxAvailableTime = visibleVersionTime + gracePeriod;
+ private static boolean isSyncWithAllBaseTables(MTMV mtmv, long
mtmvPartitionId, Set<BaseTableInfo> tables,
+ Set<String> excludedTriggerTables) throws AnalysisException {
for (BaseTableInfo baseTableInfo : tables) {
TableIf table = null;
try {
@@ -408,17 +409,36 @@ public class MTMVUtil {
if (excludedTriggerTables.contains(table.getName())) {
continue;
}
- if (!(table instanceof MTMVRelatedTableIf)) {
- continue;
- }
- long tableLastVisibleVersionTime = ((MTMVRelatedTableIf)
table).getLastModifyTime();
- if (tableLastVisibleVersionTime > maxAvailableTime) {
+ boolean syncWithBaseTable = isSyncWithBaseTable(mtmv,
mtmvPartitionId, baseTableInfo);
+ if (!syncWithBaseTable) {
return false;
}
}
return true;
}
+ private static boolean isSyncWithBaseTable(MTMV mtmv, long
mtmvPartitionId, BaseTableInfo baseTableInfo)
+ throws AnalysisException {
+ TableIf table = null;
+ try {
+ table = getTable(baseTableInfo);
+ } catch (AnalysisException e) {
+ LOG.warn("get table failed, {}", baseTableInfo, e);
+ return false;
+ }
+
+ if (!(table instanceof MTMVRelatedTableIf)) {
+ // if not MTMVRelatedTableIf, we can not get snapshot from it,
+ // Currently, it is believed to be synchronous
+ return true;
+ }
+ MTMVRelatedTableIf baseTable = (MTMVRelatedTableIf) table;
+ MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot();
+ String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
+ return mtmv.getRefreshSnapshot()
+ .equalsWithBaseTable(mtmvPartitionName, baseTable.getId(),
baseTableCurrentSnapshot);
+ }
+
private static boolean mtmvContainsExternalTable(MTMV mtmv) {
Set<BaseTableInfo> baseTables = mtmv.getRelation().getBaseTables();
for (BaseTableInfo baseTableInfo : baseTables) {
@@ -428,4 +448,60 @@ public class MTMVUtil {
}
return false;
}
+
+    /**
+     * Builds the refresh snapshot for each of the given MTMV partitions, keyed by partition name.
+     *
+     * @param mtmv the materialized view being refreshed
+     * @param baseTables base tables whose state is recorded in every partition snapshot
+     * @param partitionIds ids of the MTMV partitions to snapshot
+     * @return map from MTMV partition name to the snapshot generated for that partition
+     * @throws AnalysisException if a partition id cannot be resolved, or if generating
+     *         a partition snapshot fails
+     */
+    public static Map<String, MTMVRefreshPartitionSnapshot> generatePartitionSnapshots(MTMV mtmv,
+            Set<BaseTableInfo> baseTables, Set<Long> partitionIds)
+            throws AnalysisException {
+        Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap();
+        for (Long partitionId : partitionIds) {
+            // Use the checked lookup (as dropPartition does): an id that no longer resolves
+            // must surface as the declared AnalysisException, not as a NullPointerException.
+            String partitionName = mtmv.getPartitionOrAnalysisException(partitionId).getName();
+            res.put(partitionName, generatePartitionSnapshot(mtmv, baseTables, partitionId));
+        }
+        return res;
+    }
+
+
+    /**
+     * Builds the refresh snapshot of a single MTMV partition.
+     *
+     * <p>When the MTMV follows a base table's partitioning, the snapshot of each related
+     * partition is stored under that partition's name. Every other base table that is an
+     * {@link MTMVRelatedTableIf} contributes a table-level snapshot keyed by table id; the
+     * followed table itself is skipped there, and tables of any other type are ignored.
+     *
+     * @param mtmv the materialized view being refreshed
+     * @param baseTables base tables to record
+     * @param partitionId id of the MTMV partition
+     * @return the populated partition snapshot
+     * @throws AnalysisException propagated from table lookup or snapshot retrieval
+     */
+    private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv,
+            Set<BaseTableInfo> baseTables, Long partitionId)
+            throws AnalysisException {
+        MTMVRefreshPartitionSnapshot snapshot = new MTMVRefreshPartitionSnapshot();
+
+        // Partition-level snapshots of the table whose partitioning the MTMV follows.
+        if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
+            MTMVRelatedTableIf followedTable = mtmv.getMvPartitionInfo().getRelatedTable();
+            PartitionItem mtmvPartitionItem = mtmv.getPartitionItems().get(partitionId);
+            List<Long> followedPartitionIds = getMTMVPartitionRelatedPartitions(mtmvPartitionItem, followedTable);
+            for (Long followedPartitionId : followedPartitionIds) {
+                MTMVSnapshotIf current = followedTable.getPartitionSnapshot(followedPartitionId);
+                String followedPartitionName = followedTable.getPartitionName(followedPartitionId);
+                snapshot.getPartitions().put(followedPartitionName, current);
+            }
+        }
+
+        // Table-level snapshots of all remaining base tables.
+        for (BaseTableInfo baseTableInfo : baseTables) {
+            boolean isFollowedTable =
+                    mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE
+                            && mtmv.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo);
+            if (!isFollowedTable) {
+                TableIf table = getTable(baseTableInfo);
+                if (table instanceof MTMVRelatedTableIf) {
+                    snapshot.getTables().put(table.getId(), ((MTMVRelatedTableIf) table).getTableSnapshot());
+                }
+            }
+        }
+        return snapshot;
+    }
+
+    /**
+     * Finds the partitions of the related table that correspond to an MTMV partition.
+     *
+     * @param mtmvPartitionItem partition item of the MTMV partition
+     * @param relatedTable table whose partition items are searched
+     * @return a list holding the id of the first related partition whose item equals
+     *         {@code mtmvPartitionItem}, or an empty list when none matches
+     */
+    private static List<Long> getMTMVPartitionRelatedPartitions(PartitionItem mtmvPartitionItem,
+            MTMVRelatedTableIf relatedTable) {
+        List<Long> matched = Lists.newArrayList();
+        Map<Long, PartitionItem> candidates = relatedTable.getPartitionItems();
+        for (Entry<Long, PartitionItem> candidate : candidates.entrySet()) {
+            if (!mtmvPartitionItem.equals(candidate.getValue())) {
+                continue;
+            }
+            matched.add(candidate.getKey());
+            // Currently MTMV partitions correspond one-to-one with partitions of the
+            // related table, so the search stops at the first match.
+            break;
+        }
+        return matched;
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java
new file mode 100644
index 00000000000..14304e60183
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Snapshot that identifies the state of a table or partition by a version number.
+ *
+ * <p>Two snapshots are considered equal exactly when they are of the same class
+ * and carry the same version.
+ */
+public class MTMVVersionSnapshot implements MTMVSnapshotIf {
+    // Serialized under the short key "v".
+    @SerializedName("v")
+    private long version;
+
+    /**
+     * @param version the version captured for this snapshot
+     */
+    public MTMVVersionSnapshot(long version) {
+        this.version = version;
+    }
+
+    /**
+     * Compares by exact runtime class and version value.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o != null && o.getClass() == getClass()) {
+            MTMVVersionSnapshot other = (MTMVVersionSnapshot) o;
+            return other.version == version;
+        }
+        return false;
+    }
+
+    /**
+     * Hash derived solely from the version, consistent with {@link #equals(Object)}.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(version);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterMTMV.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterMTMV.java
index d2084ff6fa3..6c974c57f18 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterMTMV.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterMTMV.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.MTMVAlterOpType;
import org.apache.doris.mtmv.MTMVRefreshInfo;
+import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
@@ -52,6 +53,8 @@ public class AlterMTMV implements Writable {
private MTMVTask task;
@SerializedName("r")
private MTMVRelation relation;
+ @SerializedName("ps")
+ private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots;
public AlterMTMV(TableNameInfo mvName, MTMVRefreshInfo refreshInfo,
MTMVAlterOpType opType) {
this.mvName = Objects.requireNonNull(mvName, "require mvName object");
@@ -125,6 +128,15 @@ public class AlterMTMV implements Writable {
this.relation = relation;
}
+    /**
+     * Returns the partition snapshots held by this operation.
+     *
+     * @return the stored map itself (not a copy)
+     */
+    public Map<String, MTMVRefreshPartitionSnapshot> getPartitionSnapshots() {
+        return this.partitionSnapshots;
+    }
+
+ public void setPartitionSnapshots(
+ Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) {
+ this.partitionSnapshots = partitionSnapshots;
+ }
+
@Override
public String toString() {
return "AlterMTMV{"
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 607bfe0ee4f..7284751cdb3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -96,6 +96,10 @@ import
org.apache.doris.load.routineload.AbstractDataSourceProperties;
import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties;
import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.load.sync.canal.CanalSyncJob;
+import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+import org.apache.doris.mtmv.MTMVTimestampSnapshot;
+import org.apache.doris.mtmv.MTMVVersionSnapshot;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.RowPolicy;
import org.apache.doris.policy.StoragePolicy;
@@ -245,6 +249,12 @@ public class GsonUtils {
.registerSubtype(InsertJob.class,
InsertJob.class.getSimpleName())
.registerSubtype(MTMVJob.class,
MTMVJob.class.getSimpleName());
+ private static RuntimeTypeAdapterFactory<MTMVSnapshotIf>
mtmvSnapshotTypeAdapterFactory =
+ RuntimeTypeAdapterFactory.of(MTMVSnapshotIf.class, "clazz")
+ .registerSubtype(MTMVMaxTimestampSnapshot.class,
MTMVMaxTimestampSnapshot.class.getSimpleName())
+ .registerSubtype(MTMVTimestampSnapshot.class,
MTMVTimestampSnapshot.class.getSimpleName())
+ .registerSubtype(MTMVVersionSnapshot.class,
MTMVVersionSnapshot.class.getSimpleName());
+
private static RuntimeTypeAdapterFactory<DatabaseIf> dbTypeAdapterFactory
= RuntimeTypeAdapterFactory.of(
DatabaseIf.class, "clazz")
.registerSubtype(ExternalDatabase.class,
ExternalDatabase.class.getSimpleName())
@@ -319,6 +329,7 @@ public class GsonUtils {
.registerTypeAdapterFactory(hbResponseTypeAdapterFactory)
.registerTypeAdapterFactory(rdsTypeAdapterFactory)
.registerTypeAdapterFactory(jobExecutorRuntimeTypeAdapterFactory)
+ .registerTypeAdapterFactory(mtmvSnapshotTypeAdapterFactory)
.registerTypeAdapterFactory(constraintTypeAdapterFactory)
.registerTypeAdapter(ImmutableMap.class, new
ImmutableMapDeserializer())
.registerTypeAdapter(AtomicBoolean.class, new
AtomicBooleanAdapter())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]