This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d6ff2d56dc [feature](mtmv)Support iceberg mtmv query. (#45659)
6d6ff2d56dc is described below

commit 6d6ff2d56dca970756b9b78778cf80c772b35f2d
Author: James <[email protected]>
AuthorDate: Tue Dec 24 11:58:51 2024 +0800

    [feature](mtmv)Support iceberg mtmv query. (#45659)
    
    ### What problem does this PR solve?
    
    1. Implement MvccTable interface for IcebertExternalTable
    2. IcebergExternalTable overrides the methods in ExternalTable and
    supports partition pruning
    3. Add snapshot cache in IcebergMetadataCache to store
    IcebergExternalTable partition infos.
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
---
 .../apache/doris/catalog/RangePartitionItem.java   |   7 +-
 .../doris/datasource/hive/HMSExternalTable.java    |   2 +-
 .../datasource/iceberg/IcebergExternalTable.java   | 171 +++++++++++++--------
 .../datasource/iceberg/IcebergMetadataCache.java   |  42 ++++-
 .../datasource/iceberg/IcebergMvccSnapshot.java    |  32 ++++
 .../datasource/iceberg/IcebergSchemaCacheKey.java  |  55 +++++++
 .../iceberg/IcebergSchemaCacheValue.java           |  15 +-
 ...gSchemaCacheValue.java => IcebergSnapshot.java} |  30 +---
 ...heValue.java => IcebergSnapshotCacheValue.java} |  25 +--
 .../doris/datasource/iceberg/IcebergUtils.java     |  14 +-
 .../iceberg/IcebergExternalTableTest.java          |  48 ++++--
 regression-test/data/mtmv_p0/test_iceberg_mtmv.out |  15 ++
 .../suites/mtmv_p0/test_iceberg_mtmv.groovy        |  56 +++++++
 13 files changed, 364 insertions(+), 148 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
index 96bf0097c28..cad6ca38130 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
@@ -65,14 +65,9 @@ public class RangePartitionItem extends PartitionItem {
 
     @Override
     public PartitionKeyDesc toPartitionKeyDesc() {
-        if (partitionKeyRange.hasLowerBound()) {
-            return PartitionKeyDesc.createFixed(
+        return PartitionKeyDesc.createFixed(
                 
PartitionInfo.toPartitionValue(partitionKeyRange.lowerEndpoint()),
                 
PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint()));
-        } else {
-            // For null partition value.
-            return 
PartitionKeyDesc.createLessThan(PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint()));
-        }
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index da4670d6d05..a6fb486bed9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -543,7 +543,7 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
     }
 
     private List<Column> getIcebergSchema() {
-        return IcebergUtils.getSchema(catalog, dbName, name);
+        return IcebergUtils.getSchema(catalog, dbName, name, 
IcebergUtils.UNKNOWN_SNAPSHOT_ID);
     }
 
     private List<Column> getHudiSchema() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
index e259399f637..7f7d2fdf578 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
@@ -27,9 +27,14 @@ import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.catalog.RangePartitionItem;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.CacheException;
+import org.apache.doris.datasource.ExternalSchemaCache;
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
 import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.datasource.mvcc.MvccTable;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.mtmv.MTMVBaseTableIf;
 import org.apache.doris.mtmv.MTMVRefreshContext;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
@@ -77,7 +82,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-public class IcebergExternalTable extends ExternalTable implements 
MTMVRelatedTableIf, MTMVBaseTableIf {
+public class IcebergExternalTable extends ExternalTable implements 
MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable {
 
     public static final String YEAR = "year";
     public static final String MONTH = "month";
@@ -117,39 +122,23 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
     }
 
     @Override
-    public Optional<SchemaCacheValue> initSchema() {
-        table = IcebergUtils.getIcebergTable(catalog, dbName, name);
-        List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name);
-        Snapshot snapshot = table.currentSnapshot();
-        if (snapshot == null) {
-            LOG.debug("Table {} is empty", name);
-            return Optional.of(new IcebergSchemaCacheValue(schema, null, -1, 
null));
-        }
-        long snapshotId = snapshot.snapshotId();
-        partitionColumns = null;
-        IcebergPartitionInfo partitionInfo = null;
-        if (isValidRelatedTable()) {
-            PartitionSpec spec = table.spec();
-            partitionColumns = Lists.newArrayList();
-
-            // For iceberg table, we only support table with 1 partition 
column as RelatedTable.
-            // So we use spec.fields().get(0) to get the partition column.
-            Types.NestedField col = 
table.schema().findField(spec.fields().get(0).sourceId());
+    public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
+        table = getIcebergTable();
+        List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name,
+                ((IcebergSchemaCacheKey) key).getSchemaId());
+        List<Column> tmpColumns = Lists.newArrayList();
+        PartitionSpec spec = table.spec();
+        for (PartitionField field : spec.fields()) {
+            Types.NestedField col = table.schema().findField(field.sourceId());
             for (Column c : schema) {
                 if (c.getName().equalsIgnoreCase(col.name())) {
-                    partitionColumns.add(c);
+                    tmpColumns.add(c);
                     break;
                 }
             }
-            Preconditions.checkState(partitionColumns.size() == 1,
-                    "Support 1 partition column for iceberg table, but found " 
+ partitionColumns.size());
-            try {
-                partitionInfo = loadPartitionInfo();
-            } catch (AnalysisException e) {
-                LOG.warn("Failed to load iceberg table {} partition info.", 
name, e);
-            }
         }
-        return Optional.of(new IcebergSchemaCacheValue(schema, 
partitionColumns, snapshotId, partitionInfo));
+        partitionColumns = tmpColumns;
+        return Optional.of(new IcebergSchemaCacheValue(schema, 
partitionColumns));
     }
 
     @Override
@@ -187,6 +176,11 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
         return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), 
getName());
     }
 
+    private IcebergSnapshotCacheValue getIcebergSnapshotCacheValue() {
+        return 
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
+            .getSnapshotCache(catalog, dbName, name);
+    }
+
     @Override
     public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
         Env.getCurrentEnv().getRefreshManager()
@@ -195,46 +189,36 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
 
     @Override
     public Map<String, PartitionItem> 
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
-        return 
Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
+        return 
Maps.newHashMap(getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem());
     }
 
-    private IcebergPartitionInfo getPartitionInfoFromCache() {
-        makeSureInitialized();
-        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
-        if (!schemaCacheValue.isPresent()) {
-            return new IcebergPartitionInfo();
-        }
-        return ((IcebergSchemaCacheValue) 
schemaCacheValue.get()).getPartitionInfo();
+    @Override
+    public Map<String, PartitionItem> 
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
+        return 
getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem();
     }
 
     @Override
     public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
-        makeSureInitialized();
         return isValidRelatedTable() ? PartitionType.RANGE : 
PartitionType.UNPARTITIONED;
     }
 
     @Override
     public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> 
snapshot) throws DdlException {
-        return 
getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet());
+        return 
getPartitionColumns(snapshot).stream().map(Column::getName).collect(Collectors.toSet());
     }
 
     @Override
     public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
-        return getPartitionColumnsFromCache();
-    }
-
-    private List<Column> getPartitionColumnsFromCache() {
-        makeSureInitialized();
-        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
-        return schemaCacheValue
-                .map(cacheValue -> ((IcebergSchemaCacheValue) 
cacheValue).getPartitionColumns())
-                .orElseGet(Lists::newArrayList);
+        IcebergSnapshotCacheValue snapshotValue = 
getOrFetchSnapshotCacheValue(snapshot);
+        IcebergSchemaCacheValue schemaValue = 
getIcebergSchemaCacheValue(snapshotValue.getSnapshot().getSchemaId());
+        return schemaValue.getPartitionColumns();
     }
 
     @Override
     public MTMVSnapshotIf getPartitionSnapshot(String partitionName, 
MTMVRefreshContext context,
                                                Optional<MvccSnapshot> 
snapshot) throws AnalysisException {
-        long latestSnapshotId = 
getPartitionInfoFromCache().getLatestSnapshotId(partitionName);
+        IcebergSnapshotCacheValue snapshotValue = 
getOrFetchSnapshotCacheValue(snapshot);
+        long latestSnapshotId = 
snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
         if (latestSnapshotId <= 0) {
             throw new AnalysisException("can not find partition: " + 
partitionName);
         }
@@ -244,16 +228,9 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
     @Override
     public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, 
Optional<MvccSnapshot> snapshot)
             throws AnalysisException {
-        return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
-    }
-
-    public long getLatestSnapshotIdFromCache() throws AnalysisException {
         makeSureInitialized();
-        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
-        if (!schemaCacheValue.isPresent()) {
-            throw new AnalysisException("Can't find schema cache of table " + 
name);
-        }
-        return ((IcebergSchemaCacheValue) 
schemaCacheValue.get()).getSnapshotId();
+        IcebergSnapshotCacheValue snapshotValue = 
getOrFetchSnapshotCacheValue(snapshot);
+        return new 
MTMVVersionSnapshot(snapshotValue.getSnapshot().getSnapshotId());
     }
 
     @Override
@@ -268,11 +245,13 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
      */
     @Override
     public boolean isValidRelatedTable() {
+        makeSureInitialized();
         if (isValidRelatedTableCached) {
             return isValidRelatedTable;
         }
         isValidRelatedTable = false;
         Set<String> allFields = Sets.newHashSet();
+        table = getIcebergTable();
         for (PartitionSpec spec : table.specs().values()) {
             if (spec == null) {
                 isValidRelatedTableCached = true;
@@ -299,14 +278,62 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
         return isValidRelatedTable;
     }
 
-    protected IcebergPartitionInfo loadPartitionInfo() throws 
AnalysisException {
-        List<IcebergPartition> icebergPartitions = loadIcebergPartition();
+    @Override
+    public MvccSnapshot loadSnapshot() {
+        return new IcebergMvccSnapshot(getIcebergSnapshotCacheValue());
+    }
+
+    public long getLatestSnapshotId() {
+        table = getIcebergTable();
+        Snapshot snapshot = table.currentSnapshot();
+        return snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID : 
table.currentSnapshot().snapshotId();
+    }
+
+    public long getSchemaId(long snapshotId) {
+        table = getIcebergTable();
+        return snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID
+                ? IcebergUtils.UNKNOWN_SNAPSHOT_ID
+                : table.snapshot(snapshotId).schemaId();
+    }
+
+    @Override
+    public List<Column> getFullSchema() {
+        Optional<MvccSnapshot> snapshotFromContext = 
MvccUtil.getSnapshotFromContext(this);
+        IcebergSnapshotCacheValue cacheValue = 
getOrFetchSnapshotCacheValue(snapshotFromContext);
+        return 
getIcebergSchemaCacheValue(cacheValue.getSnapshot().getSchemaId()).getSchema();
+    }
+
+    @Override
+    public boolean supportInternalPartitionPruned() {
+        return true;
+    }
+
+    public IcebergSchemaCacheValue getIcebergSchemaCacheValue(long schemaId) {
+        ExternalSchemaCache cache = 
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
+            new IcebergSchemaCacheKey(dbName, name, schemaId));
+        if (!schemaCacheValue.isPresent()) {
+            throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
+                null, catalog.getName(), dbName, name, schemaId);
+        }
+        return (IcebergSchemaCacheValue) schemaCacheValue.get();
+    }
+
+    public IcebergPartitionInfo loadPartitionInfo(long snapshotId) throws 
AnalysisException {
+        // snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table, 
haven't contained any snapshot yet.
+        if (!isValidRelatedTable() || snapshotId == 
IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
+            return new IcebergPartitionInfo();
+        }
+        List<IcebergPartition> icebergPartitions = 
loadIcebergPartition(snapshotId);
         Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
         Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
+        table = getIcebergTable();
+        partitionColumns = 
getIcebergSchemaCacheValue(table.snapshot(snapshotId).schemaId()).getPartitionColumns();
         for (IcebergPartition partition : icebergPartitions) {
             nameToPartition.put(partition.getPartitionName(), partition);
             String transform = 
table.specs().get(partition.getSpecId()).fields().get(0).transform().toString();
-            Range<PartitionKey> partitionRange = 
getPartitionRange(partition.getPartitionValues().get(0), transform);
+            Range<PartitionKey> partitionRange = getPartitionRange(
+                    partition.getPartitionValues().get(0), transform, 
partitionColumns);
             PartitionItem item = new RangePartitionItem(partitionRange);
             nameToPartitionItem.put(partition.getPartitionName(), item);
         }
@@ -314,11 +341,11 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
         return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition, 
partitionNameMap);
     }
 
-    public List<IcebergPartition> loadIcebergPartition() {
+    public List<IcebergPartition> loadIcebergPartition(long snapshotId) {
         PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
                 .createMetadataTableInstance(table, 
MetadataTableType.PARTITIONS);
         List<IcebergPartition> partitions = Lists.newArrayList();
-        try (CloseableIterable<FileScanTask> tasks = 
partitionsTable.newScan().planFiles()) {
+        try (CloseableIterable<FileScanTask> tasks = 
partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) {
             for (FileScanTask task : tasks) {
                 CloseableIterable<StructLike> rows = task.asDataTask().rows();
                 for (StructLike row : rows) {
@@ -344,6 +371,7 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
         // 8. equality_delete_file_count,
         // 9. last_updated_at,
         // 10. last_updated_snapshot_id
+        table = getIcebergTable();
         Preconditions.checkState(!table.spec().fields().isEmpty(), 
table.name() + " is not a partition table.");
         int specId = row.get(1, Integer.class);
         PartitionSpec partitionSpec = table.specs().get(specId);
@@ -382,13 +410,14 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
     }
 
     @VisibleForTesting
-    public Range<PartitionKey> getPartitionRange(String value, String 
transform)
+    public Range<PartitionKey> getPartitionRange(String value, String 
transform, List<Column> partitionColumns)
             throws AnalysisException {
-        // For NULL value, create a lessThan partition for it.
+        // For NULL value, create a minimum partition for it.
         if (value == null) {
-            PartitionKey nullKey = PartitionKey.createPartitionKey(
-                    Lists.newArrayList(new PartitionValue("0000-01-02")), 
partitionColumns);
-            return Range.lessThan(nullKey);
+            PartitionKey nullLowKey = PartitionKey.createPartitionKey(
+                    Lists.newArrayList(new PartitionValue("0000-01-01")), 
partitionColumns);
+            PartitionKey nullUpKey = nullLowKey.successor();
+            return Range.closedOpen(nullLowKey, nullUpKey);
         }
         LocalDateTime epoch = 
Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
         LocalDateTime target;
@@ -525,4 +554,12 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
     public void setIsValidRelatedTableCached(boolean isCached) {
         this.isValidRelatedTableCached = isCached;
     }
+
+    private IcebergSnapshotCacheValue 
getOrFetchSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
+        if (snapshot.isPresent()) {
+            return ((IcebergMvccSnapshot) 
snapshot.get()).getSnapshotCacheValue();
+        } else {
+            return getIcebergSnapshotCacheValue();
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index ad347ca78f2..e80a013cc92 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -18,6 +18,7 @@
 package org.apache.doris.datasource.iceberg;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.CacheFactory;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
@@ -49,6 +50,7 @@ public class IcebergMetadataCache {
 
     private final LoadingCache<IcebergMetadataCacheKey, List<Snapshot>> 
snapshotListCache;
     private final LoadingCache<IcebergMetadataCacheKey, Table> tableCache;
+    private final LoadingCache<IcebergMetadataCacheKey, 
IcebergSnapshotCacheValue> snapshotCache;
 
     public IcebergMetadataCache(ExecutorService executor) {
         CacheFactory snapshotListCacheFactory = new CacheFactory(
@@ -66,6 +68,14 @@ public class IcebergMetadataCache {
                 true,
                 null);
         this.tableCache = tableCacheFactory.buildCache(key -> loadTable(key), 
null, executor);
+
+        CacheFactory snapshotCacheFactory = new CacheFactory(
+                OptionalLong.of(28800L),
+                
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60),
+                Config.max_external_table_cache_num,
+                true,
+                null);
+        this.snapshotCache = snapshotCacheFactory.buildCache(key -> 
loadSnapshot(key), null, executor);
     }
 
     public List<Snapshot> getSnapshotList(TIcebergMetadataParams params) 
throws UserException {
@@ -92,6 +102,11 @@ public class IcebergMetadataCache {
         return restTable;
     }
 
+    public IcebergSnapshotCacheValue getSnapshotCache(CatalogIf catalog, 
String dbName, String tbName) {
+        IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog, 
dbName, tbName);
+        return snapshotCache.get(key);
+    }
+
     @NotNull
     private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) {
         Table icebergTable = getIcebergTable(key.catalog, key.dbName, 
key.tableName);
@@ -114,6 +129,16 @@ public class IcebergMetadataCache {
             () -> ops.loadTable(key.dbName, key.tableName));
     }
 
+    @NotNull
+    private IcebergSnapshotCacheValue loadSnapshot(IcebergMetadataCacheKey 
key) throws AnalysisException {
+        IcebergExternalTable table = (IcebergExternalTable) 
key.catalog.getDbOrAnalysisException(key.dbName)
+                .getTableOrAnalysisException(key.tableName);
+        long snapshotId = table.getLatestSnapshotId();
+        long schemaId = table.getSchemaId(snapshotId);
+        IcebergPartitionInfo icebergPartitionInfo = 
table.loadPartitionInfo(snapshotId);
+        return new IcebergSnapshotCacheValue(icebergPartitionInfo, new 
IcebergSnapshot(snapshotId, schemaId));
+    }
+
     public void invalidateCatalogCache(long catalogId) {
         snapshotListCache.asMap().keySet().stream()
                 .filter(key -> key.catalog.getId() == catalogId)
@@ -125,6 +150,10 @@ public class IcebergMetadataCache {
                     ManifestFiles.dropCache(entry.getValue().io());
                     tableCache.invalidate(entry.getKey());
                 });
+
+        snapshotCache.asMap().keySet().stream()
+                .filter(key -> key.catalog.getId() == catalogId)
+                .forEach(snapshotCache::invalidate);
     }
 
     public void invalidateTableCache(long catalogId, String dbName, String 
tblName) {
@@ -143,6 +172,11 @@ public class IcebergMetadataCache {
                     ManifestFiles.dropCache(entry.getValue().io());
                     tableCache.invalidate(entry.getKey());
                 });
+
+        snapshotCache.asMap().keySet().stream()
+                .filter(key -> key.catalog.getId() == catalogId && 
key.dbName.equals(dbName) && key.tableName.equals(
+                    tblName))
+                .forEach(snapshotCache::invalidate);
     }
 
     public void invalidateDbCache(long catalogId, String dbName) {
@@ -159,6 +193,10 @@ public class IcebergMetadataCache {
                     ManifestFiles.dropCache(entry.getValue().io());
                     tableCache.invalidate(entry.getKey());
                 });
+
+        snapshotCache.asMap().keySet().stream()
+                .filter(key -> key.catalog.getId() == catalogId && 
key.dbName.equals(dbName))
+                .forEach(snapshotCache::invalidate);
     }
 
     private static void initIcebergTableFileIO(Table table, Map<String, 
String> props) {
@@ -212,10 +250,12 @@ public class IcebergMetadataCache {
 
     public Map<String, Map<String, String>> getCacheStats() {
         Map<String, Map<String, String>> res = Maps.newHashMap();
-        res.put("iceberg_snapshot_cache", 
ExternalMetaCacheMgr.getCacheStats(snapshotListCache.stats(),
+        res.put("iceberg_snapshot_list_cache", 
ExternalMetaCacheMgr.getCacheStats(snapshotListCache.stats(),
                 snapshotListCache.estimatedSize()));
         res.put("iceberg_table_cache", 
ExternalMetaCacheMgr.getCacheStats(tableCache.stats(),
                 tableCache.estimatedSize()));
+        res.put("iceberg_snapshot_cache", 
ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(),
+                snapshotCache.estimatedSize()));
         return res;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMvccSnapshot.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMvccSnapshot.java
new file mode 100644
index 00000000000..2c0155a71cd
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMvccSnapshot.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+
+public class IcebergMvccSnapshot implements MvccSnapshot {
+    private final IcebergSnapshotCacheValue snapshotCacheValue;
+
+    public IcebergMvccSnapshot(IcebergSnapshotCacheValue snapshotCacheValue) {
+        this.snapshotCacheValue = snapshotCacheValue;
+    }
+
+    public IcebergSnapshotCacheValue getSnapshotCacheValue() {
+        return snapshotCacheValue;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheKey.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheKey.java
new file mode 100644
index 00000000000..7931d91831f
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheKey.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
+
+import com.google.common.base.Objects;
+
+public class IcebergSchemaCacheKey extends SchemaCacheKey {
+    private final long schemaId;
+
+    public IcebergSchemaCacheKey(String dbName, String tableName, long 
schemaId) {
+        super(dbName, tableName);
+        this.schemaId = schemaId;
+    }
+
+    public long getSchemaId() {
+        return schemaId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof IcebergSchemaCacheKey)) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        IcebergSchemaCacheKey that = (IcebergSchemaCacheKey) o;
+        return schemaId == that.schemaId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(super.hashCode(), schemaId);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java
index e1fde8049fe..ccfcaab0c72 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java
@@ -25,26 +25,13 @@ import java.util.List;
 public class IcebergSchemaCacheValue extends SchemaCacheValue {
 
     private final List<Column> partitionColumns;
-    private final IcebergPartitionInfo partitionInfo;
-    private final long snapshotId;
 
-    public IcebergSchemaCacheValue(List<Column> schema, List<Column> 
partitionColumns,
-                                  long snapshotId, IcebergPartitionInfo 
partitionInfo) {
+    public IcebergSchemaCacheValue(List<Column> schema, List<Column> 
partitionColumns) {
         super(schema);
         this.partitionColumns = partitionColumns;
-        this.snapshotId = snapshotId;
-        this.partitionInfo = partitionInfo;
     }
 
     public List<Column> getPartitionColumns() {
         return partitionColumns;
     }
-
-    public IcebergPartitionInfo getPartitionInfo() {
-        return partitionInfo;
-    }
-
-    public long getSnapshotId() {
-        return snapshotId;
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSnapshot.java
similarity index 57%
copy from 
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSnapshot.java
index e1fde8049fe..5903c362d74 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSnapshot.java
@@ -17,34 +17,20 @@
 
 package org.apache.doris.datasource.iceberg;
 
-import org.apache.doris.catalog.Column;
-import org.apache.doris.datasource.SchemaCacheValue;
-
-import java.util.List;
-
-public class IcebergSchemaCacheValue extends SchemaCacheValue {
-
-    private final List<Column> partitionColumns;
-    private final IcebergPartitionInfo partitionInfo;
+public class IcebergSnapshot {
     private final long snapshotId;
+    private final long schemaId;
 
-    public IcebergSchemaCacheValue(List<Column> schema, List<Column> 
partitionColumns,
-                                  long snapshotId, IcebergPartitionInfo 
partitionInfo) {
-        super(schema);
-        this.partitionColumns = partitionColumns;
+    public IcebergSnapshot(long snapshotId, long schemaId) {
         this.snapshotId = snapshotId;
-        this.partitionInfo = partitionInfo;
-    }
-
-    public List<Column> getPartitionColumns() {
-        return partitionColumns;
-    }
-
-    public IcebergPartitionInfo getPartitionInfo() {
-        return partitionInfo;
+        this.schemaId = schemaId;
     }
 
     public long getSnapshotId() {
         return snapshotId;
     }
+
+    public long getSchemaId() {
+        return schemaId;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSnapshotCacheValue.java
similarity index 60%
copy from 
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSnapshotCacheValue.java
index e1fde8049fe..95c9a6f26cc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSnapshotCacheValue.java
@@ -17,34 +17,21 @@
 
 package org.apache.doris.datasource.iceberg;
 
-import org.apache.doris.catalog.Column;
-import org.apache.doris.datasource.SchemaCacheValue;
+public class IcebergSnapshotCacheValue {
 
-import java.util.List;
-
-public class IcebergSchemaCacheValue extends SchemaCacheValue {
-
-    private final List<Column> partitionColumns;
     private final IcebergPartitionInfo partitionInfo;
-    private final long snapshotId;
+    private final IcebergSnapshot snapshot;
 
-    public IcebergSchemaCacheValue(List<Column> schema, List<Column> 
partitionColumns,
-                                  long snapshotId, IcebergPartitionInfo 
partitionInfo) {
-        super(schema);
-        this.partitionColumns = partitionColumns;
-        this.snapshotId = snapshotId;
+    public IcebergSnapshotCacheValue(IcebergPartitionInfo partitionInfo, 
IcebergSnapshot snapshot) {
         this.partitionInfo = partitionInfo;
-    }
-
-    public List<Column> getPartitionColumns() {
-        return partitionColumns;
+        this.snapshot = snapshot;
     }
 
     public IcebergPartitionInfo getPartitionInfo() {
         return partitionInfo;
     }
 
-    public long getSnapshotId() {
-        return snapshotId;
+    public IcebergSnapshot getSnapshot() {
+        return snapshot;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index ba6d628e492..a7507fe031f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -52,6 +52,7 @@ import 
org.apache.doris.datasource.property.constants.HMSProperties;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.thrift.TExprOpcode;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.FileFormat;
@@ -107,6 +108,8 @@ public class IcebergUtils {
     // nickname in spark
     public static final String SPARK_SQL_COMPRESSION_CODEC = 
"spark.sql.iceberg.compression-codec";
 
+    public static final long UNKNOWN_SNAPSHOT_ID = -1;
+
     public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
         if (expr == null) {
             return null;
@@ -573,10 +576,17 @@ public class IcebergUtils {
     /**
      * Get iceberg schema from catalog and convert them to doris schema
      */
-    public static List<Column> getSchema(ExternalCatalog catalog, String 
dbName, String name) {
+    public static List<Column> getSchema(ExternalCatalog catalog, String 
dbName, String name, long schemaId) {
         return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), 
() -> {
             org.apache.iceberg.Table icebergTable = getIcebergTable(catalog, 
dbName, name);
-            Schema schema = icebergTable.schema();
+            Schema schema;
+            if (schemaId == UNKNOWN_SNAPSHOT_ID || 
icebergTable.currentSnapshot() == null) {
+                schema = icebergTable.schema();
+            } else {
+                schema = icebergTable.schemas().get((int) schemaId);
+            }
+            Preconditions.checkNotNull(schema,
+                    "Schema for table " + catalog.getName() + "." + dbName + 
"." + name + " is null");
             List<Types.NestedField> columns = schema.columns();
             List<Column> tmpSchema = 
Lists.newArrayListWithCapacity(columns.size());
             for (Types.NestedField field : columns) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
index 80d0a7c2429..3ba4804e522 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
@@ -28,17 +28,21 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
 import mockit.Mocked;
 import mockit.Verifications;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.transforms.Days;
 import org.apache.iceberg.transforms.Hours;
 import org.apache.iceberg.transforms.Months;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -52,6 +56,16 @@ public class IcebergExternalTableTest {
                                               @Mocked Schema schema) {
         IcebergExternalTable table = new IcebergExternalTable(1, "1", "2", 
null);
         Map<Integer, PartitionSpec> specs = Maps.newHashMap();
+        new MockUp<IcebergExternalTable>() {
+            @Mock
+            private void makeSureInitialized() {
+            }
+
+            @Mock
+            public Table getIcebergTable() {
+                return icebergTable;
+            }
+        };
         // Test null
         specs.put(0, null);
         new Expectations() {{
@@ -139,34 +153,35 @@ public class IcebergExternalTableTest {
         table.setPartitionColumns(partitionColumns);
 
         // Test null partition value
-        Range<PartitionKey> nullRange = table.getPartitionRange(null, "hour");
-        Assertions.assertFalse(nullRange.hasLowerBound());
-        Assertions.assertEquals("0000-01-02 00:00:00",
+        Range<PartitionKey> nullRange = table.getPartitionRange(null, "hour", 
partitionColumns);
+        Assertions.assertEquals("0000-01-01 00:00:00",
+                
nullRange.lowerEndpoint().getPartitionValuesAsStringList().get(0));
+        Assertions.assertEquals("0000-01-01 00:00:01",
                 
nullRange.upperEndpoint().getPartitionValuesAsStringList().get(0));
 
         // Test hour transform.
-        Range<PartitionKey> hour = table.getPartitionRange("100", "hour");
+        Range<PartitionKey> hour = table.getPartitionRange("100", "hour", 
partitionColumns);
         PartitionKey lowKey = hour.lowerEndpoint();
         PartitionKey upKey = hour.upperEndpoint();
         Assertions.assertEquals("1970-01-05 04:00:00", 
lowKey.getPartitionValuesAsStringList().get(0));
         Assertions.assertEquals("1970-01-05 05:00:00", 
upKey.getPartitionValuesAsStringList().get(0));
 
         // Test day transform.
-        Range<PartitionKey> day = table.getPartitionRange("100", "day");
+        Range<PartitionKey> day = table.getPartitionRange("100", "day", 
partitionColumns);
         lowKey = day.lowerEndpoint();
         upKey = day.upperEndpoint();
         Assertions.assertEquals("1970-04-11 00:00:00", 
lowKey.getPartitionValuesAsStringList().get(0));
         Assertions.assertEquals("1970-04-12 00:00:00", 
upKey.getPartitionValuesAsStringList().get(0));
 
         // Test month transform.
-        Range<PartitionKey> month = table.getPartitionRange("100", "month");
+        Range<PartitionKey> month = table.getPartitionRange("100", "month", 
partitionColumns);
         lowKey = month.lowerEndpoint();
         upKey = month.upperEndpoint();
         Assertions.assertEquals("1978-05-01 00:00:00", 
lowKey.getPartitionValuesAsStringList().get(0));
         Assertions.assertEquals("1978-06-01 00:00:00", 
upKey.getPartitionValuesAsStringList().get(0));
 
         // Test year transform.
-        Range<PartitionKey> year = table.getPartitionRange("100", "year");
+        Range<PartitionKey> year = table.getPartitionRange("100", "year", 
partitionColumns);
         lowKey = year.lowerEndpoint();
         upKey = year.upperEndpoint();
         Assertions.assertEquals("2070-01-01 00:00:00", 
lowKey.getPartitionValuesAsStringList().get(0));
@@ -174,7 +189,7 @@ public class IcebergExternalTableTest {
 
         // Test unsupported transform
         Exception exception = Assertions.assertThrows(RuntimeException.class, 
() -> {
-            table.getPartitionRange("100", "bucket");
+            table.getPartitionRange("100", "bucket", partitionColumns);
         });
         Assertions.assertEquals("Unsupported transform bucket", 
exception.getMessage());
     }
@@ -183,15 +198,16 @@ public class IcebergExternalTableTest {
     public void testSortRange() throws AnalysisException {
         IcebergExternalTable table = new IcebergExternalTable(1, "1", "2", 
null);
         Column c = new Column("c", PrimitiveType.DATETIMEV2);
+        ArrayList<Column> columns = Lists.newArrayList(c);
         table.setPartitionColumns(Lists.newArrayList(c));
-        PartitionItem nullRange = new 
RangePartitionItem(table.getPartitionRange(null, "hour"));
-        PartitionItem year1970 = new 
RangePartitionItem(table.getPartitionRange("0", "year"));
-        PartitionItem year1971 = new 
RangePartitionItem(table.getPartitionRange("1", "year"));
-        PartitionItem month197002 = new 
RangePartitionItem(table.getPartitionRange("1", "month"));
-        PartitionItem month197103 = new 
RangePartitionItem(table.getPartitionRange("14", "month"));
-        PartitionItem month197204 = new 
RangePartitionItem(table.getPartitionRange("27", "month"));
-        PartitionItem day19700202 = new 
RangePartitionItem(table.getPartitionRange("32", "day"));
-        PartitionItem day19730101 = new 
RangePartitionItem(table.getPartitionRange("1096", "day"));
+        PartitionItem nullRange = new 
RangePartitionItem(table.getPartitionRange(null, "hour", columns));
+        PartitionItem year1970 = new 
RangePartitionItem(table.getPartitionRange("0", "year", columns));
+        PartitionItem year1971 = new 
RangePartitionItem(table.getPartitionRange("1", "year", columns));
+        PartitionItem month197002 = new 
RangePartitionItem(table.getPartitionRange("1", "month", columns));
+        PartitionItem month197103 = new 
RangePartitionItem(table.getPartitionRange("14", "month", columns));
+        PartitionItem month197204 = new 
RangePartitionItem(table.getPartitionRange("27", "month", columns));
+        PartitionItem day19700202 = new 
RangePartitionItem(table.getPartitionRange("32", "day", columns));
+        PartitionItem day19730101 = new 
RangePartitionItem(table.getPartitionRange("1096", "day", columns));
         Map<String, PartitionItem> map = Maps.newHashMap();
         map.put("nullRange", nullRange);
         map.put("year1970", year1970);
diff --git a/regression-test/data/mtmv_p0/test_iceberg_mtmv.out 
b/regression-test/data/mtmv_p0/test_iceberg_mtmv.out
index c9d9799da81..483ac0957e6 100644
--- a/regression-test/data/mtmv_p0/test_iceberg_mtmv.out
+++ b/regression-test/data/mtmv_p0/test_iceberg_mtmv.out
@@ -103,3 +103,18 @@
 2024-09-30     6
 2024-10-28     7
 
+-- !refresh_one_partition --
+2024-01-01T00:00       4
+
+-- !refresh_one_partition_rewrite --
+2024-01-01T00:00       4
+2024-01-02T00:00       3
+
+-- !refresh_auto --
+2024-01-01T00:00       4
+2024-01-02T00:00       3
+
+-- !refresh_all_partition_rewrite --
+2024-01-01T00:00       4
+2024-01-02T00:00       3
+
diff --git a/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy 
b/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy
index 59cf1173acb..aee80d8d169 100644
--- a/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy
@@ -83,6 +83,7 @@ suite("test_iceberg_mtmv", 
"p0,external,iceberg,external_docker,external_docker_
         String icebergDb = "iceberg_mtmv_partition"
         String icebergTable1 = "tstable"
         String icebergTable2 = "dtable"
+        String icebergTable3 = "union_test"
         sql """drop catalog if exists ${catalog_name} """
         sql """create catalog if not exists ${catalog_name} properties (
             'type'='iceberg',
@@ -210,6 +211,61 @@ suite("test_iceberg_mtmv", 
"p0,external,iceberg,external_docker,external_docker_
         sql """drop materialized view if exists ${mvName2};"""
         sql """drop table if exists 
${catalog_name}.${icebergDb}.${icebergTable2}"""
 
+        // Test rewrite and union partitions
+        sql """set 
materialized_view_rewrite_enable_contain_external_table=true;"""
+        String mvSql = "SELECT par,count(*) as num FROM 
${catalog_name}.${icebergDb}.${icebergTable3} group by par"
+        String mvName = "union_mv"
+        sql """drop table if exists 
${catalog_name}.${icebergDb}.${icebergTable3}"""
+        sql """
+            CREATE TABLE ${catalog_name}.${icebergDb}.${icebergTable3} (
+              id int,
+              value int,
+              par datetime
+            ) ENGINE=iceberg
+            PARTITION BY LIST (day(par)) ();
+        """
+        sql """insert into ${catalog_name}.${icebergDb}.${icebergTable3} 
values (1, 1, "2024-01-01"), (2, 1, "2024-01-01"), (3, 1, "2024-01-01"), (4, 1, 
"2024-01-01")"""
+        sql """insert into ${catalog_name}.${icebergDb}.${icebergTable3} 
values (1, 2, "2024-01-02"), (2, 2, "2024-01-02"), (3, 2, "2024-01-02")"""
+        sql """analyze table ${catalog_name}.${icebergDb}.${icebergTable3} 
with sync"""
+
+        sql """drop materialized view if exists ${mvName};"""
+        sql """
+            CREATE MATERIALIZED VIEW ${mvName}
+                BUILD DEFERRED REFRESH AUTO ON MANUAL
+                partition by(`par`)
+                DISTRIBUTED BY RANDOM BUCKETS 2
+                PROPERTIES ('replication_num' = '1')
+                AS ${mvSql}
+        """
+
+        def showPartitions = sql """show partitions from ${mvName}"""
+        logger.info("showPartitions: " + showPartitions.toString())
+        
assertTrue(showPartitions.toString().contains("p_20240101000000_20240102000000"))
+        
assertTrue(showPartitions.toString().contains("p_20240102000000_20240103000000"))
+
+        // refresh one partiton
+        sql """REFRESH MATERIALIZED VIEW ${mvName} 
partitions(p_20240101000000_20240102000000);"""
+        waitingMTMVTaskFinishedByMvName(mvName)
+        order_qt_refresh_one_partition "SELECT * FROM ${mvName} "
+        def explainOnePartition = sql """ explain  ${mvSql} """
+        logger.info("explainOnePartition: " + explainOnePartition.toString())
+        assertTrue(explainOnePartition.toString().contains("VUNION"))
+        order_qt_refresh_one_partition_rewrite "${mvSql}"
+        mv_rewrite_success("${mvSql}", "${mvName}")
+
+        //refresh auto
+        sql """REFRESH MATERIALIZED VIEW ${mvName} auto"""
+        waitingMTMVTaskFinishedByMvName(mvName)
+        order_qt_refresh_auto "SELECT * FROM ${mvName} "
+        def explainAllPartition = sql """ explain  ${mvSql}; """
+        logger.info("explainAllPartition: " + explainAllPartition.toString())
+        assertTrue(explainAllPartition.toString().contains("VOlapScanNode"))
+        order_qt_refresh_all_partition_rewrite "${mvSql}"
+        mv_rewrite_success("${mvSql}", "${mvName}")
+
+        sql """drop materialized view if exists ${mvName};"""
+        sql """drop table if exists 
${catalog_name}.${icebergDb}.${icebergTable3}"""
+
         sql """ drop catalog if exists ${catalog_name} """
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to