This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ca0c1a59b62 [enhance](mtmv)MTMV supports Hive multi-level partitioning
(#31060)
ca0c1a59b62 is described below
commit ca0c1a59b629b83e0316f83213d5e3c5e3e2204b
Author: zhangdong <[email protected]>
AuthorDate: Sun Feb 25 17:47:19 2024 +0800
[enhance](mtmv)MTMV supports Hive multi-level partitioning (#31060)
Issue Number: close #xxx
For example, the hive table is partitioned by `date` and `region`, with the
following 6 partitions
```
20200101
    beijing
    shanghai
20200102
    beijing
    shanghai
20200103
    beijing
    shanghai
```
If the MTMV is partitioned by `date`, then the MTMV will have three
partitions: 20200101, 20200102, 20200103
If the MTMV is partitioned by `region`, then the MTMV will have two
partitions: beijing, shanghai
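The projection described above can be sketched in isolation. This is a
simplified illustration, not the Doris implementation: it uses plain string
lists instead of `PartitionItem`/`PartitionKeyDesc`, and the class name
`PartitionProjection`, the helper `samplePartitions`, and the use of list
indexes as partition ids are all assumptions made for the example. It shows
why several base-table partitions map to a single MTMV partition once only
one partition column is kept.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-alone sketch; none of these names exist in Doris.
public class PartitionProjection {

    /** Builds the six (date, region) partitions from the example above. */
    static List<List<String>> samplePartitions() {
        List<List<String>> partitions = new ArrayList<>();
        for (String date : List.of("20200101", "20200102", "20200103")) {
            for (String region : List.of("beijing", "shanghai")) {
                partitions.add(List.of(date, region));
            }
        }
        return partitions;
    }

    /**
     * Keeps only the value at position pos of each partition and groups the
     * partition indexes (standing in for partition ids) by that value.
     * The result is a one-to-many mapping: projected value ==> partition indexes.
     */
    static Map<String, Set<Integer>> groupByColumn(List<List<String>> partitions, int pos) {
        Map<String, Set<Integer>> result = new LinkedHashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            List<String> values = partitions.get(i);
            if (values.size() <= pos) {
                throw new IllegalArgumentException("pos " + pos + " out of bounds for " + values);
            }
            result.computeIfAbsent(values.get(pos), k -> new LinkedHashSet<>()).add(i);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = samplePartitions();
        // Partition by `date` (position 0): three groups of two partitions each.
        System.out.println(groupByColumn(partitions, 0));
        // Partition by `region` (position 1): two groups of three partitions each.
        System.out.println(groupByColumn(partitions, 1));
    }
}
```

Grouping into a map rather than a flat set keeps track of which base-table
partitions feed each MTMV partition, which is the information a refresh check
needs when any one of them changes.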
---
.../Create/CREATE-ASYNC-MATERIALIZED-VIEW.md | 2 +-
.../Create/CREATE-ASYNC-MATERIALIZED-VIEW.md | 2 +-
.../apache/doris/analysis/PartitionKeyDesc.java | 23 ++
.../org/apache/doris/analysis/PartitionValue.java | 20 ++
.../apache/doris/catalog/ListPartitionItem.java | 19 ++
.../main/java/org/apache/doris/catalog/MTMV.java | 68 ++++++
.../org/apache/doris/catalog/PartitionItem.java | 10 +
.../apache/doris/catalog/RangePartitionItem.java | 7 +
.../doris/common/proc/PartitionsProcDir.java | 20 +-
.../apache/doris/job/extensions/mtmv/MTMVTask.java | 15 +-
.../org/apache/doris/mtmv/MTMVPartitionInfo.java | 13 ++
.../org/apache/doris/mtmv/MTMVPartitionUtil.java | 198 ++++++++--------
.../org/apache/doris/mtmv/MTMVRewriteUtil.java | 9 +-
.../trees/plans/commands/info/CreateMTMVInfo.java | 8 +-
.../apache/doris/mtmv/MTMVPartitionUtilTest.java | 6 +-
.../org/apache/doris/mtmv/MTMVRewriteUtilTest.java | 12 +-
.../java/org/apache/doris/mtmv/MTMVTaskTest.java | 26 ++-
.../mtmv_p0/test_hive_multi_partition_mtmv.out | 43 ++++
.../mtmv_p0/test_hive_multi_partition_mtmv.groovy | 253 +++++++++++++++++++++
.../mtmv_p0/test_partition_refresh_mtmv.groovy | 90 ++++++--
20 files changed, 698 insertions(+), 146 deletions(-)
diff --git
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-ASYNC-MATERIALIZED-VIEW.md
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-ASYNC-MATERIALIZED-VIEW.md
index 37156bc7011..91263a6ff51 100644
---
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-ASYNC-MATERIALIZED-VIEW.md
+++
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-ASYNC-MATERIALIZED-VIEW.md
@@ -160,7 +160,7 @@ KEY(k1,k2)
```
##### partition
-There are two types of partitioning methods for materialized views. If no
partitioning is specified, there will be a default single partition. If a
partitioning field is specified, the system will automatically deduce the
source base table of that field and synchronize all partitions of the base
table (currently supporting `OlapTable` and `hive`). (Limitation: the current
base table can only have one partitioning field.)
+There are two types of partitioning methods for materialized views. If no
partitioning is specified, there will be a default single partition. If a
partitioning field is specified, the system will automatically deduce the
source base table of that field and synchronize all partitions of the base
table (currently supporting `OlapTable` and `hive`). (Limitation: If the base
table is an `OlapTable`, it can only have one partition field)
For example, if the base table is a range partition with a partition field of
`create_time` and partitioning by day, and `partition by(ct) as select
create_time as ct from t1` is specified when creating a materialized view,
then the materialized view will also be a range partition with a partition
field of 'ct' and partitioning by day
diff --git
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-ASYNC-MATERIALIZED-VIEW.md
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-ASYNC-MATERIALIZED-VIEW.md
index 8add74079bf..52050763c21 100644
---
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-ASYNC-MATERIALIZED-VIEW.md
+++
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-ASYNC-MATERIALIZED-VIEW.md
@@ -160,7 +160,7 @@ KEY(k1,k2)
```
##### partition
-物化视图有两种分区方式,如果不指定分区,默认只有一个分区,如果指定分区字段,会自动推导出字段来自哪个基表并同步基表(当前支持`OlapTable`和`hive`)的所有分区(限制条件:当前基表只能有一个分区字段)
+物化视图有两种分区方式,如果不指定分区,默认只有一个分区,如果指定分区字段,会自动推导出字段来自哪个基表并同步基表(当前支持`OlapTable`和`hive`)的所有分区(限制条件:基表如果是`OlapTable`,那么只能有一个分区字段)
例如:基表是range分区,分区字段为`create_time`并按天分区,创建物化视图时指定`partition by(ct) as select
create_time as ct from t1`
那么物化视图也会是range分区,分区字段为`ct`,并且按天分区
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionKeyDesc.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionKeyDesc.java
index f7f63490ee8..4cd87953572 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionKeyDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionKeyDesc.java
@@ -21,6 +21,7 @@ import org.apache.doris.common.AnalysisException;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
+import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import java.util.List;
@@ -223,4 +224,26 @@ public class PartitionKeyDesc {
})).append(")");
return sb.toString();
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PartitionKeyDesc that = (PartitionKeyDesc) o;
+ return Objects.equal(lowerValues, that.lowerValues)
+ && Objects.equal(upperValues, that.upperValues)
+ && Objects.equal(inValues, that.inValues)
+ && partitionKeyValueType == that.partitionKeyValueType
+ && Objects.equal(timeInterval, that.timeInterval)
+ && Objects.equal(timeType, that.timeType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(lowerValues, upperValues, inValues,
partitionKeyValueType, timeInterval, timeType);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionValue.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionValue.java
index 6d19c52c800..5dea5c767b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionValue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionValue.java
@@ -20,6 +20,8 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
+import com.google.common.base.Objects;
+
public class PartitionValue {
public static final PartitionValue MAX_VALUE = new PartitionValue();
@@ -69,4 +71,22 @@ public class PartitionValue {
public boolean isHiveDefaultPartition() {
return isHiveDefaultPartition;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PartitionValue that = (PartitionValue) o;
+ return isHiveDefaultPartition == that.isHiveDefaultPartition
+ && Objects.equal(value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(value, isHiveDefaultPartition);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
index ef23a444965..47302c21d1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
@@ -19,14 +19,17 @@ package org.apache.doris.catalog;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.common.AnalysisException;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
public class ListPartitionItem extends PartitionItem {
@@ -80,6 +83,22 @@ public class ListPartitionItem extends PartitionItem {
return PartitionKeyDesc.createIn(inValues);
}
+ @Override
+ public PartitionKeyDesc toPartitionKeyDesc(int pos) throws
AnalysisException {
+ List<List<PartitionValue>> inValues =
partitionKeys.stream().map(PartitionInfo::toPartitionValue)
+ .collect(Collectors.toList());
+ Set<List<PartitionValue>> res = Sets.newHashSet();
+ for (List<PartitionValue> values : inValues) {
+ if (values.size() <= pos) {
+ throw new AnalysisException(
+ String.format("toPartitionKeyDesc IndexOutOfBounds,
values: %s, pos: %d", values.toString(),
+ pos));
+ }
+ res.add(Lists.newArrayList(values.get(pos)));
+ }
+ return PartitionKeyDesc.createIn(Lists.newArrayList(res));
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(partitionKeys.size());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
index e7b0a79dd53..10739be8538 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
@@ -17,6 +17,7 @@
package org.apache.doris.catalog;
+import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.catalog.OlapTableFactory.MTMVParams;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.io.Text;
@@ -28,6 +29,7 @@ import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVJobInfo;
import org.apache.doris.mtmv.MTMVJobManager;
import org.apache.doris.mtmv.MTMVPartitionInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
@@ -38,6 +40,7 @@ import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.persist.gson.GsonUtils;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
@@ -47,7 +50,9 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -243,6 +248,69 @@ public class MTMV extends OlapTable {
return refreshSnapshot;
}
+ /**
+ * generateMvPartitionDescs
+ *
+ * @return mvPartitionId ==> mvPartitionKeyDesc
+ */
+ public Map<Long, PartitionKeyDesc> generateMvPartitionDescs() {
+ Map<Long, PartitionItem> mtmvItems = getPartitionItems();
+ Map<Long, PartitionKeyDesc> result = Maps.newHashMap();
+ for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
+ result.put(entry.getKey(), entry.getValue().toPartitionKeyDesc());
+ }
+ return result;
+ }
+
+ /**
+ * generateRelatedPartitionDescs
+ * <p>
+ * Different partitions may generate the same PartitionKeyDesc through
logical calculations
+ * (such as selecting only one column, or rolling up partitions), so it is
a one to many relationship
+ *
+ * @return related PartitionKeyDesc ==> relatedPartitionIds
+ * @throws AnalysisException
+ */
+ public Map<PartitionKeyDesc, Set<Long>> generateRelatedPartitionDescs()
throws AnalysisException {
+ if (mvPartitionInfo.getPartitionType() ==
MTMVPartitionType.SELF_MANAGE) {
+ return Maps.newHashMap();
+ }
+ Map<PartitionKeyDesc, Set<Long>> res = new HashMap<>();
+ Map<Long, PartitionItem> relatedPartitionItems =
mvPartitionInfo.getRelatedTable().getPartitionItems();
+ int relatedColPos = mvPartitionInfo.getRelatedColPos();
+ for (Entry<Long, PartitionItem> entry :
relatedPartitionItems.entrySet()) {
+ PartitionKeyDesc partitionKeyDesc =
entry.getValue().toPartitionKeyDesc(relatedColPos);
+ if (res.containsKey(partitionKeyDesc)) {
+ res.get(partitionKeyDesc).add(entry.getKey());
+ } else {
+ res.put(partitionKeyDesc, Sets.newHashSet(entry.getKey()));
+ }
+ }
+ return res;
+ }
+
+ /**
+ * Calculate the partition and associated partition mapping relationship
of the MTMV
+ * It is the result of real-time comparison calculation, so there may be
some costs,
+ * so it should be called with caution
+ *
+ * @return mvPartitionId ==> relationPartitionIds
+ * @throws AnalysisException
+ */
+ public Map<Long, Set<Long>> calculatePartitionMappings() throws
AnalysisException {
+ if (mvPartitionInfo.getPartitionType() ==
MTMVPartitionType.SELF_MANAGE) {
+ return Maps.newHashMap();
+ }
+ Map<Long, Set<Long>> res = Maps.newHashMap();
+ Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs =
generateRelatedPartitionDescs();
+ Map<Long, PartitionItem> mvPartitionItems =
getPartitionInfo().getIdToItem(false);
+ for (Entry<Long, PartitionItem> entry : mvPartitionItems.entrySet()) {
+ res.put(entry.getKey(),
+
relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(),
Sets.newHashSet()));
+ }
+ return res;
+ }
+
public void readMvLock() {
this.mvRwLock.readLock().lock();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java
index 8ea754abff4..3c29aa2f48b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java
@@ -18,6 +18,7 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.PartitionKeyDesc;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.io.Writable;
import java.util.Comparator;
@@ -36,4 +37,13 @@ public abstract class PartitionItem implements
Comparable<PartitionItem>, Writab
}
public abstract PartitionKeyDesc toPartitionKeyDesc();
+
+ /**
+ * Generate PartitionKeyDesc using only the posth PartitionValue
+ *
+ * @param pos
+ * @return
+ * @throws AnalysisException
+ */
+ public abstract PartitionKeyDesc toPartitionKeyDesc(int pos) throws
AnalysisException;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
index cadb95ed3d9..7a9162c874c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
@@ -53,6 +53,13 @@ public class RangePartitionItem extends PartitionItem {
PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint()));
}
+ @Override
+ public PartitionKeyDesc toPartitionKeyDesc(int pos) {
+ // MTMV do not allow base tables with partition type range to have
multiple partition columns,
+ // so pos is ignored here
+ return toPartitionKeyDesc();
+ }
+
@Override
public void write(DataOutput out) throws IOException {
RangeUtils.writeRange(out, partitionKeyRange);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
index 4703429fa18..6b8f59e6508 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
@@ -220,7 +220,7 @@ public class PartitionsProcDir implements ProcDirInterface {
return result;
}
- private List<List<Comparable>> getPartitionInfos() {
+ private List<List<Comparable>> getPartitionInfos() throws
AnalysisException {
Preconditions.checkNotNull(db);
Preconditions.checkNotNull(olapTable);
Preconditions.checkState(olapTable.isManagedTable());
@@ -244,6 +244,12 @@ public class PartitionsProcDir implements ProcDirInterface
{
}
Joiner joiner = Joiner.on(", ");
+ Map<Long, List<String>> partitionsUnSyncTables = null;
+ if (olapTable instanceof MTMV) {
+ partitionsUnSyncTables = MTMVPartitionUtil
+ .getPartitionsUnSyncTables((MTMV) olapTable,
partitionIds);
+
+ }
for (Long partitionId : partitionIds) {
Partition partition = olapTable.getPartition(partitionId);
@@ -308,15 +314,9 @@ public class PartitionsProcDir implements ProcDirInterface
{
partitionInfo.add(tblPartitionInfo.getIsMutable(partitionId));
if (olapTable instanceof MTMV) {
- try {
- List<String> partitionUnSyncTables = MTMVPartitionUtil
- .getPartitionUnSyncTables((MTMV) olapTable,
partitionId);
-
partitionInfo.add(CollectionUtils.isEmpty(partitionUnSyncTables));
- partitionInfo.add(partitionUnSyncTables.toString());
- } catch (AnalysisException e) {
- partitionInfo.add(false);
- partitionInfo.add(e.getMessage());
- }
+ List<String> partitionUnSyncTables =
partitionsUnSyncTables.get(partitionId);
+
partitionInfo.add(CollectionUtils.isEmpty(partitionUnSyncTables));
+ partitionInfo.add(partitionUnSyncTables.toString());
} else {
partitionInfo.add(true);
partitionInfo.add(FeConstants.null_string);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index a859a8ca1b7..fa0a88fa93d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -169,9 +169,10 @@ public class MTMVTask extends AbstractTask {
// To be completely consistent with hive, you need to manually
refresh the cache
// refreshHmsTable();
if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
- MTMVPartitionUtil.alignMvPartition(mtmv,
mtmv.getMvPartitionInfo().getRelatedTable());
+ MTMVPartitionUtil.alignMvPartition(mtmv);
}
- List<Long> needRefreshPartitionIds =
calculateNeedRefreshPartitions();
+ Map<Long, Set<Long>> partitionMappings =
mtmv.calculatePartitionMappings();
+ List<Long> needRefreshPartitionIds =
calculateNeedRefreshPartitions(partitionMappings);
this.needRefreshPartitions =
MTMVPartitionUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds);
this.refreshMode = generateRefreshMode(needRefreshPartitionIds);
if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
@@ -191,7 +192,8 @@ public class MTMVTask extends AbstractTask {
// need get names before exec
List<String> execPartitionNames =
MTMVPartitionUtil.getPartitionNamesByIds(mtmv, execPartitionIds);
Map<String, MTMVRefreshPartitionSnapshot>
execPartitionSnapshots = MTMVPartitionUtil
- .generatePartitionSnapshots(mtmv,
relation.getBaseTables(), execPartitionIds);
+ .generatePartitionSnapshots(mtmv,
relation.getBaseTables(), execPartitionIds,
+ partitionMappings);
exec(ctx, execPartitionIds, tableWithPartKey);
completedPartitions.addAll(execPartitionNames);
partitionSnapshots.putAll(execPartitionSnapshots);
@@ -389,7 +391,7 @@ public class MTMVTask extends AbstractTask {
}
}
- public List<Long> calculateNeedRefreshPartitions() throws
AnalysisException {
+ public List<Long> calculateNeedRefreshPartitions(Map<Long, Set<Long>>
partitionMappings) throws AnalysisException {
// check whether the user manually triggers it
if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
if (taskContext.isComplete()) {
@@ -402,7 +404,8 @@ public class MTMVTask extends AbstractTask {
// check if data is fresh
// We need to use a newly generated relationship and cannot retrieve
it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the
tableId
- boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv,
relation.getBaseTables(), mtmv.getExcludedTriggerTables());
+ boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv,
relation.getBaseTables(), mtmv.getExcludedTriggerTables(),
+ partitionMappings);
if (fresh) {
return Lists.newArrayList();
}
@@ -416,7 +419,7 @@ public class MTMVTask extends AbstractTask {
}
// We need to use a newly generated relationship and cannot retrieve
it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the
tableId
- return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv,
relation.getBaseTables());
+ return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv,
relation.getBaseTables(), partitionMappings);
}
public MTMVTaskContext getTaskContext() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java
index c48594847f3..3a364e0749d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java
@@ -89,6 +89,19 @@ public class MTMVPartitionInfo {
this.partitionCol = partitionCol;
}
+ /**
+ * Get the position of relatedCol in the relatedTable partition column
+ *
+ * @return
+ * @throws AnalysisException
+ */
+ public int getRelatedColPos() throws AnalysisException {
+ if (partitionType == MTMVPartitionType.SELF_MANAGE) {
+ throw new AnalysisException("partitionType is: " + partitionType);
+ }
+ return MTMVPartitionUtil.getPos(getRelatedTable(), relatedCol);
+ }
+
@Override
public String toString() {
return "MTMVPartitionInfo{"
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
index 6657b3d243f..88fe02a8b4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.AllPartitionDesc;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.analysis.SinglePartitionDesc;
+import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
@@ -35,11 +36,13 @@ import
org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -58,27 +61,26 @@ public class MTMVPartitionUtil {
*
* @param mtmv
* @param partitionId
+ * @param relatedPartitionIds
* @param tables
* @param excludedTriggerTables
* @return
* @throws AnalysisException
*/
- public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId,
Set<BaseTableInfo> tables,
+ public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId,
Set<Long> relatedPartitionIds,
+ Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables) throws AnalysisException {
boolean isSyncWithPartition = true;
if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
MTMVRelatedTableIf relatedTable =
mtmv.getMvPartitionInfo().getRelatedTable();
// if follow base table, not need compare with related table, only
should compare with related partition
excludedTriggerTables.add(relatedTable.getName());
- PartitionItem item =
mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
- Map<Long, PartitionItem> relatedPartitionItems =
relatedTable.getPartitionItems();
- long relatedPartitionId = getExistPartitionId(item,
- relatedPartitionItems);
- if (relatedPartitionId == -1L) {
- LOG.warn("can not found related partition: " + partitionId);
+ if (CollectionUtils.isEmpty(relatedPartitionIds)) {
+ LOG.warn("can not found related partition, partitionId: {},
mtmvName: {}, relatedTableName: {}",
+ partitionId, mtmv.getName(), relatedTable.getName());
return false;
}
- isSyncWithPartition = isSyncWithPartition(mtmv, partitionId,
relatedTable, relatedPartitionId);
+ isSyncWithPartition = isSyncWithPartitions(mtmv, partitionId,
relatedTable, relatedPartitionIds);
}
return isSyncWithPartition && isSyncWithAllBaseTables(mtmv,
partitionId, tables, excludedTriggerTables);
@@ -88,26 +90,24 @@ public class MTMVPartitionUtil {
* Align the partitions of mtmv and related tables, delete more and add
less
*
* @param mtmv
- * @param relatedTable
* @throws DdlException
* @throws AnalysisException
*/
- public static void alignMvPartition(MTMV mtmv, MTMVRelatedTableIf
relatedTable)
+ public static void alignMvPartition(MTMV mtmv)
throws DdlException, AnalysisException {
- Map<Long, PartitionItem> relatedTableItems =
Maps.newHashMap(relatedTable.getPartitionItems());
- Map<Long, PartitionItem> mtmvItems =
Maps.newHashMap(mtmv.getPartitionItems());
+ Map<Long, PartitionKeyDesc> mtmvPartitionDescs =
mtmv.generateMvPartitionDescs();
+ Set<PartitionKeyDesc> relatedPartitionDescs =
mtmv.generateRelatedPartitionDescs().keySet();
// drop partition of mtmv
- for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
- long partitionId = getExistPartitionId(entry.getValue(),
relatedTableItems);
- if (partitionId == -1L) {
+ for (Entry<Long, PartitionKeyDesc> entry :
mtmvPartitionDescs.entrySet()) {
+ if (!relatedPartitionDescs.contains(entry.getValue())) {
dropPartition(mtmv, entry.getKey());
}
}
// add partition for mtmv
- for (Entry<Long, PartitionItem> entry : relatedTableItems.entrySet()) {
- long partitionId = getExistPartitionId(entry.getValue(),
mtmvItems);
- if (partitionId == -1L) {
- addPartition(mtmv, entry.getValue());
+ HashSet<PartitionKeyDesc> mtmvPartitionDescsSet =
Sets.newHashSet(mtmvPartitionDescs.values());
+ for (PartitionKeyDesc desc : relatedPartitionDescs) {
+ if (!mtmvPartitionDescsSet.contains(desc)) {
+ addPartition(mtmv, desc);
}
}
}
@@ -117,19 +117,19 @@ public class MTMVPartitionUtil {
*
* @param relatedTable
* @param tableProperties
+ * @param relatedCol
* @return
* @throws AnalysisException
*/
public static List<AllPartitionDesc>
getPartitionDescsByRelatedTable(MTMVRelatedTableIf relatedTable,
- Map<String, String> tableProperties) throws AnalysisException {
+ Map<String, String> tableProperties, String relatedCol) throws
AnalysisException {
HashMap<String, String> partitionProperties = Maps.newHashMap();
List<AllPartitionDesc> res = Lists.newArrayList();
- Map<Long, PartitionItem> relatedTableItems =
relatedTable.getPartitionItems();
- for (Entry<Long, PartitionItem> entry : relatedTableItems.entrySet()) {
- PartitionKeyDesc oldPartitionKeyDesc =
entry.getValue().toPartitionKeyDesc();
+ Set<PartitionKeyDesc> relatedPartitionDescs =
getRelatedPartitionDescs(relatedTable, relatedCol);
+ for (PartitionKeyDesc partitionKeyDesc : relatedPartitionDescs) {
SinglePartitionDesc singlePartitionDesc = new
SinglePartitionDesc(true,
- generatePartitionName(oldPartitionKeyDesc),
- oldPartitionKeyDesc, partitionProperties);
+ generatePartitionName(partitionKeyDesc),
+ partitionKeyDesc, partitionProperties);
// mtmv can only has one partition col
singlePartitionDesc.analyze(1, tableProperties);
res.add(singlePartitionDesc);
@@ -137,6 +137,29 @@ public class MTMVPartitionUtil {
return res;
}
+ private static Set<PartitionKeyDesc>
getRelatedPartitionDescs(MTMVRelatedTableIf relatedTable, String relatedCol)
+ throws AnalysisException {
+ int pos = getPos(relatedTable, relatedCol);
+ Set<PartitionKeyDesc> res = Sets.newHashSet();
+ for (Entry<Long, PartitionItem> entry :
relatedTable.getPartitionItems().entrySet()) {
+ PartitionKeyDesc partitionKeyDesc =
entry.getValue().toPartitionKeyDesc(pos);
+ res.add(partitionKeyDesc);
+ }
+ return res;
+ }
+
+ public static int getPos(MTMVRelatedTableIf relatedTable, String
relatedCol) throws AnalysisException {
+ List<Column> partitionColumns = relatedTable.getPartitionColumns();
+ for (int i = 0; i < partitionColumns.size(); i++) {
+ if
(partitionColumns.get(i).getName().equalsIgnoreCase(relatedCol)) {
+ return i;
+ }
+ }
+ throw new AnalysisException(
+ String.format("getRelatedColPos error, relatedCol: %s,
partitionColumns: %s", relatedCol,
+ partitionColumns));
+ }
+
public static List<String> getPartitionNamesByIds(MTMV mtmv,
Collection<Long> ids) throws AnalysisException {
List<String> res = Lists.newArrayList();
for (Long partitionId : ids) {
@@ -166,7 +189,7 @@ public class MTMVPartitionUtil {
return false;
}
try {
- return isMTMVSync(mtmv, mtmvRelation.getBaseTables(),
Sets.newHashSet());
+ return isMTMVSync(mtmv, mtmvRelation.getBaseTables(),
Sets.newHashSet(), mtmv.calculatePartitionMappings());
} catch (AnalysisException e) {
LOG.warn("isMTMVSync failed: ", e);
return false;
@@ -179,14 +202,17 @@ public class MTMVPartitionUtil {
* @param mtmv
* @param tables
* @param excludeTables
+ * @param partitionMappings
* @return
* @throws AnalysisException
*/
- public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables,
Set<String> excludeTables)
+ public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables,
Set<String> excludeTables,
+ Map<Long, Set<Long>> partitionMappings)
throws AnalysisException {
Collection<Partition> partitions = mtmv.getPartitions();
for (Partition partition : partitions) {
- if (!isMTMVPartitionSync(mtmv, partition.getId(), tables,
excludeTables)) {
+ if (!isMTMVPartitionSync(mtmv, partition.getId(),
partitionMappings.get(partition.getId()), tables,
+ excludeTables)) {
return false;
}
}
@@ -194,14 +220,25 @@ public class MTMVPartitionUtil {
}
/**
- * get not sync tables
+ * getPartitionsUnSyncTables
*
* @param mtmv
- * @param partitionId
- * @return
+ * @param partitionIds
+ * @return partitionId ==> UnSyncTableNames
* @throws AnalysisException
*/
- public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long
partitionId) throws AnalysisException {
+ public static Map<Long, List<String>> getPartitionsUnSyncTables(MTMV mtmv,
List<Long> partitionIds)
+ throws AnalysisException {
+ Map<Long, List<String>> res = Maps.newHashMap();
+ Map<Long, Set<Long>> partitionMappings =
mtmv.calculatePartitionMappings();
+ for (Long partitionId : partitionIds) {
+ res.put(partitionId, getPartitionUnSyncTables(mtmv, partitionId,
partitionMappings.get(partitionId)));
+ }
+ return res;
+ }
+
+ private static List<String> getPartitionUnSyncTables(MTMV mtmv, Long
partitionId, Set<Long> relatedPartitionIds)
+ throws AnalysisException {
List<String> res = Lists.newArrayList();
for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables())
{
TableIf table = MTMVUtil.getTable(baseTableInfo);
@@ -214,15 +251,11 @@ public class MTMVPartitionUtil {
}
if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
- PartitionItem item =
mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
- Map<Long, PartitionItem> relatedPartitionItems =
mtmvRelatedTableIf.getPartitionItems();
- long relatedPartitionId = getExistPartitionId(item,
- relatedPartitionItems);
- if (relatedPartitionId == -1L) {
+ if (CollectionUtils.isEmpty(relatedPartitionIds)) {
throw new AnalysisException("can not found related
partition");
}
- boolean isSyncWithPartition = isSyncWithPartition(mtmv,
partitionId, mtmvRelatedTableIf,
- relatedPartitionId);
+ boolean isSyncWithPartition = isSyncWithPartitions(mtmv,
partitionId, mtmvRelatedTableIf,
+ relatedPartitionIds);
if (!isSyncWithPartition) {
res.add(mtmvRelatedTableIf.getName());
}
@@ -242,12 +275,13 @@ public class MTMVPartitionUtil {
* @param baseTables
* @return
*/
- public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv,
Set<BaseTableInfo> baseTables) {
+ public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv,
Set<BaseTableInfo> baseTables,
+ Map<Long, Set<Long>> partitionMappings) {
Collection<Partition> allPartitions = mtmv.getPartitions();
List<Long> res = Lists.newArrayList();
for (Partition partition : allPartitions) {
try {
- if (!isMTMVPartitionSync(mtmv, partition.getId(), baseTables,
+ if (!isMTMVPartitionSync(mtmv, partition.getId(),
partitionMappings.get(partition.getId()), baseTables,
mtmv.getExcludedTriggerTables())) {
res.add(partition.getId());
}
@@ -260,27 +294,33 @@ public class MTMVPartitionUtil {
}
/**
- * compare last update time of mtmvPartition and tablePartition
+ * Compare the current snapshot of each related partition (or table) with
the snapshot recorded at the last refresh
*
* @param mtmv
* @param mtmvPartitionId
* @param relatedTable
- * @param relatedPartitionId
+ * @param relatedPartitionIds
* @return
* @throws AnalysisException
*/
- public static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId,
+ public static boolean isSyncWithPartitions(MTMV mtmv, Long mtmvPartitionId,
MTMVRelatedTableIf relatedTable,
- Long relatedPartitionId) throws AnalysisException {
+ Set<Long> relatedPartitionIds) throws AnalysisException {
if (!relatedTable.needAutoRefresh()) {
return true;
}
- MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
- .getPartitionSnapshot(relatedPartitionId);
- String relatedPartitionName =
relatedTable.getPartitionName(relatedPartitionId);
String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
- return mtmv.getRefreshSnapshot()
- .equalsWithRelatedPartition(mtmvPartitionName,
relatedPartitionName, relatedPartitionCurrentSnapshot);
+ for (Long relatedPartitionId : relatedPartitionIds) {
+ MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
+ .getPartitionSnapshot(relatedPartitionId);
+ String relatedPartitionName =
relatedTable.getPartitionName(relatedPartitionId);
+ if (!mtmv.getRefreshSnapshot()
+ .equalsWithRelatedPartition(mtmvPartitionName,
relatedPartitionName,
+ relatedPartitionCurrentSnapshot)) {
+ return false;
+ }
+ }
+ return true;
}
/**
@@ -323,12 +363,11 @@ public class MTMVPartitionUtil {
* add partition for mtmv like relatedPartitionId of relatedTable
*
* @param mtmv
- * @param partitionItem
+ * @param oldPartitionKeyDesc
* @throws DdlException
*/
- private static void addPartition(MTMV mtmv, PartitionItem partitionItem)
+ private static void addPartition(MTMV mtmv, PartitionKeyDesc
oldPartitionKeyDesc)
throws DdlException {
- PartitionKeyDesc oldPartitionKeyDesc =
partitionItem.toPartitionKeyDesc();
Map<String, String> partitionProperties = Maps.newHashMap();
SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true,
generatePartitionName(oldPartitionKeyDesc),
@@ -339,23 +378,6 @@ public class MTMVPartitionUtil {
Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(),
mtmv.getName(), addPartitionClause);
}
- /**
- * compare PartitionItem and return equals partitionId
- * if not found, return -1L
- *
- * @param target
- * @param sources
- * @return
- */
- private static long getExistPartitionId(PartitionItem target, Map<Long,
PartitionItem> sources) {
- for (Entry<Long, PartitionItem> entry : sources.entrySet()) {
- if (target.equals(entry.getValue())) {
- return entry.getKey();
- }
- }
- return -1L;
- }
-
/**
* Determine whether it is in sync, ignoring excludedTriggerTables and non-OlapTable tables
*
@@ -410,27 +432,35 @@ public class MTMVPartitionUtil {
.equalsWithBaseTable(mtmvPartitionName, baseTable.getId(),
baseTableCurrentSnapshot);
}
+ /**
+ * Generate updated snapshots of partitions to determine if they are
synchronized
+ *
+ * @param mtmv
+ * @param baseTables
+ * @param partitionIds
+ * @param partitionMappings
+ * @return
+ * @throws AnalysisException
+ */
public static Map<String, MTMVRefreshPartitionSnapshot>
generatePartitionSnapshots(MTMV mtmv,
- Set<BaseTableInfo> baseTables, Set<Long> partitionIds)
+ Set<BaseTableInfo> baseTables, Set<Long> partitionIds,
+ Map<Long, Set<Long>> partitionMappings)
throws AnalysisException {
Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap();
for (Long partitionId : partitionIds) {
- res.put(mtmv.getPartition(partitionId).getName(),
generatePartitionSnapshot(mtmv, baseTables, partitionId));
+ res.put(mtmv.getPartition(partitionId).getName(),
+ generatePartitionSnapshot(mtmv, baseTables,
partitionMappings.get(partitionId)));
}
return res;
}
private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV
mtmv,
- Set<BaseTableInfo> baseTables, Long partitionId)
+ Set<BaseTableInfo> baseTables, Set<Long> relatedPartitionIds)
throws AnalysisException {
MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new
MTMVRefreshPartitionSnapshot();
if (mtmv.getMvPartitionInfo().getPartitionType() ==
MTMVPartitionType.FOLLOW_BASE_TABLE) {
MTMVRelatedTableIf relatedTable =
mtmv.getMvPartitionInfo().getRelatedTable();
- List<Long> relatedPartitionIds = getMTMVPartitionRelatedPartitions(
- mtmv.getPartitionItems().get(partitionId),
- relatedTable);
-
for (Long relatedPartitionId : relatedPartitionIds) {
MTMVSnapshotIf partitionSnapshot = relatedTable
.getPartitionSnapshot(relatedPartitionId);
@@ -451,18 +481,4 @@ public class MTMVPartitionUtil {
}
return refreshPartitionSnapshot;
}
-
- private static List<Long> getMTMVPartitionRelatedPartitions(PartitionItem
mtmvPartitionItem,
- MTMVRelatedTableIf relatedTable) {
- List<Long> res = Lists.newArrayList();
- Map<Long, PartitionItem> relatedPartitionItems =
relatedTable.getPartitionItems();
- for (Entry<Long, PartitionItem> entry :
relatedPartitionItems.entrySet()) {
- if (mtmvPartitionItem.equals(entry.getValue())) {
- res.add(entry.getKey());
- // current, the partitioning of MTMV corresponds one-to-one
with the partitioning of related table
- return res;
- }
- }
- return res;
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java
index 666a79eba97..f0199169859 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java
@@ -31,6 +31,8 @@ import org.apache.logging.log4j.Logger;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
public class MTMVRewriteUtil {
private static final Logger LOG =
LogManager.getLogger(MTMVRewriteUtil.class);
@@ -64,6 +66,7 @@ public class MTMVRewriteUtil {
&& mtmv.getStatus().getRefreshState() ==
MTMVRefreshState.SUCCESS)) {
return res;
}
+ Map<Long, Set<Long>> partitionMappings = null;
// check gracePeriod
long gracePeriodMills = mtmv.getGracePeriod();
for (Partition partition : allPartitions) {
@@ -73,7 +76,11 @@ public class MTMVRewriteUtil {
continue;
}
try {
- if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv,
partition.getId(), mtmvRelation.getBaseTables(),
+ if (partitionMappings == null) {
+ partitionMappings = mtmv.calculatePartitionMappings();
+ }
+ if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv,
partition.getId(),
+ partitionMappings.get(partition.getId()),
mtmvRelation.getBaseTables(),
Sets.newHashSet())) {
res.add(partition);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
index 2b2fe8ab9f2..90c98a44294 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
@@ -35,6 +35,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.mtmv.EnvInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
@@ -304,8 +305,9 @@ public class CreateMTMVInfo {
if
(!partitionColumnNames.contains(relatedTableInfo.get().getColumn())) {
throw new AnalysisException("error related column: " +
relatedTableInfo.get().getColumn());
}
- if (partitionColumnNames.size() != 1) {
- throw new AnalysisException("base table for partitioning
only support single column.");
+ if (!(mtmvBaseRealtedTable instanceof HMSExternalTable)
+ && partitionColumnNames.size() != 1) {
+ throw new AnalysisException("only hms table support multi
column partition.");
}
mvPartitionInfo.setRelatedTable(relatedTableInfo.get().getTableInfo());
mvPartitionInfo.setRelatedCol(relatedTableInfo.get().getColumn());
@@ -322,7 +324,7 @@ public class CreateMTMVInfo {
List<AllPartitionDesc> allPartitionDescs = null;
try {
allPartitionDescs = MTMVPartitionUtil
- .getPartitionDescsByRelatedTable(relatedTable, properties);
+ .getPartitionDescsByRelatedTable(relatedTable, properties,
mvPartitionInfo.getRelatedCol());
} catch (org.apache.doris.common.AnalysisException e) {
throw new AnalysisException("getPartitionDescsByRelatedTable
failed", e);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java
index 62f2fd5ff58..df38ce18720 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java
@@ -151,7 +151,8 @@ public class MTMVPartitionUtilTest {
@Test
public void testIsSyncWithPartition() throws AnalysisException {
- boolean isSyncWithPartition =
MTMVPartitionUtil.isSyncWithPartition(mtmv, 1L, baseOlapTable, 2L);
+ boolean isSyncWithPartition = MTMVPartitionUtil
+ .isSyncWithPartitions(mtmv, 1L, baseOlapTable,
Sets.newHashSet(2L));
Assert.assertTrue(isSyncWithPartition);
}
@@ -164,7 +165,8 @@ public class MTMVPartitionUtilTest {
result = false;
}
};
- boolean isSyncWithPartition =
MTMVPartitionUtil.isSyncWithPartition(mtmv, 1L, baseOlapTable, 2L);
+ boolean isSyncWithPartition = MTMVPartitionUtil
+ .isSyncWithPartitions(mtmv, 1L, baseOlapTable,
Sets.newHashSet(2L));
Assert.assertFalse(isSyncWithPartition);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java
index 55394897e42..8de7ed75ccd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java
@@ -103,7 +103,8 @@ public class MTMVRewriteUtilTest {
minTimes = 0;
result = true;
- MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong,
(Set<BaseTableInfo>) any, (Set<String>) any);
+ MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong,
(Set<Long>) any, (Set<BaseTableInfo>) any,
+ (Set<String>) any);
minTimes = 0;
result = true;
@@ -129,7 +130,8 @@ public class MTMVRewriteUtilTest {
minTimes = 0;
result = 2L;
- MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong,
(Set<BaseTableInfo>) any, (Set<String>) any);
+ MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong,
(Set<Long>) any, (Set<BaseTableInfo>) any,
+ (Set<String>) any);
minTimes = 0;
result = false;
}
@@ -148,7 +150,8 @@ public class MTMVRewriteUtilTest {
minTimes = 0;
result = 1L;
- MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong,
(Set<BaseTableInfo>) any, (Set<String>) any);
+ MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong,
(Set<Long>) any, (Set<BaseTableInfo>) any,
+ (Set<String>) any);
minTimes = 0;
result = false;
}
@@ -177,7 +180,8 @@ public class MTMVRewriteUtilTest {
public void testGetMTMVCanRewritePartitionsNotSync() throws
AnalysisException {
new Expectations() {
{
- MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong,
(Set<BaseTableInfo>) any, (Set<String>) any);
+ MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong,
(Set<Long>) any, (Set<BaseTableInfo>) any,
+ (Set<String>) any);
minTimes = 0;
result = false;
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java
index b1fc52dad46..1d1c952bb3c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java
@@ -28,6 +28,7 @@ import
org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import mockit.Expectations;
import mockit.Mocked;
@@ -37,6 +38,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class MTMVTaskTest {
@@ -84,7 +86,8 @@ public class MTMVTaskTest {
minTimes = 0;
result = poneId;
- mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any,
(Set<String>) any);
+ mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any,
(Set<String>) any,
+ (Map<Long, Set<Long>>) any);
minTimes = 0;
result = true;
@@ -101,9 +104,9 @@ public class MTMVTaskTest {
@Test
public void testCalculateNeedRefreshPartitionsManualComplete() throws
AnalysisException {
- MTMVTaskContext context = new
MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(poneName), true);
+ MTMVTaskContext context = new
MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, null, true);
MTMVTask task = new MTMVTask(mtmv, relation, context);
- List<Long> result = task.calculateNeedRefreshPartitions();
+ List<Long> result =
task.calculateNeedRefreshPartitions(Maps.newHashMap());
Assert.assertEquals(allPartitionIds, result);
}
@@ -111,7 +114,7 @@ public class MTMVTaskTest {
public void testCalculateNeedRefreshPartitionsManualPartitions() throws
AnalysisException {
MTMVTaskContext context = new
MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(poneName),
false);
MTMVTask task = new MTMVTask(mtmv, relation, context);
- List<Long> result = task.calculateNeedRefreshPartitions();
+ List<Long> result =
task.calculateNeedRefreshPartitions(Maps.newHashMap());
Assert.assertEquals(Lists.newArrayList(poneId), result);
}
@@ -119,7 +122,7 @@ public class MTMVTaskTest {
public void testCalculateNeedRefreshPartitionsSystem() throws
AnalysisException {
MTMVTaskContext context = new
MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
MTMVTask task = new MTMVTask(mtmv, relation, context);
- List<Long> result = task.calculateNeedRefreshPartitions();
+ List<Long> result =
task.calculateNeedRefreshPartitions(Maps.newHashMap());
Assert.assertTrue(CollectionUtils.isEmpty(result));
}
@@ -127,14 +130,15 @@ public class MTMVTaskTest {
public void testCalculateNeedRefreshPartitionsSystemNotSyncComplete()
throws AnalysisException {
new Expectations() {
{
- mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any,
(Set<String>) any);
+ mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any,
(Set<String>) any,
+ (Map<Long, Set<Long>>) any);
minTimes = 0;
result = false;
}
};
MTMVTaskContext context = new
MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
MTMVTask task = new MTMVTask(mtmv, relation, context);
- List<Long> result = task.calculateNeedRefreshPartitions();
+ List<Long> result =
task.calculateNeedRefreshPartitions(Maps.newHashMap());
Assert.assertEquals(allPartitionIds, result);
}
@@ -142,7 +146,8 @@ public class MTMVTaskTest {
public void testCalculateNeedRefreshPartitionsSystemNotSyncAuto() throws
AnalysisException {
new Expectations() {
{
- mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any,
(Set<String>) any);
+ mtmvPartitionUtil
+ .isMTMVSync(mtmv, (Set<BaseTableInfo>) any,
(Set<String>) any, (Map<Long, Set<Long>>) any);
minTimes = 0;
result = false;
@@ -150,14 +155,15 @@ public class MTMVTaskTest {
minTimes = 0;
result = RefreshMethod.AUTO;
- mtmvPartitionUtil.getMTMVNeedRefreshPartitions(mtmv,
(Set<BaseTableInfo>) any);
+ mtmvPartitionUtil
+ .getMTMVNeedRefreshPartitions(mtmv,
(Set<BaseTableInfo>) any, (Map<Long, Set<Long>>) any);
minTimes = 0;
result = Lists.newArrayList(ptwoId);
}
};
MTMVTaskContext context = new
MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
MTMVTask task = new MTMVTask(mtmv, relation, context);
- List<Long> result = task.calculateNeedRefreshPartitions();
+ List<Long> result =
task.calculateNeedRefreshPartitions(Maps.newHashMap());
Assert.assertEquals(Lists.newArrayList(ptwoId), result);
}
}
diff --git a/regression-test/data/mtmv_p0/test_hive_multi_partition_mtmv.out
b/regression-test/data/mtmv_p0/test_hive_multi_partition_mtmv.out
new file mode 100644
index 00000000000..6f00353bf59
--- /dev/null
+++ b/regression-test/data/mtmv_p0/test_hive_multi_partition_mtmv.out
@@ -0,0 +1,43 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_base_table --
+1 2020 bj
+2 2020 sh
+3 2021 bj
+4 2021 sh
+5 2022 bj
+6 2022 sh
+
+-- !mtmv_year_2020 --
+1 2020 bj
+2 2020 sh
+
+-- !mtmv_year_complete --
+1 2020 bj
+2 2020 sh
+3 2021 bj
+4 2021 sh
+5 2022 bj
+6 2022 sh
+
+-- !mtmv_region_bj --
+1 2020 bj
+3 2021 bj
+5 2022 bj
+
+-- !mtmv_region_complete --
+1 2020 bj
+2 2020 sh
+3 2021 bj
+4 2021 sh
+5 2022 bj
+6 2022 sh
+
+-- !mtmv_data_change --
+1 2020 bj
+2 2020 sh
+3 2021 bj
+4 2021 sh
+5 2022 bj
+6 2022 sh
+7 2020 bj
+
diff --git
a/regression-test/suites/mtmv_p0/test_hive_multi_partition_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_hive_multi_partition_mtmv.groovy
new file mode 100644
index 00000000000..c8e5830dcac
--- /dev/null
+++ b/regression-test/suites/mtmv_p0/test_hive_multi_partition_mtmv.groovy
@@ -0,0 +1,253 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hive_multi_partition_mtmv",
"p0,external,hive,external_docker,external_docker_hive") {
+ String enabled = context.config.otherConfigs.get("enableHiveTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable Hive test.")
+ return;
+ }
+ // prepare data in hive
+ def hive_database = "test_hive_multi_partition_mtmv_db"
+ def hive_table = "partition2"
+
+ def drop_table_str = """ drop table if exists
${hive_database}.${hive_table} """
+ def drop_database_str = """ drop database if exists ${hive_database}"""
+ def create_database_str = """ create database ${hive_database}"""
+ def create_table_str = """ CREATE TABLE ${hive_database}.${hive_table} (
+ `k1` int)
+ PARTITIONED BY (
+ `year` int,
+ `region` string)
+ STORED AS ORC;
+ """
+ def add_partition_str = """
+ alter table ${hive_database}.${hive_table} add
if not exists
+ partition(year=2020,region="bj")
+ partition(year=2020,region="sh")
+ partition(year=2021,region="bj")
+ partition(year=2021,region="sh")
+ partition(year=2022,region="bj")
+ partition(year=2022,region="sh")
+ """
+ def insert_str1 = """insert into ${hive_database}.${hive_table}
PARTITION(year=2020,region="bj") values(1)"""
+ def insert_str2 = """insert into ${hive_database}.${hive_table}
PARTITION(year=2020,region="sh") values(2)"""
+ def insert_str3 = """insert into ${hive_database}.${hive_table}
PARTITION(year=2021,region="bj") values(3)"""
+ def insert_str4 = """insert into ${hive_database}.${hive_table}
PARTITION(year=2021,region="sh") values(4)"""
+ def insert_str5 = """insert into ${hive_database}.${hive_table}
PARTITION(year=2022,region="bj") values(5)"""
+ def insert_str6 = """insert into ${hive_database}.${hive_table}
PARTITION(year=2022,region="sh") values(6)"""
+
+ logger.info("hive sql: " + drop_table_str)
+ hive_docker """ ${drop_table_str} """
+ logger.info("hive sql: " + drop_database_str)
+ hive_docker """ ${drop_database_str} """
+ logger.info("hive sql: " + create_database_str)
+ hive_docker """ ${create_database_str}"""
+ logger.info("hive sql: " + create_table_str)
+ hive_docker """ ${create_table_str} """
+ logger.info("hive sql: " + add_partition_str)
+ hive_docker """ ${add_partition_str} """
+ logger.info("hive sql: " + insert_str1)
+ hive_docker """ ${insert_str1} """
+ logger.info("hive sql: " + insert_str2)
+ hive_docker """ ${insert_str2} """
+ logger.info("hive sql: " + insert_str3)
+ hive_docker """ ${insert_str3} """
+ logger.info("hive sql: " + insert_str4)
+ hive_docker """ ${insert_str4} """
+ logger.info("hive sql: " + insert_str5)
+ hive_docker """ ${insert_str5} """
+ logger.info("hive sql: " + insert_str6)
+ hive_docker """ ${insert_str6} """
+
+
+ // prepare catalog
+ String hms_port = context.config.otherConfigs.get("hms_port")
+ String catalog_name = "test_hive_multi_partition_mtmv_catalog"
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """create catalog if not exists ${catalog_name} properties (
+ "type"="hms",
+ 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
+ );"""
+
+ order_qt_select_base_table "SELECT * FROM
${catalog_name}.${hive_database}.${hive_table}"
+
+
+ // prepare mtmv
+ def mvName = "test_hive_multi_partition_mtmv"
+ def dbName = "regression_test_mtmv_p0"
+ sql """drop materialized view if exists ${mvName};"""
+
+ // partition by year
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`year`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' = '1')
+ AS
+ SELECT k1,year,region FROM
${catalog_name}.${hive_database}.${hive_table};
+ """
+ def showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+ assertTrue(showPartitionsResult.toString().contains("p_2020"))
+ assertTrue(showPartitionsResult.toString().contains("p_2021"))
+ assertTrue(showPartitionsResult.toString().contains("p_2022"))
+ assertEquals(showPartitionsResult.size(),3)
+ // refresh p_2020
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} partitions(p_2020);
+ """
+ def jobName = getJobName(dbName, mvName);
+ waitingMTMVTaskFinished(jobName)
+ order_qt_mtmv_year_2020 "SELECT * FROM ${mvName} order by k1,year,region"
+
+ // refresh complete
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} complete
+ """
+ waitingMTMVTaskFinished(jobName)
+ order_qt_mtmv_year_complete "SELECT * FROM ${mvName} order by
k1,year,region"
+
+ sql """drop materialized view if exists ${mvName};"""
+ // partition by region
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`region`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' = '1')
+ AS
+ SELECT k1,year,region FROM
${catalog_name}.${hive_database}.${hive_table};
+ """
+ showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+ assertTrue(showPartitionsResult.toString().contains("p_bj"))
+ assertTrue(showPartitionsResult.toString().contains("p_sh"))
+ assertEquals(showPartitionsResult.size(),2)
+ // refresh p_bj
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} partitions(p_bj);
+ """
+ jobName = getJobName(dbName, mvName);
+ waitingMTMVTaskFinished(jobName)
+ order_qt_mtmv_region_bj "SELECT * FROM ${mvName} order by k1,year,region"
+
+ // refresh complete
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} complete
+ """
+ waitingMTMVTaskFinished(jobName)
+ order_qt_mtmv_region_complete "SELECT * FROM ${mvName} order by
k1,year,region"
+
+ // hive data change
+ def insert_str7 = """
+ insert into ${hive_database}.${hive_table}
PARTITION(year=2020,region="bj") values(7);
+ """
+ logger.info("hive sql: " + insert_str7)
+ hive_docker """ ${insert_str7} """
+ sql """
+ REFRESH catalog ${catalog_name}
+ """
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} partitions(p_bj);
+ """
+ waitingMTMVTaskFinished(jobName)
+ order_qt_mtmv_data_change "SELECT * FROM ${mvName} order by
k1,year,region"
+
+ // hive add partition year
+ def add_partition2023_bj_str = """
+ alter table
${hive_database}.${hive_table} add if not exists
+ partition(year=2023,region="bj");
+ """
+ logger.info("hive sql: " + add_partition2023_bj_str)
+ hive_docker """ ${add_partition2023_bj_str} """
+ sql """
+ REFRESH catalog ${catalog_name}
+ """
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} complete
+ """
+ waitingMTMVTaskFinished(jobName)
+ showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+ assertEquals(showPartitionsResult.size(),2)
+
+ // hive add partition region
+ def add_partition2023_tj_str = """
+ alter table
${hive_database}.${hive_table} add if not exists
+ partition(year=2023,region="tj");
+ """
+ logger.info("hive sql: " + add_partition2023_tj_str)
+ hive_docker """ ${add_partition2023_tj_str} """
+ sql """
+ REFRESH catalog ${catalog_name}
+ """
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} complete
+ """
+ waitingMTMVTaskFinished(jobName)
+ showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+ assertEquals(showPartitionsResult.size(),3)
+ assertTrue(showPartitionsResult.toString().contains("p_tj"))
+
+ // hive drop partition
+ def drop_partition2023_bj_str = """
+ alter table
${hive_database}.${hive_table} drop if exists
+ partition(year=2023,region="bj");
+ """
+ logger.info("hive sql: " + drop_partition2023_bj_str)
+ hive_docker """ ${drop_partition2023_bj_str} """
+ sql """
+ REFRESH catalog ${catalog_name}
+ """
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} complete
+ """
+ waitingMTMVTaskFinished(jobName)
+ showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+ assertTrue(showPartitionsResult.toString().contains("p_bj"))
+ assertTrue(showPartitionsResult.toString().contains("p_sh"))
+ assertTrue(showPartitionsResult.toString().contains("p_tj"))
+
+ def drop_partition2023_tj_str = """
+ alter table
${hive_database}.${hive_table} drop if exists
+ partition(year=2023,region="tj");
+ """
+ logger.info("hive sql: " + drop_partition2023_tj_str)
+ hive_docker """ ${drop_partition2023_tj_str} """
+ sql """
+ REFRESH catalog ${catalog_name}
+ """
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} complete
+ """
+ waitingMTMVTaskFinished(jobName)
+ showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+ assertTrue(showPartitionsResult.toString().contains("p_bj"))
+ assertTrue(showPartitionsResult.toString().contains("p_sh"))
+ assertFalse(showPartitionsResult.toString().contains("p_tj"))
+
+ sql """drop materialized view if exists ${mvName};"""
+ sql """drop catalog if exists ${catalog_name}"""
+}
+
diff --git a/regression-test/suites/mtmv_p0/test_partition_refresh_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_partition_refresh_mtmv.groovy
index c9a3fd27128..82ca125104d 100644
--- a/regression-test/suites/mtmv_p0/test_partition_refresh_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_partition_refresh_mtmv.groovy
@@ -61,7 +61,7 @@ suite("test_partition_refresh_mtmv") {
sql """drop table if exists `${tableNameNum}`"""
sql """drop materialized view if exists ${mvName};"""
- // base table has two partition col
+ // base table has two partition col(range)
sql """
CREATE TABLE `${tableNameNum}` (
`user_id` LARGEINT NOT NULL COMMENT '\"用户id\"',
@@ -79,21 +79,56 @@ suite("test_partition_refresh_mtmv") {
"""
try {
- sql """
- CREATE MATERIALIZED VIEW ${mvName}
- BUILD DEFERRED REFRESH AUTO ON MANUAL
- partition by(`date`)
- DISTRIBUTED BY RANDOM BUCKETS 2
- PROPERTIES ('replication_num' = '1')
- AS
- SELECT * FROM ${tableNameNum};
- """
- Assert.fail();
- } catch (Exception e) {
- log.info(e.getMessage())
- }
- sql """drop table if exists `${tableNameNum}`"""
- sql """drop materialized view if exists ${mvName};"""
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`date`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' = '1')
+ AS
+ SELECT * FROM ${tableNameNum};
+ """
+ Assert.fail();
+ } catch (Exception e) {
+ log.info(e.getMessage())
+ }
+ sql """drop table if exists `${tableNameNum}`"""
+ sql """drop materialized view if exists ${mvName};"""
+
+ // base table has two partition col(list)
+ sql """
+ CREATE TABLE `${tableNameNum}` (
+ `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"',
+ `date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"',
+ `num` SMALLINT NOT NULL COMMENT '\"数量\"'
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`user_id`, `date`, `num`)
+ COMMENT 'OLAP'
+ PARTITION BY LIST(`date`,`num`)
+ (
+ PARTITION p201701_1000 VALUES IN (('2017-01-01',1), ('2017-01-01',2)),
+ PARTITION p201702_2000 VALUES IN (('2017-02-01',3), ('2017-02-01',4))
+ )
+ DISTRIBUTED BY HASH(`user_id`) BUCKETS 2
+ PROPERTIES ('replication_num' = '1') ;
+ """
+
+ try {
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`date`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' = '1')
+ AS
+ SELECT * FROM ${tableNameNum};
+ """
+ Assert.fail();
+ } catch (Exception e) {
+ log.info(e.getMessage())
+ }
+ sql """drop table if exists `${tableNameNum}`"""
+ sql """drop materialized view if exists ${mvName};"""
// range date partition
sql """
@@ -220,7 +255,8 @@ suite("test_partition_refresh_mtmv") {
showPartitionsResult = sql """show partitions from ${mvName}"""
logger.info("showPartitionsResult: " + showPartitionsResult.toString())
assertTrue(showPartitionsResult.toString().contains("p_1"))
- assertTrue(showPartitionsResult.toString().contains("p_2_3"))
+ assertTrue(showPartitionsResult.toString().contains("_2"))
+ assertTrue(showPartitionsResult.toString().contains("_3"))
sql """
REFRESH MATERIALIZED VIEW ${mvName}
@@ -330,6 +366,26 @@ suite("test_partition_refresh_mtmv") {
waitingMTMVTaskFinished(jobName)
order_qt_refresh_other_table_change_other "SELECT * FROM ${mvName} order
by user_id,age,date,num"
+ //test base table add partition
+ sql """alter table ${tableNameNum} ADD PARTITION p201704 VALUES
[('2017-04-01'), ('2017-05-01'))"""
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName};
+ """
+ waitingMTMVTaskFinished(jobName)
+ showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+ assertTrue(showPartitionsResult.toString().contains("p_20170401_20170501"))
+
+ //test base table drop partition
+ sql """alter table ${tableNameNum} drop PARTITION p201704"""
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName};
+ """
+ waitingMTMVTaskFinished(jobName)
+ showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+
assertFalse(showPartitionsResult.toString().contains("p_20170401_20170501"))
+
// test exclude table
sql """drop materialized view if exists ${mvName};"""
sql """drop table if exists `${tableNameNum}`"""
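
Note on the partition mapping: the diff passes a `Map<Long, Set<Long>>` (MTMV partition id to related base-table partition ids) through the sync checks, and `isSyncWithPartitions` returns true only when every related partition matches its recorded snapshot. The following is a minimal, self-contained sketch of that idea for the `date`/`region` example in the commit message. It is not the Doris implementation: the class name, method names, the use of a column value as the grouping key, and the `Long` snapshot values are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; none of these names exist in Doris.
class PartitionMappingSketch {

    // Groups base-table partition ids by the value of one partition column.
    // Each key of the result stands for one MTMV partition.
    static Map<String, Set<Long>> groupByColumn(Map<Long, List<String>> basePartitions, int columnIndex) {
        Map<String, Set<Long>> mappings = new LinkedHashMap<>();
        for (Map.Entry<Long, List<String>> entry : basePartitions.entrySet()) {
            String key = entry.getValue().get(columnIndex);
            mappings.computeIfAbsent(key, k -> new LinkedHashSet<>()).add(entry.getKey());
        }
        return mappings;
    }

    // An MTMV partition counts as in sync only if every related base partition
    // still has the snapshot value recorded at the last refresh.
    static boolean isSyncWithAll(Set<Long> relatedIds, Map<Long, Long> refreshed, Map<Long, Long> current) {
        for (Long id : relatedIds) {
            Long last = refreshed.get(id);
            if (last == null || !last.equals(current.get(id))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Six base partitions: (date, region) values keyed by an assumed partition id.
        Map<Long, List<String>> base = new LinkedHashMap<>();
        base.put(1L, Arrays.asList("20200101", "beijing"));
        base.put(2L, Arrays.asList("20200101", "shanghai"));
        base.put(3L, Arrays.asList("20200102", "beijing"));
        base.put(4L, Arrays.asList("20200102", "shanghai"));
        base.put(5L, Arrays.asList("20200103", "beijing"));
        base.put(6L, Arrays.asList("20200103", "shanghai"));

        // prints {20200101=[1, 2], 20200102=[3, 4], 20200103=[5, 6]}
        System.out.println(groupByColumn(base, 0));
        // prints {beijing=[1, 3, 5], shanghai=[2, 4, 6]}
        System.out.println(groupByColumn(base, 1));
    }
}
```

Grouping by `date` yields three groups of two base partitions each, and grouping by `region` yields two groups of three, which matches the partition counts described in the commit message. A change to any single base partition in a group makes the whole MTMV partition out of sync in this sketch.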