This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new faff952bf76 branch-3.1: [fix](iceberg)Use the correct schema for query 
(#50376)  (#52219)
faff952bf76 is described below

commit faff952bf761b8dcfe793eab4c4f19f315b69e15
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Tue Jul 1 11:38:32 2025 +0800

    branch-3.1: [fix](iceberg)Use the correct schema for query (#50376)  
(#52219)
    
    bp #50376
    
    Co-authored-by: wuwenchi <[email protected]>
---
 .../create_preinstalled_scripts/iceberg/run09.sql  |  37 ++
 .../java/org/apache/doris/catalog/TableIf.java     |  11 -
 .../apache/doris/datasource/ExternalCatalog.java   |   2 +
 .../doris/datasource/ExternalSchemaCache.java      |   8 +-
 .../apache/doris/datasource/FileQueryScanNode.java |   9 +-
 .../org/apache/doris/datasource/FileScanNode.java  |   2 +-
 .../apache/doris/datasource/hive/HMSDlaTable.java  |   9 +
 .../doris/datasource/hive/HMSExternalCatalog.java  |   6 +
 .../doris/datasource/hive/HMSExternalTable.java    |  16 +-
 .../doris/datasource/hive/IcebergDlaTable.java     | 147 ++++++++
 .../datasource/iceberg/IcebergExternalCatalog.java |   1 -
 .../datasource/iceberg/IcebergExternalTable.java   | 352 ++-----------------
 .../datasource/iceberg/IcebergMetadataCache.java   |  17 +-
 .../datasource/iceberg/IcebergPartitionInfo.java   |   6 +
 .../doris/datasource/iceberg/IcebergUtils.java     | 391 +++++++++++++++++++--
 .../datasource/iceberg/source/IcebergScanNode.java |  14 +-
 .../java/org/apache/doris/planner/ScanNode.java    |  10 +-
 .../iceberg/IcebergExternalTableTest.java          | 114 ++++++
 .../test_iceberg_schema_change_with_timetravel.out | Bin 0 -> 691 bytes
 ...st_iceberg_schema_change_with_timetravel.groovy |  87 +++++
 20 files changed, 844 insertions(+), 395 deletions(-)

diff --git 
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
 
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
new file mode 100644
index 00000000000..c5795883c4d
--- /dev/null
+++ 
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
@@ -0,0 +1,37 @@
+use demo.test_db;
+
+create table schema_change_with_time_travel (c1 int);
+insert into schema_change_with_time_travel values (1);
+
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (2,3);
+
+alter table schema_change_with_time_travel add column c3 int; 
+insert into schema_change_with_time_travel values (4,5,6);
+
+alter table schema_change_with_time_travel drop column c2;
+insert into schema_change_with_time_travel values (7,8);
+
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (9,10,11);
+
+alter table schema_change_with_time_travel add column c4 int;
+
+
+create table schema_change_with_time_travel_orc (c1 int) tblproperties 
("write.format.default"="orc");
+insert into schema_change_with_time_travel_orc values (1);
+
+alter table schema_change_with_time_travel_orc add column c2 int;
+insert into schema_change_with_time_travel_orc values (2,3);
+
+alter table schema_change_with_time_travel_orc add column c3 int; 
+insert into schema_change_with_time_travel_orc values (4,5,6);
+
+alter table schema_change_with_time_travel_orc drop column c2;
+insert into schema_change_with_time_travel_orc values (7,8);
+
+alter table schema_change_with_time_travel_orc add column c2 int;
+insert into schema_change_with_time_travel_orc values (9,10,11);
+
+alter table schema_change_with_time_travel_orc add column c4 int;
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index 78fbc77698d..7d95d959ba6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -143,17 +143,6 @@ public interface TableIf {
 
     Column getColumn(String name);
 
-    default int getBaseColumnIdxByName(String colName) {
-        int i = 0;
-        for (Column col : getBaseSchema()) {
-            if (col.getName().equalsIgnoreCase(colName)) {
-                return i;
-            }
-            ++i;
-        }
-        return -1;
-    }
-
     String getMysqlType();
 
     String getEngine();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 25b74bb6eff..5c2857cc0b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -133,6 +133,8 @@ public abstract class ExternalCatalog
             CREATE_TIME,
             USE_META_CACHE);
 
+    protected static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = 
Runtime.getRuntime().availableProcessors();
+
     // Unique id of this catalog, will be assigned after catalog is loaded.
     @SerializedName(value = "id")
     protected long id;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
index 0fb66c4e3f9..da9a25d71be 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
@@ -100,11 +100,9 @@ public class ExternalSchemaCache {
     }
 
     public void invalidateTableCache(String dbName, String tblName) {
-        SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
-        schemaCache.invalidate(key);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("invalid schema cache for {}.{} in catalog {}", dbName, 
tblName, catalog.getName());
-        }
+        schemaCache.asMap().keySet().stream()
+            .filter(key -> key.dbName.equals(dbName) && 
key.tblName.equals(tblName))
+            .forEach(schemaCache::invalidate);
     }
 
     public void invalidateDbCache(String dbName) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 56fecb24afc..fc1058105f1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -252,7 +252,14 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
             }
             SlotDescriptor slotDesc = desc.getSlot(slot.getSlotId());
             String colName = slotDesc.getColumn().getName();
-            int idx = tbl.getBaseColumnIdxByName(colName);
+            int idx = -1;
+            List<Column> columns = getColumns();
+            for (int i = 0; i < columns.size(); i++) {
+                if (columns.get(i).getName().equals(colName)) {
+                    idx = i;
+                    break;
+                }
+            }
             if (idx == -1) {
                 throw new UserException("Column " + colName + " not found in 
table " + tbl.getName());
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 8d3aeaa6a26..d2c6230ba6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -193,7 +193,7 @@ public abstract class FileScanNode extends ExternalScanNode 
{
         TExpr tExpr = new TExpr();
         tExpr.setNodes(Lists.newArrayList());
 
-        for (Column column : tbl.getBaseSchema()) {
+        for (Column column : getColumns()) {
             Expr expr;
             if (column.getDefaultValue() != null) {
                 if (column.getDefaultValueExprDef() != null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
index 7894279b295..034ce1ae443 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
@@ -73,4 +73,13 @@ public abstract class HMSDlaTable implements MTMVBaseTableIf 
{
     @Override
     public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
     }
+
+    /**
+     * If the table is supported as related table.
+     * For example, an Iceberg table may become unsupported after partition 
revolution.
+     * @return
+     */
+    protected boolean isValidRelatedTable() {
+        return true;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index f232c8a1a76..90636495335 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -180,6 +180,12 @@ public class HMSExternalCatalog extends ExternalCatalog {
                     
String.valueOf(Config.hive_metastore_client_timeout_second));
         }
         HiveMetadataOps hiveOps = 
ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
+        threadPoolWithPreAuth = 
ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
+            ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
+            Integer.MAX_VALUE,
+            String.format("hms_iceberg_catalog_%s_executor_pool", name),
+            true,
+            preExecutionAuthenticator);
         FileSystemProvider fileSystemProvider = new 
FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
                 this.bindBrokerName(), 
this.catalogProperty.getHadoopProperties());
         this.fileSystemExecutor = 
ThreadPoolManager.newDaemonFixedThreadPool(FILE_SYSTEM_EXECUTOR_THREAD_NUM,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 2a29937653d..fde8ddf3b4d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -40,6 +40,7 @@ import org.apache.doris.datasource.hudi.HudiMvccSnapshot;
 import org.apache.doris.datasource.hudi.HudiSchemaCacheKey;
 import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
 import org.apache.doris.datasource.hudi.HudiUtils;
+import org.apache.doris.datasource.iceberg.IcebergMvccSnapshot;
 import org.apache.doris.datasource.iceberg.IcebergSchemaCacheKey;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.datasource.mvcc.EmptyMvccSnapshot;
@@ -209,7 +210,7 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
             } else {
                 if (supportedIcebergTable()) {
                     dlaType = DLAType.ICEBERG;
-                    dlaTable = new HiveDlaTable(this);
+                    dlaTable = new IcebergDlaTable(this);
                 } else if (supportedHoodieTable()) {
                     dlaType = DLAType.HUDI;
                     dlaTable = new HudiDlaTable(this);
@@ -313,6 +314,8 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
         if (getDlaType() == DLAType.HUDI) {
             return ((HudiDlaTable) 
dlaTable).getHudiSchemaCacheValue(MvccUtil.getSnapshotFromContext(this))
                     .getSchema();
+        } else if (getDlaType() == DLAType.ICEBERG) {
+            return IcebergUtils.getIcebergSchema(this, getCatalog(), 
getDbName(), getName());
         }
         Optional<SchemaCacheValue> schemaCacheValue = 
cache.getSchemaValue(dbName, name);
         return schemaCacheValue.map(SchemaCacheValue::getSchema).orElse(null);
@@ -1047,8 +1050,12 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
     public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
         if (getDlaType() == DLAType.HUDI) {
             return new 
HudiMvccSnapshot(HudiUtils.getPartitionValues(tableSnapshot, this));
+        } else if (getDlaType() == DLAType.ICEBERG) {
+            return new IcebergMvccSnapshot(
+                    IcebergUtils.getIcebergSnapshotCacheValue(tableSnapshot, 
getCatalog(), getDbName(), getName()));
+        } else {
+            return new EmptyMvccSnapshot();
         }
-        return new EmptyMvccSnapshot();
     }
 
     public boolean firstColumnIsString() {
@@ -1084,4 +1091,9 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
                 return Lists.newArrayList();
         }
     }
+
+    public boolean isValidRelatedTable() {
+        makeSureInitialized();
+        return dlaTable.isValidRelatedTable();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
new file mode 100644
index 00000000000..36b871282a9
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.iceberg.IcebergSchemaCacheValue;
+import org.apache.doris.datasource.iceberg.IcebergSnapshotCacheValue;
+import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.mtmv.MTMVRefreshContext;
+import org.apache.doris.mtmv.MTMVSnapshotIdSnapshot;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class IcebergDlaTable extends HMSDlaTable {
+
+    private boolean isValidRelatedTableCached = false;
+    private boolean isValidRelatedTable = false;
+
+    public IcebergDlaTable(HMSExternalTable table) {
+        super(table);
+    }
+
+    @Override
+    public Map<String, PartitionItem> 
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
+        return Maps.newHashMap(
+            IcebergUtils.getOrFetchSnapshotCacheValue(
+                    snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), 
hmsTable.getName())
+                .getPartitionInfo().getNameToPartitionItem());
+    }
+
+    @Override
+    public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
+        return isValidRelatedTable() ? PartitionType.RANGE : 
PartitionType.UNPARTITIONED;
+    }
+
+    @Override
+    public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> 
snapshot) {
+        return 
getPartitionColumns(snapshot).stream().map(Column::getName).collect(Collectors.toSet());
+    }
+
+    @Override
+    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(
+                    snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), 
hmsTable.getName());
+        IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
+                hmsTable.getCatalog(), hmsTable.getDbName(), 
hmsTable.getName(),
+                snapshotValue.getSnapshot().getSchemaId());
+        return schemaValue.getPartitionColumns();
+    }
+
+    @Override
+    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, 
MTMVRefreshContext context,
+                                               Optional<MvccSnapshot> 
snapshot) throws AnalysisException {
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(
+                        snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), 
hmsTable.getName());
+        long latestSnapshotId = 
snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
+        if (latestSnapshotId <= 0) {
+            throw new AnalysisException("can not find partition: " + 
partitionName);
+        }
+        return new MTMVSnapshotIdSnapshot(latestSnapshotId);
+    }
+
+    @Override
+    public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, 
Optional<MvccSnapshot> snapshot)
+            throws AnalysisException {
+        hmsTable.makeSureInitialized();
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(
+                        snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), 
hmsTable.getName());
+        return new 
MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
+    }
+
+    @Override
+    boolean isPartitionColumnAllowNull() {
+        return true;
+    }
+
+    @Override
+    protected boolean isValidRelatedTable() {
+        if (isValidRelatedTableCached) {
+            return isValidRelatedTable;
+        }
+        isValidRelatedTable = false;
+        Set<String> allFields = Sets.newHashSet();
+        Table table = IcebergUtils.getIcebergTable(
+                hmsTable.getCatalog(),
+                hmsTable.getDbName(),
+                hmsTable.getName()
+        );
+        for (PartitionSpec spec : table.specs().values()) {
+            if (spec == null) {
+                isValidRelatedTableCached = true;
+                return false;
+            }
+            List<PartitionField> fields = spec.fields();
+            if (fields.size() != 1) {
+                isValidRelatedTableCached = true;
+                return false;
+            }
+            PartitionField partitionField = spec.fields().get(0);
+            String transformName = partitionField.transform().toString();
+            if (!IcebergUtils.YEAR.equals(transformName)
+                    && !IcebergUtils.MONTH.equals(transformName)
+                    && !IcebergUtils.DAY.equals(transformName)
+                    && !IcebergUtils.HOUR.equals(transformName)) {
+                isValidRelatedTableCached = true;
+                return false;
+            }
+            
allFields.add(table.schema().findColumnName(partitionField.sourceId()));
+        }
+        isValidRelatedTableCached = true;
+        isValidRelatedTable = allFields.size() == 1;
+        return isValidRelatedTable;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index b695a268b0f..41149370097 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -45,7 +45,6 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
     public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
     protected String icebergCatalogType;
     protected Catalog catalog;
-    private static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = 16;
 
     public IcebergExternalCatalog(long catalogId, String name, String comment) 
{
         super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
index e9bffbf56e3..6990e8a7449 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
@@ -17,25 +17,18 @@
 
 package org.apache.doris.datasource.iceberg;
 
-import org.apache.doris.analysis.PartitionValue;
 import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.PartitionItem;
-import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.catalog.PartitionType;
-import org.apache.doris.catalog.RangePartitionItem;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.datasource.CacheException;
-import org.apache.doris.datasource.ExternalSchemaCache;
 import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
 import org.apache.doris.datasource.mvcc.MvccSnapshot;
 import org.apache.doris.datasource.mvcc.MvccTable;
-import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.datasource.systable.SupportedSysTables;
 import org.apache.doris.datasource.systable.SysTable;
 import org.apache.doris.mtmv.MTMVBaseTableIf;
@@ -52,32 +45,12 @@ import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataTableType;
-import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.PartitionsTable;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.StructProjection;
-
-import java.io.IOException;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.Month;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Comparator;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -87,15 +60,7 @@ import java.util.stream.Collectors;
 
 public class IcebergExternalTable extends ExternalTable implements 
MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable {
 
-    public static final String YEAR = "year";
-    public static final String MONTH = "month";
-    public static final String DAY = "day";
-    public static final String HOUR = "hour";
-    public static final String IDENTITY = "identity";
-    public static final int PARTITION_DATA_ID_START = 1000; // 
org.apache.iceberg.PartitionSpec
-
     private Table table;
-    private List<Column> partitionColumns;
     private boolean isValidRelatedTableCached = false;
     private boolean isValidRelatedTable = false;
 
@@ -120,29 +85,9 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
         this.table = table;
     }
 
-    @VisibleForTesting
-    public void setPartitionColumns(List<Column> partitionColumns) {
-        this.partitionColumns = partitionColumns;
-    }
-
     @Override
     public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
-        table = getIcebergTable();
-        List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name,
-                ((IcebergSchemaCacheKey) key).getSchemaId());
-        List<Column> tmpColumns = Lists.newArrayList();
-        PartitionSpec spec = table.spec();
-        for (PartitionField field : spec.fields()) {
-            Types.NestedField col = table.schema().findField(field.sourceId());
-            for (Column c : schema) {
-                if (c.getName().equalsIgnoreCase(col.name())) {
-                    tmpColumns.add(c);
-                    break;
-                }
-            }
-        }
-        partitionColumns = tmpColumns;
-        return Optional.of(new IcebergSchemaCacheValue(schema, 
partitionColumns));
+        return IcebergUtils.loadSchemaCacheValue(catalog, dbName, name, 
((IcebergSchemaCacheKey) key).getSchemaId());
     }
 
     @Override
@@ -180,23 +125,21 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
         return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), 
getName());
     }
 
-    private IcebergSnapshotCacheValue getIcebergSnapshotCacheValue() {
-        return 
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
-            .getSnapshotCache(catalog, dbName, name);
-    }
-
     @Override
     public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
     }
 
     @Override
     public Map<String, PartitionItem> 
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
-        return 
Maps.newHashMap(getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem());
+        return Maps.newHashMap(
+            IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), 
getDbName(), getName())
+                .getPartitionInfo().getNameToPartitionItem());
     }
 
     @Override
     public Map<String, PartitionItem> 
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
-        return 
getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem();
+        return IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, 
getCatalog(), getDbName(), getName())
+            .getPartitionInfo().getNameToPartitionItem();
     }
 
     @Override
@@ -211,15 +154,18 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
 
     @Override
     public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
-        IcebergSnapshotCacheValue snapshotValue = 
getOrFetchSnapshotCacheValue(snapshot);
-        IcebergSchemaCacheValue schemaValue = 
getIcebergSchemaCacheValue(snapshotValue.getSnapshot().getSchemaId());
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, 
getCatalog(), getDbName(), getName());
+        IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
+                catalog, getDbName(), getName(), 
snapshotValue.getSnapshot().getSchemaId());
         return schemaValue.getPartitionColumns();
     }
 
     @Override
     public MTMVSnapshotIf getPartitionSnapshot(String partitionName, 
MTMVRefreshContext context,
                                                Optional<MvccSnapshot> 
snapshot) throws AnalysisException {
-        IcebergSnapshotCacheValue snapshotValue = 
getOrFetchSnapshotCacheValue(snapshot);
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, 
getCatalog(), getDbName(), getName());
         long latestSnapshotId = 
snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
         if (latestSnapshotId <= 0) {
             throw new AnalysisException("can not find partition: " + 
partitionName);
@@ -231,7 +177,8 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
     public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, 
Optional<MvccSnapshot> snapshot)
             throws AnalysisException {
         makeSureInitialized();
-        IcebergSnapshotCacheValue snapshotValue = 
getOrFetchSnapshotCacheValue(snapshot);
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, 
getCatalog(), getDbName(), getName());
         return new 
MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
     }
 
@@ -266,10 +213,10 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
             }
             PartitionField partitionField = spec.fields().get(0);
             String transformName = partitionField.transform().toString();
-            if (!YEAR.equals(transformName)
-                    && !MONTH.equals(transformName)
-                    && !DAY.equals(transformName)
-                    && !HOUR.equals(transformName)) {
+            if (!IcebergUtils.YEAR.equals(transformName)
+                    && !IcebergUtils.MONTH.equals(transformName)
+                    && !IcebergUtils.DAY.equals(transformName)
+                    && !IcebergUtils.HOUR.equals(transformName)) {
                 isValidRelatedTableCached = true;
                 return false;
             }
@@ -282,27 +229,13 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
 
     @Override
     public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
-        return new IcebergMvccSnapshot(getIcebergSnapshotCacheValue());
-    }
-
-    public long getLatestSnapshotId() {
-        table = getIcebergTable();
-        Snapshot snapshot = table.currentSnapshot();
-        return snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID : 
table.currentSnapshot().snapshotId();
-    }
-
-    public long getSchemaId(long snapshotId) {
-        table = getIcebergTable();
-        return snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID
-                ? IcebergUtils.UNKNOWN_SNAPSHOT_ID
-                : table.snapshot(snapshotId).schemaId();
+        return new 
IcebergMvccSnapshot(IcebergUtils.getIcebergSnapshotCacheValue(
+                tableSnapshot, getCatalog(), getDbName(), getName()));
     }
 
     @Override
     public List<Column> getFullSchema() {
-        Optional<MvccSnapshot> snapshotFromContext = 
MvccUtil.getSnapshotFromContext(this);
-        IcebergSnapshotCacheValue cacheValue = 
getOrFetchSnapshotCacheValue(snapshotFromContext);
-        return 
getIcebergSchemaCacheValue(cacheValue.getSnapshot().getSchemaId()).getSchema();
+        return IcebergUtils.getIcebergSchema(this, getCatalog(), getDbName(), 
getName());
     }
 
     @Override
@@ -310,239 +243,6 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
         return true;
     }
 
-    public IcebergSchemaCacheValue getIcebergSchemaCacheValue(long schemaId) {
-        ExternalSchemaCache cache = 
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
-        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
-            new IcebergSchemaCacheKey(dbName, name, schemaId));
-        if (!schemaCacheValue.isPresent()) {
-            throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
-                null, catalog.getName(), dbName, name, schemaId);
-        }
-        return (IcebergSchemaCacheValue) schemaCacheValue.get();
-    }
-
-    public IcebergPartitionInfo loadPartitionInfo(long snapshotId) throws 
AnalysisException {
-        // snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table, 
haven't contained any snapshot yet.
-        if (!isValidRelatedTable() || snapshotId == 
IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
-            return new IcebergPartitionInfo();
-        }
-        List<IcebergPartition> icebergPartitions = 
loadIcebergPartition(snapshotId);
-        Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
-        Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
-        table = getIcebergTable();
-        partitionColumns = 
getIcebergSchemaCacheValue(table.snapshot(snapshotId).schemaId()).getPartitionColumns();
-        for (IcebergPartition partition : icebergPartitions) {
-            nameToPartition.put(partition.getPartitionName(), partition);
-            String transform = 
table.specs().get(partition.getSpecId()).fields().get(0).transform().toString();
-            Range<PartitionKey> partitionRange = getPartitionRange(
-                    partition.getPartitionValues().get(0), transform, 
partitionColumns);
-            PartitionItem item = new RangePartitionItem(partitionRange);
-            nameToPartitionItem.put(partition.getPartitionName(), item);
-        }
-        Map<String, Set<String>> partitionNameMap = 
mergeOverlapPartitions(nameToPartitionItem);
-        return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition, 
partitionNameMap);
-    }
-
-    public List<IcebergPartition> loadIcebergPartition(long snapshotId) {
-        PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
-                .createMetadataTableInstance(table, 
MetadataTableType.PARTITIONS);
-        List<IcebergPartition> partitions = Lists.newArrayList();
-        try (CloseableIterable<FileScanTask> tasks = 
partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) {
-            for (FileScanTask task : tasks) {
-                CloseableIterable<StructLike> rows = task.asDataTask().rows();
-                for (StructLike row : rows) {
-                    partitions.add(generateIcebergPartition(row));
-                }
-            }
-        } catch (IOException e) {
-            LOG.warn("Failed to get Iceberg table {} partition info.", name, 
e);
-        }
-        return partitions;
-    }
-
-    public IcebergPartition generateIcebergPartition(StructLike row) {
-        // row format :
-        // 0. partitionData,
-        // 1. spec_id,
-        // 2. record_count,
-        // 3. file_count,
-        // 4. total_data_file_size_in_bytes,
-        // 5. position_delete_record_count,
-        // 6. position_delete_file_count,
-        // 7. equality_delete_record_count,
-        // 8. equality_delete_file_count,
-        // 9. last_updated_at,
-        // 10. last_updated_snapshot_id
-        table = getIcebergTable();
-        Preconditions.checkState(!table.spec().fields().isEmpty(), 
table.name() + " is not a partition table.");
-        int specId = row.get(1, Integer.class);
-        PartitionSpec partitionSpec = table.specs().get(specId);
-        StructProjection partitionData = row.get(0, StructProjection.class);
-        StringBuilder sb = new StringBuilder();
-        List<String> partitionValues = Lists.newArrayList();
-        List<String> transforms = Lists.newArrayList();
-        for (int i = 0; i < partitionSpec.fields().size(); ++i) {
-            PartitionField partitionField = partitionSpec.fields().get(i);
-            Class<?> fieldClass = partitionSpec.javaClasses()[i];
-            int fieldId = partitionField.fieldId();
-            // Iceberg partition field id starts at PARTITION_DATA_ID_START,
-            // So we can get the field index in partitionData using fieldId - 
PARTITION_DATA_ID_START
-            int index = fieldId - PARTITION_DATA_ID_START;
-            Object o = partitionData.get(index, fieldClass);
-            String fieldValue = o == null ? null : o.toString();
-            String fieldName = partitionField.name();
-            sb.append(fieldName);
-            sb.append("=");
-            sb.append(fieldValue);
-            sb.append("/");
-            partitionValues.add(fieldValue);
-            transforms.add(partitionField.transform().toString());
-        }
-        if (sb.length() > 0) {
-            sb.delete(sb.length() - 1, sb.length());
-        }
-        String partitionName = sb.toString();
-        long recordCount = row.get(2, Long.class);
-        long fileCount = row.get(3, Integer.class);
-        long fileSizeInBytes = row.get(4, Long.class);
-        long lastUpdateTime = row.get(9, Long.class);
-        long lastUpdateSnapShotId = row.get(10, Long.class);
-        return new IcebergPartition(partitionName, specId, recordCount, 
fileSizeInBytes, fileCount,
-                lastUpdateTime, lastUpdateSnapShotId, partitionValues, 
transforms);
-    }
-
-    @VisibleForTesting
-    public Range<PartitionKey> getPartitionRange(String value, String 
transform, List<Column> partitionColumns)
-            throws AnalysisException {
-        // For NULL value, create a minimum partition for it.
-        if (value == null) {
-            PartitionKey nullLowKey = PartitionKey.createPartitionKey(
-                    Lists.newArrayList(new PartitionValue("0000-01-01")), 
partitionColumns);
-            PartitionKey nullUpKey = nullLowKey.successor();
-            return Range.closedOpen(nullLowKey, nullUpKey);
-        }
-        LocalDateTime epoch = 
Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
-        LocalDateTime target;
-        LocalDateTime lower;
-        LocalDateTime upper;
-        long longValue = Long.parseLong(value);
-        switch (transform) {
-            case HOUR:
-                target = epoch.plusHours(longValue);
-                lower = LocalDateTime.of(target.getYear(), target.getMonth(), 
target.getDayOfMonth(),
-                        target.getHour(), 0, 0);
-                upper = lower.plusHours(1);
-                break;
-            case DAY:
-                target = epoch.plusDays(longValue);
-                lower = LocalDateTime.of(target.getYear(), target.getMonth(), 
target.getDayOfMonth(), 0, 0, 0);
-                upper = lower.plusDays(1);
-                break;
-            case MONTH:
-                target = epoch.plusMonths(longValue);
-                lower = LocalDateTime.of(target.getYear(), target.getMonth(), 
1, 0, 0, 0);
-                upper = lower.plusMonths(1);
-                break;
-            case YEAR:
-                target = epoch.plusYears(longValue);
-                lower = LocalDateTime.of(target.getYear(), Month.JANUARY, 1, 
0, 0, 0);
-                upper = lower.plusYears(1);
-                break;
-            default:
-                throw new RuntimeException("Unsupported transform " + 
transform);
-        }
-        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss");
-        Column c = partitionColumns.get(0);
-        Preconditions.checkState(c.getDataType().isDateType(), "Only support 
date type partition column");
-        if (c.getType().isDate() || c.getType().isDateV2()) {
-            formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
-        }
-        PartitionValue lowerValue = new 
PartitionValue(lower.format(formatter));
-        PartitionValue upperValue = new 
PartitionValue(upper.format(formatter));
-        PartitionKey lowKey = 
PartitionKey.createPartitionKey(Lists.newArrayList(lowerValue), 
partitionColumns);
-        PartitionKey upperKey =  
PartitionKey.createPartitionKey(Lists.newArrayList(upperValue), 
partitionColumns);
-        return Range.closedOpen(lowKey, upperKey);
-    }
-
-    /**
-     * Merge overlapped iceberg partitions into one Doris partition.
-     */
-    public Map<String, Set<String>> mergeOverlapPartitions(Map<String, 
PartitionItem> originPartitions) {
-        List<Map.Entry<String, PartitionItem>> entries = 
sortPartitionMap(originPartitions);
-        Map<String, Set<String>> map = Maps.newHashMap();
-        for (int i = 0; i < entries.size() - 1; i++) {
-            Range<PartitionKey> firstValue = 
entries.get(i).getValue().getItems();
-            String firstKey = entries.get(i).getKey();
-            Range<PartitionKey> secondValue = entries.get(i + 
1).getValue().getItems();
-            String secondKey = entries.get(i + 1).getKey();
-            // If the first entry enclose the second one, remove the second 
entry and keep a record in the return map.
-            // So we can track the iceberg partitions those contained by one 
Doris partition.
-            while (i < entries.size() && firstValue.encloses(secondValue)) {
-                originPartitions.remove(secondKey);
-                map.putIfAbsent(firstKey, Sets.newHashSet(firstKey));
-                String finalSecondKey = secondKey;
-                map.computeIfPresent(firstKey, (key, value) -> {
-                    value.add(finalSecondKey);
-                    return value;
-                });
-                i++;
-                if (i >= entries.size() - 1) {
-                    break;
-                }
-                secondValue = entries.get(i + 1).getValue().getItems();
-                secondKey = entries.get(i + 1).getKey();
-            }
-        }
-        return map;
-    }
-
-    /**
-     * Sort the given map entries by PartitionItem Range(LOW, HIGH)
-     * When comparing two ranges, the one with smaller LOW value is smaller 
than the other one.
-     * If two ranges have same values of LOW, the one with larger HIGH value 
is smaller.
-     *
-     * For now, we only support year, month, day and hour,
-     * so it is impossible to have two partially intersect partitions.
-     * One range is either enclosed by another or has no intersection at all 
with another.
-     *
-     *
-     * For example, we have these 4 ranges:
-     * [10, 20), [30, 40), [0, 30), [10, 15)
-     *
-     * After sort, they become:
-     * [0, 30), [10, 20), [10, 15), [30, 40)
-     */
-    public List<Map.Entry<String, PartitionItem>> sortPartitionMap(Map<String, 
PartitionItem> originPartitions) {
-        List<Map.Entry<String, PartitionItem>> entries = new 
ArrayList<>(originPartitions.entrySet());
-        entries.sort(new RangeComparator());
-        return entries;
-    }
-
-    public static class RangeComparator implements 
Comparator<Map.Entry<String, PartitionItem>> {
-        @Override
-        public int compare(Map.Entry<String, PartitionItem> p1, 
Map.Entry<String, PartitionItem> p2) {
-            PartitionItem value1 = p1.getValue();
-            PartitionItem value2 = p2.getValue();
-            if (value1 instanceof RangePartitionItem && value2 instanceof 
RangePartitionItem) {
-                Range<PartitionKey> items1 = value1.getItems();
-                Range<PartitionKey> items2 = value2.getItems();
-                if (!items1.hasLowerBound()) {
-                    return -1;
-                }
-                if (!items2.hasLowerBound()) {
-                    return 1;
-                }
-                PartitionKey upper1 = items1.upperEndpoint();
-                PartitionKey lower1 = items1.lowerEndpoint();
-                PartitionKey upper2 = items2.upperEndpoint();
-                PartitionKey lower2 = items2.lowerEndpoint();
-                int compareLow = lower1.compareTo(lower2);
-                return compareLow == 0 ? upper2.compareTo(upper1) : compareLow;
-            }
-            return 0;
-        }
-    }
-
     @VisibleForTesting
     public boolean isValidRelatedTableCached() {
         return isValidRelatedTableCached;
@@ -557,14 +257,6 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
         this.isValidRelatedTableCached = isCached;
     }
 
-    public IcebergSnapshotCacheValue 
getOrFetchSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
-        if (snapshot.isPresent()) {
-            return ((IcebergMvccSnapshot) 
snapshot.get()).getSnapshotCacheValue();
-        } else {
-            return getIcebergSnapshotCacheValue();
-        }
-    }
-
     @Override
     public List<SysTable> getSupportedSysTables() {
         makeSureInitialized();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index b776a8c3a47..f99b652b42d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -24,6 +24,7 @@ import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalMetaCacheMgr;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
 
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.collect.Iterables;
@@ -123,12 +124,18 @@ public class IcebergMetadataCache {
 
     @NotNull
     private IcebergSnapshotCacheValue loadSnapshot(IcebergMetadataCacheKey 
key) throws AnalysisException {
-        IcebergExternalTable table = (IcebergExternalTable) 
key.catalog.getDbOrAnalysisException(key.dbName)
+        MTMVRelatedTableIf table = (MTMVRelatedTableIf) 
key.catalog.getDbOrAnalysisException(key.dbName)
                 .getTableOrAnalysisException(key.tableName);
-        long snapshotId = table.getLatestSnapshotId();
-        long schemaId = table.getSchemaId(snapshotId);
-        IcebergPartitionInfo icebergPartitionInfo = 
table.loadPartitionInfo(snapshotId);
-        return new IcebergSnapshotCacheValue(icebergPartitionInfo, new 
IcebergSnapshot(snapshotId, schemaId));
+        IcebergSnapshot lastedIcebergSnapshot = 
IcebergUtils.getLastedIcebergSnapshot(
+                (ExternalCatalog) key.catalog, key.dbName, key.tableName);
+        IcebergPartitionInfo icebergPartitionInfo;
+        if (!table.isValidRelatedTable()) {
+            icebergPartitionInfo = IcebergPartitionInfo.empty();
+        } else {
+            icebergPartitionInfo = IcebergUtils.loadPartitionInfo(
+                (ExternalCatalog) key.catalog, key.dbName, key.tableName, 
lastedIcebergSnapshot.getSnapshotId());
+        }
+        return new IcebergSnapshotCacheValue(icebergPartitionInfo, 
lastedIcebergSnapshot);
     }
 
     public void invalidateCatalogCache(long catalogId) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java
index 9edb2137f4f..bb46eb21223 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java
@@ -29,12 +29,18 @@ public class IcebergPartitionInfo {
     private final Map<String, IcebergPartition> nameToIcebergPartition;
     private final Map<String, Set<String>> nameToIcebergPartitionNames;
 
+    private static final IcebergPartitionInfo EMPTY = new 
IcebergPartitionInfo();
+
     public IcebergPartitionInfo() {
         this.nameToPartitionItem = Maps.newHashMap();
         this.nameToIcebergPartition = Maps.newHashMap();
         this.nameToIcebergPartitionNames = Maps.newHashMap();
     }
 
+    static IcebergPartitionInfo empty() {
+        return EMPTY;
+    }
+
     public IcebergPartitionInfo(Map<String, PartitionItem> nameToPartitionItem,
                                Map<String, IcebergPartition> 
nameToIcebergPartition,
                                 Map<String, Set<String>> 
nameToIcebergPartitionNames) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index f61c226ed75..ab91d9c3d90 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -31,39 +31,58 @@ import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.PartitionDesc;
+import org.apache.doris.analysis.PartitionValue;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.Subquery;
+import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MapType;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.RangePartitionItem;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.info.SimpleTableInfo;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.CacheException;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalSchemaCache;
 import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.datasource.property.constants.HMSProperties;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.thrift.TExprOpcode;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionsTable;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.expressions.And;
@@ -79,14 +98,25 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Type.TypeID;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.LocationUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.StructProjection;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
+import java.time.DateTimeException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.Month;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -117,7 +147,15 @@ public class IcebergUtils {
     // nickname in spark
     public static final String SPARK_SQL_COMPRESSION_CODEC = 
"spark.sql.iceberg.compression-codec";
 
-    public static final long UNKNOWN_SNAPSHOT_ID = -1;
+    public static final long UNKNOWN_SNAPSHOT_ID = -1;  // means an empty table
+    public static final long NEWEST_SCHEMA_ID = -1;
+
+    public static final String YEAR = "year";
+    public static final String MONTH = "month";
+    public static final String DAY = "day";
+    public static final String HOUR = "hour";
+    public static final String IDENTITY = "identity";
+    public static final int PARTITION_DATA_ID_START = 1000; // 
org.apache.iceberg.PartitionSpec
 
     public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
         if (expr == null) {
@@ -582,10 +620,6 @@ public class IcebergUtils {
                 : metadataCache.getIcebergTable(catalog, dbName, tblName);
     }
 
-    public static List<Column> getSchema(ExternalCatalog catalog, String 
dbName, String name) {
-        return getSchema(catalog, dbName, name, UNKNOWN_SNAPSHOT_ID);
-    }
-
     /**
      * Get iceberg schema from catalog and convert them to doris schema
      */
@@ -594,7 +628,7 @@ public class IcebergUtils {
             return catalog.getPreExecutionAuthenticator().execute(() -> {
                 org.apache.iceberg.Table icebergTable = 
getIcebergTable(catalog, dbName, name);
                 Schema schema;
-                if (schemaId == UNKNOWN_SNAPSHOT_ID || 
icebergTable.currentSnapshot() == null) {
+                if (schemaId == NEWEST_SCHEMA_ID || 
icebergTable.currentSnapshot() == null) {
                     schema = icebergTable.schema();
                 } else {
                     schema = icebergTable.schemas().get((int) schemaId);
@@ -725,25 +759,6 @@ public class IcebergUtils {
         return hiveCatalog;
     }
 
-    // load table schema from iceberg API to external schema cache.
-    public static Optional<SchemaCacheValue> loadSchemaCacheValue(
-            ExternalCatalog catalog, String dbName, String tbName, long 
schemaId) {
-        Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName);
-        List<Column> schema = IcebergUtils.getSchema(catalog, dbName, tbName, 
schemaId);
-        List<Column> tmpColumns = Lists.newArrayList();
-        PartitionSpec spec = table.spec();
-        for (PartitionField field : spec.fields()) {
-            Types.NestedField col = table.schema().findField(field.sourceId());
-            for (Column c : schema) {
-                if (c.getName().equalsIgnoreCase(col.name())) {
-                    tmpColumns.add(c);
-                    break;
-                }
-            }
-        }
-        return Optional.of(new IcebergSchemaCacheValue(schema, tmpColumns));
-    }
-
     // Retrieve the manifest files that match the query based on partitions in 
filter
     public static CloseableIterable<ManifestFile> getMatchingManifest(
                 List<ManifestFile> dataManifests,
@@ -772,4 +787,330 @@ public class IcebergUtils {
 
         return matchingManifests;
     }
+
+    // get snapshot id from query like 'for version/time as of'
+    public static long getQuerySpecSnapshot(Table table, TableSnapshot 
queryTableSnapshot) {
+        TableSnapshot.VersionType type = queryTableSnapshot.getType();
+        if (type == TableSnapshot.VersionType.VERSION) {
+            return queryTableSnapshot.getVersion();
+        } else {
+            long timestamp = 
TimeUtils.timeStringToLong(queryTableSnapshot.getTime(), 
TimeUtils.getTimeZone());
+            if (timestamp < 0) {
+                throw new DateTimeException("can't parse time: " + 
queryTableSnapshot.getTime());
+            }
+            return SnapshotUtil.snapshotIdAsOfTime(table, timestamp);
+        }
+    }
+
+    // read schema from external schema cache
+    public static IcebergSchemaCacheValue getSchemaCacheValue(
+            ExternalCatalog catalog, String dbName, String name, long 
schemaId) {
+        ExternalSchemaCache cache = 
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
+            new IcebergSchemaCacheKey(dbName, name, schemaId));
+        if (!schemaCacheValue.isPresent()) {
+            throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
+                null, catalog.getName(), dbName, name, schemaId);
+        }
+        return (IcebergSchemaCacheValue) schemaCacheValue.get();
+    }
+
+    public static IcebergSnapshot getLastedIcebergSnapshot(ExternalCatalog 
catalog, String dbName, String tbName) {
+        Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName);
+        Snapshot snapshot = table.currentSnapshot();
+        long snapshotId = snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID 
: snapshot.snapshotId();
+        return new IcebergSnapshot(snapshotId, table.schema().schemaId());
+    }
+
+    public static IcebergPartitionInfo loadPartitionInfo(
+            ExternalCatalog catalog, String dbName, String tbName, long 
snapshotId) throws AnalysisException {
+        // snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table, 
haven't contained any snapshot yet.
+        if (snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
+            return IcebergPartitionInfo.empty();
+        }
+        Table table = getIcebergTable(catalog, dbName, tbName);
+        List<IcebergPartition> icebergPartitions = loadIcebergPartition(table, 
snapshotId);
+        Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
+        Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
+
+        List<Column> partitionColumns = IcebergUtils.getSchemaCacheValue(
+                catalog, dbName, tbName, 
table.snapshot(snapshotId).schemaId()).getPartitionColumns();
+        for (IcebergPartition partition : icebergPartitions) {
+            nameToPartition.put(partition.getPartitionName(), partition);
+            String transform = 
table.specs().get(partition.getSpecId()).fields().get(0).transform().toString();
+            Range<PartitionKey> partitionRange = getPartitionRange(
+                    partition.getPartitionValues().get(0), transform, 
partitionColumns);
+            PartitionItem item = new RangePartitionItem(partitionRange);
+            nameToPartitionItem.put(partition.getPartitionName(), item);
+        }
+        Map<String, Set<String>> partitionNameMap = 
mergeOverlapPartitions(nameToPartitionItem);
+        return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition, 
partitionNameMap);
+    }
+
+    private static List<IcebergPartition> loadIcebergPartition(Table table, 
long snapshotId) {
+        PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
+                .createMetadataTableInstance(table, 
MetadataTableType.PARTITIONS);
+        List<IcebergPartition> partitions = Lists.newArrayList();
+        try (CloseableIterable<FileScanTask> tasks = 
partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) {
+            for (FileScanTask task : tasks) {
+                CloseableIterable<StructLike> rows = task.asDataTask().rows();
+                for (StructLike row : rows) {
+                    partitions.add(generateIcebergPartition(table, row));
+                }
+            }
+        } catch (IOException e) {
+            LOG.warn("Failed to get Iceberg table {} partition info.", 
table.name(), e);
+        }
+        return partitions;
+    }
+
+    private static IcebergPartition generateIcebergPartition(Table table, 
StructLike row) {
+        // row format :
+        // 0. partitionData,
+        // 1. spec_id,
+        // 2. record_count,
+        // 3. file_count,
+        // 4. total_data_file_size_in_bytes,
+        // 5. position_delete_record_count,
+        // 6. position_delete_file_count,
+        // 7. equality_delete_record_count,
+        // 8. equality_delete_file_count,
+        // 9. last_updated_at,
+        // 10. last_updated_snapshot_id
+        Preconditions.checkState(!table.spec().fields().isEmpty(), 
table.name() + " is not a partition table.");
+        int specId = row.get(1, Integer.class);
+        PartitionSpec partitionSpec = table.specs().get(specId);
+        StructProjection partitionData = row.get(0, StructProjection.class);
+        StringBuilder sb = new StringBuilder();
+        List<String> partitionValues = Lists.newArrayList();
+        List<String> transforms = Lists.newArrayList();
+        for (int i = 0; i < partitionSpec.fields().size(); ++i) {
+            PartitionField partitionField = partitionSpec.fields().get(i);
+            Class<?> fieldClass = partitionSpec.javaClasses()[i];
+            int fieldId = partitionField.fieldId();
+            // Iceberg partition field id starts at PARTITION_DATA_ID_START,
+            // So we can get the field index in partitionData using fieldId - 
PARTITION_DATA_ID_START
+            int index = fieldId - PARTITION_DATA_ID_START;
+            Object o = partitionData.get(index, fieldClass);
+            String fieldValue = o == null ? null : o.toString();
+            String fieldName = partitionField.name();
+            sb.append(fieldName);
+            sb.append("=");
+            sb.append(fieldValue);
+            sb.append("/");
+            partitionValues.add(fieldValue);
+            transforms.add(partitionField.transform().toString());
+        }
+        if (sb.length() > 0) {
+            sb.delete(sb.length() - 1, sb.length());
+        }
+        String partitionName = sb.toString();
+        long recordCount = row.get(2, Long.class);
+        long fileCount = row.get(3, Integer.class);
+        long fileSizeInBytes = row.get(4, Long.class);
+        long lastUpdateTime = row.get(9, Long.class);
+        long lastUpdateSnapShotId = row.get(10, Long.class);
+        return new IcebergPartition(partitionName, specId, recordCount, 
fileSizeInBytes, fileCount,
+            lastUpdateTime, lastUpdateSnapShotId, partitionValues, transforms);
+    }
+
+    @VisibleForTesting
+    public static Range<PartitionKey> getPartitionRange(String value, String 
transform, List<Column> partitionColumns)
+            throws AnalysisException {
+        // For NULL value, create a minimum partition for it.
+        if (value == null) {
+            PartitionKey nullLowKey = PartitionKey.createPartitionKey(
+                    Lists.newArrayList(new PartitionValue("0000-01-01")), 
partitionColumns);
+            PartitionKey nullUpKey = nullLowKey.successor();
+            return Range.closedOpen(nullLowKey, nullUpKey);
+        }
+        LocalDateTime epoch = 
Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
+        LocalDateTime target;
+        LocalDateTime lower;
+        LocalDateTime upper;
+        long longValue = Long.parseLong(value);
+        switch (transform) {
+            case HOUR:
+                target = epoch.plusHours(longValue);
+                lower = LocalDateTime.of(target.getYear(), target.getMonth(), 
target.getDayOfMonth(),
+                    target.getHour(), 0, 0);
+                upper = lower.plusHours(1);
+                break;
+            case DAY:
+                target = epoch.plusDays(longValue);
+                lower = LocalDateTime.of(target.getYear(), target.getMonth(), 
target.getDayOfMonth(), 0, 0, 0);
+                upper = lower.plusDays(1);
+                break;
+            case MONTH:
+                target = epoch.plusMonths(longValue);
+                lower = LocalDateTime.of(target.getYear(), target.getMonth(), 
1, 0, 0, 0);
+                upper = lower.plusMonths(1);
+                break;
+            case YEAR:
+                target = epoch.plusYears(longValue);
+                lower = LocalDateTime.of(target.getYear(), Month.JANUARY, 1, 
0, 0, 0);
+                upper = lower.plusYears(1);
+                break;
+            default:
+                throw new RuntimeException("Unsupported transform " + 
transform);
+        }
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss");
+        Column c = partitionColumns.get(0);
+        Preconditions.checkState(c.getDataType().isDateType(), "Only support 
date type partition column");
+        if (c.getType().isDate() || c.getType().isDateV2()) {
+            formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+        }
+        PartitionValue lowerValue = new 
PartitionValue(lower.format(formatter));
+        PartitionValue upperValue = new 
PartitionValue(upper.format(formatter));
+        PartitionKey lowKey = 
PartitionKey.createPartitionKey(Lists.newArrayList(lowerValue), 
partitionColumns);
+        PartitionKey upperKey =  
PartitionKey.createPartitionKey(Lists.newArrayList(upperValue), 
partitionColumns);
+        return Range.closedOpen(lowKey, upperKey);
+    }
+
+    /**
+     * Merge overlapped iceberg partitions into one Doris partition.
+     */
+    @VisibleForTesting
+    public static Map<String, Set<String>> mergeOverlapPartitions(Map<String, 
PartitionItem> originPartitions) {
+        List<Map.Entry<String, PartitionItem>> entries = 
sortPartitionMap(originPartitions);
+        Map<String, Set<String>> map = Maps.newHashMap();
+        for (int i = 0; i < entries.size() - 1; i++) {
+            Range<PartitionKey> firstValue = 
entries.get(i).getValue().getItems();
+            String firstKey = entries.get(i).getKey();
+            Range<PartitionKey> secondValue = entries.get(i + 
1).getValue().getItems();
+            String secondKey = entries.get(i + 1).getKey();
+            // If the first entry enclose the second one, remove the second 
entry and keep a record in the return map.
+            // So we can track the iceberg partitions those contained by one 
Doris partition.
+            while (i < entries.size() && firstValue.encloses(secondValue)) {
+                originPartitions.remove(secondKey);
+                map.putIfAbsent(firstKey, Sets.newHashSet(firstKey));
+                String finalSecondKey = secondKey;
+                map.computeIfPresent(firstKey, (key, value) -> {
+                    value.add(finalSecondKey);
+                    return value;
+                });
+                i++;
+                if (i >= entries.size() - 1) {
+                    break;
+                }
+                secondValue = entries.get(i + 1).getValue().getItems();
+                secondKey = entries.get(i + 1).getKey();
+            }
+        }
+        return map;
+    }
+
+    /**
+     * Sort the given map entries by PartitionItem Range(LOW, HIGH)
+     * When comparing two ranges, the one with smaller LOW value is smaller 
than the other one.
+     * If two ranges have same values of LOW, the one with larger HIGH value 
is smaller.
+     *
+     * For now, we only support year, month, day and hour,
+     * so it is impossible to have two partially intersect partitions.
+     * One range is either enclosed by another or has no intersection at all 
with another.
+     *
+     *
+     * For example, we have these 4 ranges:
+     * [10, 20), [30, 40), [0, 30), [10, 15)
+     *
+     * After sort, they become:
+     * [0, 30), [10, 20), [10, 15), [30, 40)
+     */
+    @VisibleForTesting
+    public static List<Map.Entry<String, PartitionItem>> 
sortPartitionMap(Map<String, PartitionItem> originPartitions) {
+        List<Map.Entry<String, PartitionItem>> entries = new 
ArrayList<>(originPartitions.entrySet());
+        entries.sort(new RangeComparator());
+        return entries;
+    }
+
+    public static class RangeComparator implements 
Comparator<Map.Entry<String, PartitionItem>> {
+        @Override
+        public int compare(Map.Entry<String, PartitionItem> p1, 
Map.Entry<String, PartitionItem> p2) {
+            PartitionItem value1 = p1.getValue();
+            PartitionItem value2 = p2.getValue();
+            if (value1 instanceof RangePartitionItem && value2 instanceof 
RangePartitionItem) {
+                Range<PartitionKey> items1 = value1.getItems();
+                Range<PartitionKey> items2 = value2.getItems();
+                if (!items1.hasLowerBound()) {
+                    return -1;
+                }
+                if (!items2.hasLowerBound()) {
+                    return 1;
+                }
+                PartitionKey upper1 = items1.upperEndpoint();
+                PartitionKey lower1 = items1.lowerEndpoint();
+                PartitionKey upper2 = items2.upperEndpoint();
+                PartitionKey lower2 = items2.lowerEndpoint();
+                int compareLow = lower1.compareTo(lower2);
+                return compareLow == 0 ? upper2.compareTo(upper1) : compareLow;
+            }
+            return 0;
+        }
+    }
+
+    public static IcebergSnapshotCacheValue getIcebergSnapshotCacheValue(
+            Optional<TableSnapshot> tableSnapshot,
+            ExternalCatalog catalog,
+            String dbName,
+            String tbName) {
+        IcebergSnapshotCacheValue snapshotCache = 
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
+                .getSnapshotCache(catalog, dbName, tbName);
+        if (tableSnapshot.isPresent()) {
+            // If a snapshot is specified,
+            // use the specified snapshot and the corresponding schema(not the 
latest schema).
+            Table icebergTable = getIcebergTable(catalog, dbName, tbName);
+            TableSnapshot snapshot = tableSnapshot.get();
+            long querySpecSnapshot = getQuerySpecSnapshot(icebergTable, 
snapshot);
+            return new IcebergSnapshotCacheValue(
+                IcebergPartitionInfo.empty(),
+                    new IcebergSnapshot(querySpecSnapshot, 
icebergTable.snapshot(querySpecSnapshot).schemaId()));
+        } else {
+            // Otherwise, use the latest snapshot and the latest schema.
+            return snapshotCache;
+        }
+    }
+
+    // load table schema from iceberg API to external schema cache.
+    public static Optional<SchemaCacheValue> loadSchemaCacheValue(
+            ExternalCatalog catalog, String dbName, String tbName, long 
schemaId) {
+        Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName);
+        List<Column> schema = IcebergUtils.getSchema(catalog, dbName, tbName, 
schemaId);
+        List<Column> tmpColumns = Lists.newArrayList();
+        PartitionSpec spec = table.spec();
+        for (PartitionField field : spec.fields()) {
+            Types.NestedField col = table.schema().findField(field.sourceId());
+            for (Column c : schema) {
+                if (c.getName().equalsIgnoreCase(col.name())) {
+                    tmpColumns.add(c);
+                    break;
+                }
+            }
+        }
+        return Optional.of(new IcebergSchemaCacheValue(schema, tmpColumns));
+    }
+
+    public static List<Column> getIcebergSchema(
+            TableIf tableIf,
+            ExternalCatalog catalog,
+            String dbName,
+            String tbName) {
+        Optional<MvccSnapshot> snapshotFromContext = 
MvccUtil.getSnapshotFromContext(tableIf);
+        IcebergSnapshotCacheValue cacheValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshotFromContext, 
catalog, dbName, tbName);
+        return IcebergUtils.getSchemaCacheValue(
+                catalog, dbName, tbName, 
cacheValue.getSnapshot().getSchemaId())
+            .getSchema();
+    }
+
+    public static IcebergSnapshotCacheValue getOrFetchSnapshotCacheValue(
+            Optional<MvccSnapshot> snapshot,
+            ExternalCatalog catalog,
+            String dbName,
+            String tbName) {
+        if (snapshot.isPresent()) {
+            return ((IcebergMvccSnapshot) 
snapshot.get()).getSnapshotCacheValue();
+        } else {
+            return IcebergUtils.getIcebergSnapshotCacheValue(Optional.empty(), 
catalog, dbName, tbName);
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 34efa59a459..d2f31214b0b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -28,7 +28,6 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.TableFormatType;
@@ -68,13 +67,11 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.types.Conversions;
-import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
-import java.time.DateTimeException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -446,16 +443,7 @@ public class IcebergScanNode extends FileQueryScanNode {
     public Long getSpecifiedSnapshot() {
         TableSnapshot tableSnapshot = getQueryTableSnapshot();
         if (tableSnapshot != null) {
-            TableSnapshot.VersionType type = tableSnapshot.getType();
-            if (type == TableSnapshot.VersionType.VERSION) {
-                return tableSnapshot.getVersion();
-            } else {
-                long timestamp = 
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
-                if (timestamp < 0) {
-                    throw new DateTimeException("can't parse time: " + 
tableSnapshot.getTime());
-                }
-                return SnapshotUtil.snapshotIdAsOfTime(icebergTable, 
timestamp);
-            }
+            return IcebergUtils.getQuerySpecSnapshot(icebergTable, 
tableSnapshot);
         }
         return null;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index e68ab2476d9..edc54e230be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -110,6 +110,7 @@ public abstract class ScanNode extends PlanNode implements 
SplitGenerator {
     protected final List<SortNode> topnFilterSortNodes = Lists.newArrayList();
 
     protected TableSnapshot tableSnapshot;
+    protected List<Column> columns;
 
     // Save the id of backends which this scan node will be executed on.
     // This is also important for local shuffle logic.
@@ -141,6 +142,13 @@ public abstract class ScanNode extends PlanNode implements 
SplitGenerator {
         return result;
     }
 
+    protected List<Column> getColumns() {
+        if (columns == null && desc.getTable() != null) {
+            columns = desc.getTable().getBaseSchema();
+        }
+        return columns;
+    }
+
     public TupleDescriptor getTupleDesc() {
         return desc;
     }
@@ -233,7 +241,7 @@ public abstract class ScanNode extends PlanNode implements 
SplitGenerator {
         // for load scan node, table is null
         // partitionsInfo maybe null for other scan node, eg: 
ExternalScanNode...
         if (desc.getTable() != null) {
-            computeColumnsFilter(desc.getTable().getBaseSchema(), 
partitionsInfo);
+            computeColumnsFilter(getColumns(), partitionsInfo);
         }
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
index 251f4d8f6b6..7408898ab74 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
@@ -17,8 +17,16 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.RangePartitionItem;
+import org.apache.doris.common.AnalysisException;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -32,8 +40,10 @@ import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class IcebergExternalTableTest {
 
@@ -136,4 +146,108 @@ public class IcebergExternalTableTest {
         Assertions.assertTrue(spyTable.isValidRelatedTableCached());
         Assertions.assertTrue(spyTable.validRelatedTableCache());
     }
+
+    @Test
+    public void testGetPartitionRange() throws AnalysisException {
+        Column c = new Column("ts", PrimitiveType.DATETIMEV2);
+        List<Column> partitionColumns = Lists.newArrayList(c);
+
+        // Test null partition value
+        Range<PartitionKey> nullRange = IcebergUtils.getPartitionRange(null, 
"hour", partitionColumns);
+        Assertions.assertEquals("0000-01-01 00:00:00",
+                
nullRange.lowerEndpoint().getPartitionValuesAsStringList().get(0));
+        Assertions.assertEquals("0000-01-01 00:00:01",
+                
nullRange.upperEndpoint().getPartitionValuesAsStringList().get(0));
+
+        // Test hour transform.
+        Range<PartitionKey> hour = IcebergUtils.getPartitionRange("100", 
"hour", partitionColumns);
+        PartitionKey lowKey = hour.lowerEndpoint();
+        PartitionKey upKey = hour.upperEndpoint();
+        Assertions.assertEquals("1970-01-05 04:00:00", 
lowKey.getPartitionValuesAsStringList().get(0));
+        Assertions.assertEquals("1970-01-05 05:00:00", 
upKey.getPartitionValuesAsStringList().get(0));
+
+        // Test day transform.
+        Range<PartitionKey> day = IcebergUtils.getPartitionRange("100", "day", 
partitionColumns);
+        lowKey = day.lowerEndpoint();
+        upKey = day.upperEndpoint();
+        Assertions.assertEquals("1970-04-11 00:00:00", 
lowKey.getPartitionValuesAsStringList().get(0));
+        Assertions.assertEquals("1970-04-12 00:00:00", 
upKey.getPartitionValuesAsStringList().get(0));
+
+        // Test month transform.
+        Range<PartitionKey> month = IcebergUtils.getPartitionRange("100", 
"month", partitionColumns);
+        lowKey = month.lowerEndpoint();
+        upKey = month.upperEndpoint();
+        Assertions.assertEquals("1978-05-01 00:00:00", 
lowKey.getPartitionValuesAsStringList().get(0));
+        Assertions.assertEquals("1978-06-01 00:00:00", 
upKey.getPartitionValuesAsStringList().get(0));
+
+        // Test year transform.
+        Range<PartitionKey> year = IcebergUtils.getPartitionRange("100", 
"year", partitionColumns);
+        lowKey = year.lowerEndpoint();
+        upKey = year.upperEndpoint();
+        Assertions.assertEquals("2070-01-01 00:00:00", 
lowKey.getPartitionValuesAsStringList().get(0));
+        Assertions.assertEquals("2071-01-01 00:00:00", 
upKey.getPartitionValuesAsStringList().get(0));
+
+        // Test unsupported transform
+        Exception exception = Assertions.assertThrows(RuntimeException.class, 
() -> {
+            IcebergUtils.getPartitionRange("100", "bucket", partitionColumns);
+        });
+        Assertions.assertEquals("Unsupported transform bucket", 
exception.getMessage());
+    }
+
+    @Test
+    public void testSortRange() throws AnalysisException {
+        Column c = new Column("c", PrimitiveType.DATETIMEV2);
+        ArrayList<Column> columns = Lists.newArrayList(c);
+        PartitionItem nullRange = new 
RangePartitionItem(IcebergUtils.getPartitionRange(null, "hour", columns));
+        PartitionItem year1970 = new 
RangePartitionItem(IcebergUtils.getPartitionRange("0", "year", columns));
+        PartitionItem year1971 = new 
RangePartitionItem(IcebergUtils.getPartitionRange("1", "year", columns));
+        PartitionItem month197002 = new 
RangePartitionItem(IcebergUtils.getPartitionRange("1", "month", columns));
+        PartitionItem month197103 = new 
RangePartitionItem(IcebergUtils.getPartitionRange("14", "month", columns));
+        PartitionItem month197204 = new 
RangePartitionItem(IcebergUtils.getPartitionRange("27", "month", columns));
+        PartitionItem day19700202 = new 
RangePartitionItem(IcebergUtils.getPartitionRange("32", "day", columns));
+        PartitionItem day19730101 = new 
RangePartitionItem(IcebergUtils.getPartitionRange("1096", "day", columns));
+        Map<String, PartitionItem> map = Maps.newHashMap();
+        map.put("nullRange", nullRange);
+        map.put("year1970", year1970);
+        map.put("year1971", year1971);
+        map.put("month197002", month197002);
+        map.put("month197103", month197103);
+        map.put("month197204", month197204);
+        map.put("day19700202", day19700202);
+        map.put("day19730101", day19730101);
+        List<Map.Entry<String, PartitionItem>> entries = 
IcebergUtils.sortPartitionMap(map);
+        Assertions.assertEquals(8, entries.size());
+        Assertions.assertEquals("nullRange", entries.get(0).getKey());
+        Assertions.assertEquals("year1970", entries.get(1).getKey());
+        Assertions.assertEquals("month197002", entries.get(2).getKey());
+        Assertions.assertEquals("day19700202", entries.get(3).getKey());
+        Assertions.assertEquals("year1971", entries.get(4).getKey());
+        Assertions.assertEquals("month197103", entries.get(5).getKey());
+        Assertions.assertEquals("month197204", entries.get(6).getKey());
+        Assertions.assertEquals("day19730101", entries.get(7).getKey());
+
+        Map<String, Set<String>> stringSetMap = 
IcebergUtils.mergeOverlapPartitions(map);
+        Assertions.assertEquals(2, stringSetMap.size());
+        Assertions.assertTrue(stringSetMap.containsKey("year1970"));
+        Assertions.assertTrue(stringSetMap.containsKey("year1971"));
+
+        Set<String> names1970 = stringSetMap.get("year1970");
+        Assertions.assertEquals(3, names1970.size());
+        Assertions.assertTrue(names1970.contains("year1970"));
+        Assertions.assertTrue(names1970.contains("month197002"));
+        Assertions.assertTrue(names1970.contains("day19700202"));
+
+        Set<String> names1971 = stringSetMap.get("year1971");
+        Assertions.assertEquals(2, names1971.size());
+        Assertions.assertTrue(names1971.contains("year1971"));
+        Assertions.assertTrue(names1971.contains("month197103"));
+
+        Assertions.assertEquals(5, map.size());
+        Assertions.assertTrue(map.containsKey("nullRange"));
+        Assertions.assertTrue(map.containsKey("year1970"));
+        Assertions.assertTrue(map.containsKey("year1971"));
+        Assertions.assertTrue(map.containsKey("month197204"));
+        Assertions.assertTrue(map.containsKey("day19730101"));
+    }
 }
+
diff --git 
a/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out
 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out
new file mode 100644
index 00000000000..27e9359b713
Binary files /dev/null and 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out
 differ
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
new file mode 100644
index 00000000000..a376dfc210d
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+
+suite("iceberg_schema_change_with_timetravel", 
"p0,external,doris,external_docker,external_docker_doris") {
+
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String catalog_name = "iceberg_schema_change_with_timetravel"
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}";,
+        "s3.region" = "us-east-1"
+    );"""
+
+    logger.info("catalog " + catalog_name + " created")
+    sql """switch ${catalog_name};"""
+    logger.info("switched to catalog " + catalog_name)
+    sql """ use test_db;""" 
+
+    def executeTimeTravelingQueries = { String tableName ->
+        def snapshots = sql """ select snapshot_id from iceberg_meta("table" = 
"${catalog_name}.test_db.${tableName}", "query_type" = "snapshots") order by 
committed_at; """
+        def snapshotIds = [
+            s0: snapshots.get(0)[0],
+            s1: snapshots.get(1)[0],
+            s2: snapshots.get(2)[0],
+            s3: snapshots.get(3)[0],
+            s4: snapshots.get(4)[0]
+        ]
+
+        qt_q0 """ desc ${tableName} """
+        qt_q1 """ select * from ${tableName} order by c1 """
+        qt_q2 """ select * from ${tableName} for version as of 
${snapshotIds.s0} order by c1 """
+        qt_q3 """ select * from ${tableName} for version as of 
${snapshotIds.s1} order by c1 """
+        qt_q4 """ select * from ${tableName} for version as of 
${snapshotIds.s2} order by c1 """
+        qt_q5 """ select * from ${tableName} for version as of 
${snapshotIds.s3} order by c1 """
+        qt_q6 """ select * from ${tableName} for version as of 
${snapshotIds.s4} order by c1 """
+    }
+
+    executeTimeTravelingQueries("schema_change_with_time_travel")
+    executeTimeTravelingQueries("schema_change_with_time_travel_orc")
+
+}
+
+/*
+create table schema_change_with_time_travel (c1 int);
+insert into schema_change_with_time_travel values (1);
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (2,3);
+alter table schema_change_with_time_travel add column c3 int; 
+insert into schema_change_with_time_travel values (4,5,6);
+alter table schema_change_with_time_travel drop column c2;
+insert into schema_change_with_time_travel values (7,8);
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (9,10,11);
+alter table schema_change_with_time_travel add column c4 int;
+*/
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to