This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new eea082b4540 [refactor](iceberg) Refactor iceberg system tables to use 
native table execution path (#61646)
eea082b4540 is described below

commit eea082b4540ae8f31a5b9deee4f9fefdeefc050e
Author: Socrates <[email protected]>
AuthorDate: Tue Mar 31 23:32:46 2026 +0800

    [refactor](iceberg) Refactor iceberg system tables to use native table 
execution path (#61646)
    
    ### What problem does this PR solve?
    
    Issue Number: N/A
    
    Related PR: #60556
    
    ## Summary
    
    - Refactor Iceberg system tables (`$snapshots`, `$history`,
    `$manifests`, `$files`, `$entries`, `$metadata_log_entries`,
    `$partitions`, `$refs`) to use the native sys-table path
    (`NativeSysTable -> FileQueryScanNode -> IcebergScanNode`) instead of
    the TVF path (`iceberg_meta()` -> `MetadataScanNode`)
    - Introduce `IcebergSysExternalTable` as the native wrapper for Iceberg
    metadata tables and route query / describe / auth / show-create flows
    through the source Iceberg table
    - Keep `position_deletes` explicitly unsupported with the existing
    `SysTable position_deletes is not supported yet` behavior
    - Add the minimum FE/BE protocol and scanner changes required to read
    Iceberg system-table splits through `FORMAT_JNI`
    - Remove the `iceberg_meta()` TVF exposure and migrate regression
    coverage to direct `table$system_table` access
    - Clean up the obsolete Iceberg metadata-scan path and tighten
    Iceberg/Paimon system-table scanner behavior after the refactor
    
    ## Motivation
    
    Previously, Iceberg system tables were queried through `iceberg_meta()`,
    which:
    - Forced system-table queries onto the `MetadataScanNode` path instead
    of the regular file-query path
    - Kept Iceberg system table planning and execution separate from normal
    Iceberg scans
    - Required dedicated TVF-only coverage and made auth / describe /
    show-create handling more fragmented
    - Left a second execution path to maintain even after native sys-table
    support was added
    
    This refactor moves Iceberg system tables onto the native path while
    keeping the first implementation pragmatic: metadata rows are still
    produced by the existing Java Iceberg scanner, but planning, privilege
    checks, and scan-node routing now match the native execution model.
    Follow-up cleanup in this PR also removes the now-dead TVF metadata path
    and tightens scanner/schema handling to match the simplified execution
    model.
    
    ## Architecture After Refactoring
    
    ### Native SysTable Path
    
    ```
    User: SELECT * FROM table$snapshots
    
    BindRelation / SysTableResolver
      -> IcebergSysTable.createSysExternalTable(...)
      -> IcebergSysExternalTable
      -> LogicalFileScan
      -> IcebergScanNode
      -> FORMAT_JNI file ranges with serialized_split
      -> IcebergSysTableJniReader / IcebergSysTableJniScanner
    ```
    
    ### Key Changes
    
    - `IcebergSysTable` now extends `NativeSysTable` instead of
    `TvfSysTable`
    - `IcebergSysExternalTable` wraps the source `IcebergExternalTable`,
    derives schema from the Iceberg metadata table, reuses the source table
    descriptor for thrift, and now caches sys-table schema instead of
    rebuilding it repeatedly
    - `IcebergApiSource` / `IcebergScanNode` now recognize both normal
    Iceberg external tables and Iceberg system tables
    - `TIcebergFileDesc` adds `serialized_split`; FE sends one serialized
    split per system-table range
    - BE `file_scanner.cpp` now routes `FORMAT_JNI +
    table_format_type=iceberg` to `IcebergSysTableJniReader`
    - `IcebergSysTableJniReader` and `IcebergSysTableJniScanner` now only
    serve the native file-scanner path; the old `serialized_splits` /
    metadata-scan compatibility has been removed
    - `IcebergSysTableJniScanner` now reads projected columns by
    source-schema index instead of projection order, fixing incorrect reads
    when projected column order differs from the underlying schema order
    - `PhysicalPlanTranslator`, `UserAuthentication`, and `SHOW CREATE
    TABLE` now resolve Iceberg system tables through the source table
    correctly
    
    ### Cleanup After Refactoring
    
    - Remove `IcebergTableValuedFunction`
    - Remove Nereids `IcebergMeta`
    - Remove builtin `iceberg_meta()` registration
    - Remove the obsolete Iceberg branch from `MetaScanner`; Iceberg
    metadata queries no longer go through `MetadataScanNode`
    - Simplify the Iceberg JNI metadata reader/scanner to single-split
    semantics, matching `FileQueryScanNode -> file_scanner` ownership of
    split fanout
    - Align related Paimon system-table cleanup by caching
    `PaimonSysExternalTable` schema and adding explicit projected-column
    validation in `PaimonJniScanner`
    
    ### Regression Coverage Updates
    
    - Delete the obsolete `test_iceberg_meta` regression case
    - Migrate Iceberg system-table regression checks to direct
    `$system_table` access
    - Update `test_iceberg_sys_table` to use runtime assertions instead of
    unstable `.out` row matching, following the existing Paimon system-table
    testing style
---
 be/src/exec/scan/file_scanner.cpp                  |   6 +
 be/src/exec/scan/meta_scanner.cpp                  |  11 +-
 .../format/table/iceberg_sys_table_jni_reader.cpp  |  36 +++-
 be/src/format/table/iceberg_sys_table_jni_reader.h |   7 +-
 docker/thirdparties/run-thirdparties-docker.sh     |   5 +-
 .../doris/iceberg/IcebergSysTableJniScanner.java   |  60 +++---
 .../org/apache/doris/paimon/PaimonJniScanner.java  |   6 +-
 .../doris/catalog/BuiltinTableValuedFunctions.java |   2 -
 .../main/java/org/apache/doris/catalog/Env.java    |  19 +-
 .../datasource/iceberg/IcebergExternalTable.java   |  13 ++
 .../iceberg/IcebergSysExternalTable.java           | 177 ++++++++++++++++++
 .../iceberg/source/IcebergApiSource.java           |  35 ++--
 .../datasource/iceberg/source/IcebergScanNode.java |  54 +++++-
 .../datasource/iceberg/source/IcebergSplit.java    |  12 ++
 .../datasource/paimon/PaimonSysExternalTable.java  |  73 +++++---
 .../doris/datasource/systable/IcebergSysTable.java |  53 +++---
 .../apache/doris/datasource/systable/SysTable.java |   2 +-
 .../doris/datasource/systable/TvfSysTable.java     |   4 +-
 .../glue/translator/PhysicalPlanTranslator.java    |   3 +-
 .../nereids/rules/analysis/UserAuthentication.java |   4 +
 .../expressions/functions/table/IcebergMeta.java   |  67 -------
 .../visitor/TableValuedFunctionVisitor.java        |   5 -
 .../plans/commands/ShowCreateTableCommand.java     |  45 ++++-
 .../trees/plans/logical/LogicalFileScan.java       |   3 +-
 .../tablefunction/IcebergTableValuedFunction.java  | 168 -----------------
 .../doris/tablefunction/TableValuedFunctionIf.java |   2 -
 .../systable/IcebergSysTableResolverTest.java      |  99 ++++++++++
 .../rules/analysis/UserAuthenticationTest.java     |  61 +++++++
 gensrc/thrift/PlanNodes.thrift                     |   1 +
 .../external_table_p2/tvf/test_iceberg_meta.out    |  22 ---
 .../iceberg_branch_retention_and_snapshot.groovy   |  18 +-
 .../iceberg_tag_retention_and_consistency.groovy   |  20 +-
 .../iceberg/iceberg_branch_tag_operate.groovy      |   2 +-
 ...st_iceberg_schema_change_with_timetravel.groovy |   2 +-
 .../iceberg/test_iceberg_sys_table.groovy          | 202 +++++++--------------
 .../iceberg/test_iceberg_sys_table_auth.groovy     |  64 -------
 .../external_table_p0/polaris/test_polaris.groovy  |   5 +-
 .../iceberg_rest_on_hdfs.groovy                    |   5 +-
 .../iceberg_on_hms_and_filesystem_and_dlf.groovy   |  10 +-
 .../iceberg_rest_s3_storage_vended_test.groovy     |   5 +-
 .../iceberg_rest_storage_test.groovy               |   5 +-
 .../external_table_p2/tvf/test_iceberg_meta.groovy |  90 ---------
 42 files changed, 748 insertions(+), 735 deletions(-)

diff --git a/be/src/exec/scan/file_scanner.cpp 
b/be/src/exec/scan/file_scanner.cpp
index ed74cba7d7e..64a0616674f 100644
--- a/be/src/exec/scan/file_scanner.cpp
+++ b/be/src/exec/scan/file_scanner.cpp
@@ -67,6 +67,7 @@
 #include "format/table/hudi_jni_reader.h"
 #include "format/table/hudi_reader.h"
 #include "format/table/iceberg_reader.h"
+#include "format/table/iceberg_sys_table_jni_reader.h"
 #include "format/table/jdbc_jni_reader.h"
 #include "format/table/max_compute_jni_reader.h"
 #include "format/table/paimon_cpp_reader.h"
@@ -1049,6 +1050,11 @@ Status FileScanner::_get_next_reader() {
                 _cur_reader = JdbcJniReader::create_unique(_file_slot_descs, 
_state, _profile,
                                                            jdbc_params);
                 init_status = 
((JdbcJniReader*)(_cur_reader.get()))->init_reader();
+            } else if (range.__isset.table_format_params &&
+                       range.table_format_params.table_format_type == 
"iceberg") {
+                _cur_reader = 
IcebergSysTableJniReader::create_unique(_file_slot_descs, _state,
+                                                                      
_profile, range, _params);
+                init_status = 
((IcebergSysTableJniReader*)(_cur_reader.get()))->init_reader();
             }
             // Set col_name_to_block_idx for JNI readers to avoid repeated map 
creation
             if (_cur_reader) {
diff --git a/be/src/exec/scan/meta_scanner.cpp 
b/be/src/exec/scan/meta_scanner.cpp
index 7060cdac583..b51d7d3c08b 100644
--- a/be/src/exec/scan/meta_scanner.cpp
+++ b/be/src/exec/scan/meta_scanner.cpp
@@ -37,7 +37,6 @@
 #include "core/column/column_vector.h"
 #include "core/data_type/define_primitive_type.h"
 #include "core/types.h"
-#include "format/table/iceberg_sys_table_jni_reader.h"
 #include "format/table/parquet_metadata_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
@@ -65,15 +64,7 @@ MetaScanner::MetaScanner(RuntimeState* state, 
ScanLocalStateBase* local_state, T
 Status MetaScanner::_open_impl(RuntimeState* state) {
     VLOG_CRITICAL << "MetaScanner::open";
     RETURN_IF_ERROR(Scanner::_open_impl(state));
-    if (_scan_range.meta_scan_range.metadata_type == TMetadataType::ICEBERG) {
-        // TODO: refactor this code
-        auto reader = 
IcebergSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile,
-                                                              
_scan_range.meta_scan_range);
-        RETURN_IF_ERROR(reader->init_reader());
-        static_cast<IcebergSysTableJniReader*>(reader.get())
-                ->set_col_name_to_block_idx(&_src_block_name_to_idx);
-        _reader = std::move(reader);
-    } else if (_scan_range.meta_scan_range.metadata_type == 
TMetadataType::PARQUET) {
+    if (_scan_range.meta_scan_range.metadata_type == TMetadataType::PARQUET) {
         auto reader = 
ParquetMetadataReader::create_unique(_tuple_desc->slots(), state, _profile,
                                                            
_scan_range.meta_scan_range);
         RETURN_IF_ERROR(reader->init_reader());
diff --git a/be/src/format/table/iceberg_sys_table_jni_reader.cpp 
b/be/src/format/table/iceberg_sys_table_jni_reader.cpp
index 35bf92db3c9..499069a0a93 100644
--- a/be/src/format/table/iceberg_sys_table_jni_reader.cpp
+++ b/be/src/format/table/iceberg_sys_table_jni_reader.cpp
@@ -17,6 +17,7 @@
 
 #include "iceberg_sys_table_jni_reader.h"
 
+#include "common/logging.h"
 #include "format/jni/jni_data_bridge.h"
 #include "runtime/runtime_state.h"
 #include "util/string_util.h"
@@ -26,9 +27,27 @@ namespace doris {
 
 static const std::string HADOOP_OPTION_PREFIX = "hadoop.";
 
+Status IcebergSysTableJniReader::validate_scan_range(const TFileRangeDesc& 
range) {
+    if (!range.__isset.table_format_params) {
+        return Status::InternalError(
+                "missing table_format_params for iceberg sys table jni 
reader");
+    }
+    if (!range.table_format_params.__isset.iceberg_params) {
+        return Status::InternalError("missing iceberg_params for iceberg sys 
table jni reader");
+    }
+    if (!range.table_format_params.iceberg_params.__isset.serialized_split ||
+        range.table_format_params.iceberg_params.serialized_split.empty()) {
+        return Status::InternalError(
+                "missing serialized_split for iceberg sys table jni reader, "
+                "possibly caused by FE/BE protocol mismatch");
+    }
+    return Status::OK();
+}
+
 IcebergSysTableJniReader::IcebergSysTableJniReader(
         const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* 
state,
-        RuntimeProfile* profile, const TMetaScanRange& meta_scan_range)
+        RuntimeProfile* profile, const TFileRangeDesc& range,
+        const TFileScanRangeParams* range_params)
         : JniReader(
                   file_slot_descs, state, profile,
                   "org/apache/doris/iceberg/IcebergSysTableJniScanner",
@@ -41,12 +60,16 @@ IcebergSysTableJniReader::IcebergSysTableJniReader(
                                   
JniDataBridge::get_jni_type_with_different_string(desc->type()));
                       }
                       std::map<std::string, std::string> params;
-                      params["serialized_splits"] = 
join(meta_scan_range.serialized_splits, ",");
+                      params["serialized_split"] =
+                              
range.table_format_params.iceberg_params.serialized_split;
                       params["required_fields"] = join(required_fields, ",");
                       params["required_types"] = join(required_types, "#");
                       params["time_zone"] = state->timezone();
-                      for (const auto& kv : meta_scan_range.hadoop_props) {
-                          params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
+                      if (range_params != nullptr && 
range_params->__isset.properties &&
+                          !range_params->properties.empty()) {
+                          for (const auto& kv : range_params->properties) {
+                              params[HADOOP_OPTION_PREFIX + kv.first] = 
kv.second;
+                          }
                       }
                       return params;
                   }(),
@@ -56,9 +79,12 @@ IcebergSysTableJniReader::IcebergSysTableJniReader(
                           names.emplace_back(desc->col_name());
                       }
                       return names;
-                  }()) {}
+                  }(),
+                  range.__isset.self_split_weight ? range.self_split_weight : 
-1),
+          _init_status(validate_scan_range(range)) {}
 
 Status IcebergSysTableJniReader::init_reader() {
+    RETURN_IF_ERROR(_init_status);
     return open(_state, _profile);
 }
 
diff --git a/be/src/format/table/iceberg_sys_table_jni_reader.h 
b/be/src/format/table/iceberg_sys_table_jni_reader.h
index 1d52bdc1cf6..f5bc69f6776 100644
--- a/be/src/format/table/iceberg_sys_table_jni_reader.h
+++ b/be/src/format/table/iceberg_sys_table_jni_reader.h
@@ -45,11 +45,16 @@ class IcebergSysTableJniReader : public JniReader {
 public:
     IcebergSysTableJniReader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
                              RuntimeState* state, RuntimeProfile* profile,
-                             const TMetaScanRange& meta_scan_range);
+                             const TFileRangeDesc& range, const 
TFileScanRangeParams* range_params);
 
     ~IcebergSysTableJniReader() override = default;
 
+    static Status validate_scan_range(const TFileRangeDesc& range);
+
     Status init_reader();
+
+private:
+    Status _init_status;
 };
 
 #include "common/compile_check_end.h"
diff --git a/docker/thirdparties/run-thirdparties-docker.sh 
b/docker/thirdparties/run-thirdparties-docker.sh
index 12e2e9b7ba4..15757f57fa4 100755
--- a/docker/thirdparties/run-thirdparties-docker.sh
+++ b/docker/thirdparties/run-thirdparties-docker.sh
@@ -240,7 +240,6 @@ JUICEFS_LOCAL_BIN="${JUICEFS_RUNTIME_ROOT}/bin/juicefs"
 find_juicefs_hadoop_jar() {
     local -a jar_globs=(
         "${JUICEFS_RUNTIME_ROOT}/lib/juicefs-hadoop-[0-9]*.jar"
-        "${ROOT}/docker-compose/hive/scripts/auxlib/juicefs-hadoop-[0-9]*.jar"
         
"${DORIS_ROOT}/thirdparty/installed/juicefs_libs/juicefs-hadoop-[0-9]*.jar"
         "${DORIS_ROOT}/output/fe/lib/juicefs/juicefs-hadoop-[0-9]*.jar"
         
"${DORIS_ROOT}/output/be/lib/java_extensions/juicefs/juicefs-hadoop-[0-9]*.jar"
@@ -359,6 +358,10 @@ ensure_juicefs_hadoop_jar_for_hive() {
     fi
 
     mkdir -p "${auxlib_dir}"
+    if [[ "${source_jar}" == "${auxlib_dir}/$(basename "${source_jar}")" ]]; 
then
+        echo "JuiceFS Hadoop jar already exists in hive auxlib: $(basename 
"${source_jar}")"
+        return 0
+    fi
     cp -f "${source_jar}" "${auxlib_dir}/"
     echo "Synced JuiceFS Hadoop jar to hive auxlib: $(basename 
"${source_jar}")"
 }
diff --git 
a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
 
b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
index d1b648a9801..b014a42706f 100644
--- 
a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
+++ 
b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
@@ -36,8 +36,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
@@ -51,21 +49,19 @@ public class IcebergSysTableJniScanner extends JniScanner {
     private static final String HADOOP_OPTION_PREFIX = "hadoop.";
     private final ClassLoader classLoader;
     private final PreExecutionAuthenticator preExecutionAuthenticator;
-    private final Iterator<FileScanTask> scanTasks;
-    private final List<NestedField> fields;
+    private final FileScanTask scanTask;
+    private final List<SelectedField> fields;
     private final String timezone;
     private CloseableIterator<StructLike> reader;
 
     public IcebergSysTableJniScanner(int batchSize, Map<String, String> 
params) {
         this.classLoader = this.getClass().getClassLoader();
-        List<FileScanTask> scanTasks = 
Arrays.stream(params.get("serialized_splits").split(","))
-                .map(SerializationUtil::deserializeFromBase64)
-                .map(obj -> (FileScanTask) obj)
-                .collect(Collectors.toList());
-        Preconditions.checkState(!scanTasks.isEmpty(), "scanTasks shoudle not 
be empty");
-        this.scanTasks = scanTasks.iterator();
+        String serializedSplitParams = params.get("serialized_split");
+        Preconditions.checkArgument(serializedSplitParams != null && 
!serializedSplitParams.isEmpty(),
+                "serialized_split should not be empty");
+        this.scanTask = 
SerializationUtil.deserializeFromBase64(serializedSplitParams);
         String[] requiredFields = params.get("required_fields").split(",");
-        this.fields = selectSchema(scanTasks.get(0).schema().asStruct(), 
requiredFields);
+        this.fields = selectSchema(scanTask.schema().asStruct(), 
requiredFields);
         this.timezone = params.getOrDefault("time_zone", 
TimeZone.getDefault().getID());
         Map<String, String> hadoopOptionParams = params.entrySet().stream()
                 .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
@@ -79,13 +75,11 @@ public class IcebergSysTableJniScanner extends JniScanner {
     @Override
     public void open() throws IOException {
         try (ThreadClassLoaderContext ignored = new 
ThreadClassLoaderContext(classLoader)) {
-            nextScanTask();
+            openReader();
         }
     }
 
-    private void nextScanTask() throws IOException {
-        Preconditions.checkArgument(scanTasks.hasNext());
-        FileScanTask scanTask = scanTasks.next();
+    private void openReader() throws IOException {
         try {
             try (ThreadClassLoaderContext ignored = new 
ThreadClassLoaderContext(classLoader)) {
                 preExecutionAuthenticator.execute(() -> {
@@ -96,7 +90,7 @@ public class IcebergSysTableJniScanner extends JniScanner {
             }
         } catch (Exception e) {
             this.close();
-            String msg = String.format("Failed to open next scan task: %s", 
scanTask);
+            String msg = String.format("Failed to open scan task: %s", 
scanTask);
             LOG.error(msg, e);
             throw new IOException(msg, e);
         }
@@ -107,26 +101,20 @@ public class IcebergSysTableJniScanner extends JniScanner 
{
         try (ThreadClassLoaderContext ignored = new 
ThreadClassLoaderContext(classLoader)) {
             int rows = 0;
             long startAppendDataTime = System.nanoTime();
-            long scanTime = 0;
             while (rows < getBatchSize()) {
-                while (!reader.hasNext() && scanTasks.hasNext()) {
-                    long startScanTaskTime = System.nanoTime();
-                    nextScanTask();
-                    scanTime = System.nanoTime() - startScanTaskTime;
-                }
                 if (!reader.hasNext()) {
                     break;
                 }
                 StructLike row = reader.next();
                 for (int i = 0; i < fields.size(); i++) {
-                    NestedField field = fields.get(i);
-                    Object value = row.get(i, 
field.type().typeId().javaClass());
+                    SelectedField field = fields.get(i);
+                    Object value = row.get(field.sourceIndex, 
field.field.type().typeId().javaClass());
                     ColumnValue columnValue = new 
IcebergSysTableColumnValue(value, timezone);
                     appendData(i, columnValue);
                 }
                 rows++;
             }
-            appendDataTime += System.nanoTime() - startAppendDataTime - 
scanTime;
+            appendDataTime += System.nanoTime() - startAppendDataTime;
             return rows;
         }
     }
@@ -141,18 +129,34 @@ public class IcebergSysTableJniScanner extends JniScanner 
{
         }
     }
 
-    private static List<NestedField> selectSchema(StructType schema, String[] 
requiredFields) {
-        List<NestedField> selectedFields = new ArrayList<>();
+    private static List<SelectedField> selectSchema(StructType schema, 
String[] requiredFields) {
+        List<NestedField> schemaFields = schema.fields();
+        List<SelectedField> selectedFields = new ArrayList<>();
         for (String requiredField : requiredFields) {
             NestedField field = schema.field(requiredField);
             if (field == null) {
                 throw new IllegalArgumentException("RequiredField " + 
requiredField + " not found in schema");
             }
-            selectedFields.add(field);
+            int sourceIndex = schemaFields.indexOf(field);
+            if (sourceIndex < 0) {
+                throw new IllegalArgumentException(
+                        "RequiredField " + requiredField + " not found in 
source schema fields");
+            }
+            selectedFields.add(new SelectedField(sourceIndex, field));
         }
         return selectedFields;
     }
 
+    private static final class SelectedField {
+        private final int sourceIndex;
+        private final NestedField field;
+
+        private SelectedField(int sourceIndex, NestedField field) {
+            this.sourceIndex = sourceIndex;
+            this.field = field;
+        }
+    }
+
     private static ColumnType[] parseRequiredTypes(String[] typeStrings, 
String[] requiredFields) {
         ColumnType[] requiredTypes = new ColumnType[typeStrings.length];
         for (int i = 0; i < typeStrings.length; i++) {
diff --git 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 8f64a51dc9b..081b5a554c6 100644
--- 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -123,7 +123,11 @@ public class PaimonJniScanner extends JniScanner {
     }
 
     private int[] getProjected() {
-        return 
Arrays.stream(fields).mapToInt(paimonAllFieldNames::indexOf).toArray();
+        return Arrays.stream(fields).mapToInt(fieldName -> {
+            int index = paimonAllFieldNames.indexOf(fieldName);
+            Preconditions.checkArgument(index >= 0, "RequiredField %s not 
found in schema", fieldName);
+            return index;
+        }).toArray();
     }
 
     private List<Predicate> getPredicates() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
index a55ebedce9f..ca3a5b688c9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
@@ -28,7 +28,6 @@ import 
org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
 import org.apache.doris.nereids.trees.expressions.functions.table.Http;
 import org.apache.doris.nereids.trees.expressions.functions.table.HttpStream;
 import org.apache.doris.nereids.trees.expressions.functions.table.HudiMeta;
-import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
 import org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
 import org.apache.doris.nereids.trees.expressions.functions.table.Local;
 import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
@@ -60,7 +59,6 @@ public class BuiltinTableValuedFunctions implements 
FunctionHelper {
             tableValued(GroupCommit.class, "group_commit"),
             tableValued(Local.class, "local"),
             tableValued(HudiMeta.class, "hudi_meta"),
-            tableValued(IcebergMeta.class, "iceberg_meta"),
             tableValued(Hdfs.class, "hdfs"),
             tableValued(HttpStream.class, "http_stream"),
             tableValued(Numbers.class, "numbers"),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index b62258262e3..cf5849e247a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -101,6 +101,7 @@ import org.apache.doris.datasource.es.EsExternalCatalog;
 import org.apache.doris.datasource.hive.HiveTransactionMgr;
 import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
 import org.apache.doris.datasource.jdbc.JdbcExternalTable;
 import org.apache.doris.datasource.paimon.PaimonExternalTable;
 import org.apache.doris.datasource.paimon.PaimonSysExternalTable;
@@ -4344,7 +4345,14 @@ public class Env {
             sb.append("\n-- Internal JDBC tables are deprecated. Please use 
JDBC Catalog instead.");
         } else if (table.getType() == TableType.ICEBERG_EXTERNAL_TABLE) {
             addTableComment(table, sb);
-            IcebergExternalTable icebergExternalTable = (IcebergExternalTable) 
table;
+            IcebergExternalTable icebergExternalTable;
+            if (table instanceof IcebergExternalTable) {
+                icebergExternalTable = (IcebergExternalTable) table;
+            } else if (table instanceof IcebergSysExternalTable) {
+                icebergExternalTable = ((IcebergSysExternalTable) 
table).getSourceTable();
+            } else {
+                throw new RuntimeException("Unexpected Iceberg table type: " + 
table.getClass().getSimpleName());
+            }
             if (icebergExternalTable.hasSortOrder()) {
                 sb.append("\n").append(icebergExternalTable.getSortOrderSql());
             }
@@ -4731,7 +4739,14 @@ public class Env {
             sb.append("\n-- Internal JDBC tables are deprecated. Please use 
JDBC Catalog instead.");
         } else if (table.getType() == TableType.ICEBERG_EXTERNAL_TABLE) {
             addTableComment(table, sb);
-            IcebergExternalTable icebergExternalTable = (IcebergExternalTable) 
table;
+            IcebergExternalTable icebergExternalTable;
+            if (table instanceof IcebergExternalTable) {
+                icebergExternalTable = (IcebergExternalTable) table;
+            } else if (table instanceof IcebergSysExternalTable) {
+                icebergExternalTable = ((IcebergSysExternalTable) 
table).getSourceTable();
+            } else {
+                throw new RuntimeException("Unexpected Iceberg table type: " + 
table.getClass().getSimpleName());
+            }
             if (icebergExternalTable.hasSortOrder()) {
                 sb.append("\n").append(icebergExternalTable.getSortOrderSql());
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
index ffdec64d3d9..6a3faf49949 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
@@ -318,6 +318,19 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
         return IcebergSysTable.SUPPORTED_SYS_TABLES;
     }
 
+    @Override
+    public Optional<SysTable> findSysTable(String tableNameWithSysTableName) {
+        Optional<SysTable> sysTable = 
MTMVRelatedTableIf.super.findSysTable(tableNameWithSysTableName);
+        if (sysTable.isPresent()) {
+            return sysTable;
+        }
+        String sysTableName = 
SysTable.getTableNameWithSysTableName(tableNameWithSysTableName).second;
+        if (IcebergSysTable.POSITION_DELETES.equals(sysTableName)) {
+            return 
Optional.of(IcebergSysTable.UNSUPPORTED_POSITION_DELETES_TABLE);
+        }
+        return Optional.empty();
+    }
+
     @Override
     public boolean isView() {
         makeSureInitialized();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSysExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSysExternalTable.java
new file mode 100644
index 00000000000..aeb539d1f3f
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSysExternalTable.java
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.NameMapping;
+import org.apache.doris.datasource.SchemaCacheKey;
+import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.systable.SysTable;
+import org.apache.doris.statistics.AnalysisInfo;
+import org.apache.doris.statistics.BaseAnalysisTask;
+import org.apache.doris.statistics.ExternalAnalysisTask;
+import org.apache.doris.thrift.THiveTable;
+import org.apache.doris.thrift.TIcebergTable;
+import org.apache.doris.thrift.TTableDescriptor;
+import org.apache.doris.thrift.TTableType;
+
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.Table;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class IcebergSysExternalTable extends ExternalTable {
+    private final IcebergExternalTable sourceTable;
+    private final String sysTableType;
+    private volatile Table sysIcebergTable;
+    private volatile List<Column> fullSchema;
+    private volatile SchemaCacheValue schemaCacheValue;
+
+    public IcebergSysExternalTable(IcebergExternalTable sourceTable, String 
sysTableType) {
+        super(generateSysTableId(sourceTable.getId(), sysTableType),
+                sourceTable.getName() + "$" + sysTableType,
+                sourceTable.getRemoteName() + "$" + sysTableType,
+                (IcebergExternalCatalog) sourceTable.getCatalog(),
+                (IcebergExternalDatabase) sourceTable.getDatabase(),
+                TableIf.TableType.ICEBERG_EXTERNAL_TABLE);
+        this.sourceTable = sourceTable;
+        this.sysTableType = sysTableType;
+    }
+
+    @Override
+    public String getMetaCacheEngine() {
+        return sourceTable.getMetaCacheEngine();
+    }
+
+    @Override
+    protected synchronized void makeSureInitialized() {
+        super.makeSureInitialized();
+        if (!objectCreated) {
+            objectCreated = true;
+        }
+    }
+
+    public IcebergExternalTable getSourceTable() {
+        return sourceTable;
+    }
+
+    public String getSysTableType() {
+        return sysTableType;
+    }
+
+    public Table getSysIcebergTable() {
+        if (sysIcebergTable == null) {
+            synchronized (this) {
+                if (sysIcebergTable == null) {
+                    Table baseTable = sourceTable.getIcebergTable();
+                    MetadataTableType tableType = 
MetadataTableType.from(sysTableType);
+                    if (tableType == null) {
+                        throw new IllegalArgumentException("Unknown iceberg 
system table type: " + sysTableType);
+                    }
+                    sysIcebergTable = 
MetadataTableUtils.createMetadataTableInstance(baseTable, tableType);
+                }
+            }
+        }
+        return sysIcebergTable;
+    }
+
+    @Override
+    public List<Column> getFullSchema() {
+        return getOrCreateSchemaCacheValue().getSchema();
+    }
+
+    @Override
+    public NameMapping getOrBuildNameMapping() {
+        return sourceTable.getOrBuildNameMapping();
+    }
+
+    @Override
+    public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
+        makeSureInitialized();
+        return new ExternalAnalysisTask(info);
+    }
+
+    @Override
+    public TTableDescriptor toThrift() {
+        List<Column> schema = getFullSchema();
+        if (sourceTable.getIcebergCatalogType().equals("hms")) {
+            THiveTable tHiveTable = new THiveTable(getDbName(), getName(), new 
HashMap<>());
+            TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), 
TTableType.HIVE_TABLE, schema.size(), 0,
+                    getName(), getDbName());
+            tTableDescriptor.setHiveTable(tHiveTable);
+            return tTableDescriptor;
+        } else {
+            TIcebergTable icebergTable = new TIcebergTable(getDbName(), 
getName(), new HashMap<>());
+            TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), 
TTableType.ICEBERG_TABLE,
+                    schema.size(), 0, getName(), getDbName());
+            tTableDescriptor.setIcebergTable(icebergTable);
+            return tTableDescriptor;
+        }
+    }
+
+    @Override
+    public long fetchRowCount() {
+        return UNKNOWN_ROW_COUNT;
+    }
+
+    @Override
+    public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
+        return Optional.of(getOrCreateSchemaCacheValue());
+    }
+
+    @Override
+    public Optional<SchemaCacheValue> getSchemaCacheValue() {
+        return Optional.of(getOrCreateSchemaCacheValue());
+    }
+
+    @Override
+    public Map<String, SysTable> getSupportedSysTables() {
+        return sourceTable.getSupportedSysTables();
+    }
+
+    @Override
+    public String getComment() {
+        return "Iceberg system table: " + sysTableType + " for " + 
sourceTable.getName();
+    }
+
+    private static long generateSysTableId(long sourceTableId, String 
sysTableType) {
+        return sourceTableId ^ (sysTableType.hashCode() * 31L);
+    }
+
+    private SchemaCacheValue getOrCreateSchemaCacheValue() {
+        if (schemaCacheValue == null) {
+            synchronized (this) {
+                if (schemaCacheValue == null) {
+                    if (fullSchema == null) {
+                        fullSchema = 
IcebergUtils.parseSchema(getSysIcebergTable().schema(),
+                                getCatalog().getEnableMappingVarbinary(),
+                                getCatalog().getEnableMappingTimestampTz());
+                    }
+                    schemaCacheValue = new SchemaCacheValue(fullSchema);
+                }
+            }
+        }
+        return schemaCacheValue;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
index c233f732106..c1626e915c6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
@@ -21,7 +21,9 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.planner.ColumnRange;
 
@@ -34,26 +36,25 @@ import java.util.Map;
  */
 public class IcebergApiSource implements IcebergSource {
 
-    private final IcebergExternalTable icebergExtTable;
+    private final ExternalTable targetTable;
     private final Table originTable;
-
     private final TupleDescriptor desc;
 
-    public IcebergApiSource(IcebergExternalTable table, TupleDescriptor desc,
+    public IcebergApiSource(ExternalTable table, TupleDescriptor desc,
                             Map<String, ColumnRange> columnNameToRange) {
-        // Theoretically, the IcebergScanNode is responsible for scanning data 
from physical tables.
-        // Views should not reach this point.
-        // By adding this validation, we aim to ensure that if a view query 
does end up here, it indicates a bug.
-        // This helps us identify issues promptly.
-
-        // when use legacy planner, query an iceberg view will enter this
-        // we should set enable_fallback_to_original_planner=false
-        // so that it will throw exception by first planner
-        if (table.isView()) {
-            throw new UnsupportedOperationException("IcebergApiSource does not 
support view");
+        if (table instanceof IcebergExternalTable) {
+            IcebergExternalTable icebergExtTable = (IcebergExternalTable) 
table;
+            if (icebergExtTable.isView()) {
+                throw new UnsupportedOperationException("IcebergApiSource does 
not support view");
+            }
+            this.originTable = IcebergUtils.getIcebergTable(icebergExtTable);
+        } else if (table instanceof IcebergSysExternalTable) {
+            this.originTable = ((IcebergSysExternalTable) 
table).getSysIcebergTable();
+        } else {
+            throw new IllegalArgumentException(
+                    "Expected Iceberg table but got " + 
table.getClass().getSimpleName());
         }
-        this.icebergExtTable = table;
-        this.originTable = IcebergUtils.getIcebergTable(icebergExtTable);
+        this.targetTable = table;
         this.desc = desc;
     }
 
@@ -74,12 +75,12 @@ public class IcebergApiSource implements IcebergSource {
 
     @Override
     public TableIf getTargetTable() {
-        return icebergExtTable;
+        return targetTable;
     }
 
     @Override
     public ExternalCatalog getCatalog() {
-        return icebergExtTable.getCatalog();
+        return targetTable.getCatalog();
     }
 
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 15883744cf3..121dc0ad152 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -41,6 +41,7 @@ import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergExternalMetaCache;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.datasource.iceberg.cache.IcebergManifestCacheLoader;
 import org.apache.doris.datasource.iceberg.cache.ManifestCacheValue;
@@ -96,6 +97,7 @@ import org.apache.iceberg.mapping.MappedFields;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.util.ScanTaskUtil;
+import org.apache.iceberg.util.SerializationUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -158,6 +160,7 @@ public class IcebergScanNode extends FileQueryScanNode {
     private String cachedFsIdentifier;
 
     private Boolean isBatchMode = null;
+    private boolean isSystemTable = false;
 
     // ReferencedDataFile path -> List<DeleteFile> / 
List<TIcebergDeleteFileDesc> (exclude equal delete)
     public Map<String, List<DeleteFile>> deleteFilesByReferencedDataFile = new 
HashMap<>();
@@ -182,8 +185,13 @@ public class IcebergScanNode extends FileQueryScanNode {
         ExternalTable table = (ExternalTable) desc.getTable();
         if (table instanceof HMSExternalTable) {
             source = new IcebergHMSSource((HMSExternalTable) table, desc);
-        } else if (table instanceof IcebergExternalTable) {
-            String catalogType = ((IcebergExternalTable) 
table).getIcebergCatalogType();
+        } else if (table instanceof IcebergExternalTable || table instanceof 
IcebergSysExternalTable) {
+            if (table instanceof IcebergSysExternalTable) {
+                isSystemTable = true;
+            }
+            String catalogType = table instanceof IcebergExternalTable
+                    ? ((IcebergExternalTable) table).getIcebergCatalogType()
+                    : ((IcebergSysExternalTable) 
table).getSourceTable().getIcebergCatalogType();
             switch (catalogType) {
                 case IcebergExternalCatalog.ICEBERG_HMS:
                 case IcebergExternalCatalog.ICEBERG_REST:
@@ -192,7 +200,7 @@ public class IcebergScanNode extends FileQueryScanNode {
                 case IcebergExternalCatalog.ICEBERG_HADOOP:
                 case IcebergExternalCatalog.ICEBERG_JDBC:
                 case IcebergExternalCatalog.ICEBERG_S3_TABLES:
-                    source = new IcebergApiSource((IcebergExternalTable) 
table, desc, columnNameToRange);
+                    source = new IcebergApiSource(table, desc, 
columnNameToRange);
                     break;
                 default:
                     Preconditions.checkState(false, "Unknown iceberg catalog 
type: " + catalogType);
@@ -269,13 +277,21 @@ public class IcebergScanNode extends FileQueryScanNode {
     private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit 
icebergSplit) {
         TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
         
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
+        TIcebergFileDesc fileDesc = new TIcebergFileDesc();
+        if (isSystemTable) {
+            rangeDesc.setFormatType(TFileFormatType.FORMAT_JNI);
+            tableFormatFileDesc.setTableLevelRowCount(-1);
+            fileDesc.setSerializedSplit(icebergSplit.getSerializedSplit());
+            tableFormatFileDesc.setIcebergParams(fileDesc);
+            rangeDesc.setTableFormatParams(tableFormatFileDesc);
+            return;
+        }
         if (tableLevelPushDownCount) {
             
tableFormatFileDesc.setTableLevelRowCount(icebergSplit.getTableLevelRowCount());
         } else {
             // MUST explicitly set to -1, to be distinct from valid row count 
>= 0
             tableFormatFileDesc.setTableLevelRowCount(-1);
         }
-        TIcebergFileDesc fileDesc = new TIcebergFileDesc();
         fileDesc.setFormatVersion(formatVersion);
         fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
         if (icebergSplit.getPartitionSpecId() != null) {
@@ -853,7 +869,18 @@ public class IcebergScanNode extends FileQueryScanNode {
         return split;
     }
 
+    private Split createIcebergSysSplit(FileScanTask fileScanTask) {
+        long rowCount = fileScanTask.file() == null ? 1 : 
fileScanTask.file().recordCount();
+        IcebergSplit split = IcebergSplit.newSysTableSplit(
+                SerializationUtil.serializeToBase64(fileScanTask), rowCount);
+        split.setTableFormatType(TableFormatType.ICEBERG);
+        return split;
+    }
+
     private List<Split> doGetSplits(int numBackends) throws UserException {
+        if (isSystemTable) {
+            return doGetSystemTableSplits();
+        }
 
         List<Split> splits = new ArrayList<>();
 
@@ -897,8 +924,24 @@ public class IcebergScanNode extends FileQueryScanNode {
         return splits;
     }
 
+    private List<Split> doGetSystemTableSplits() throws UserException {
+        List<Split> splits = new ArrayList<>();
+        TableScan scan = createTableScan();
+        try (CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles()) 
{
+            fileScanTasks.forEach(task -> 
splits.add(createIcebergSysSplit(task)));
+        } catch (IOException e) {
+            throw new UserException(e.getMessage(), e);
+        }
+        selectedPartitionNum = 0;
+        return splits;
+    }
+
     @Override
     public boolean isBatchMode() {
+        if (isSystemTable) {
+            isBatchMode = false;
+            return false;
+        }
         Boolean cached = isBatchMode;
         if (cached != null) {
             return cached;
@@ -992,6 +1035,9 @@ public class IcebergScanNode extends FileQueryScanNode {
 
     @Override
     public TFileFormatType getFileFormatType() throws UserException {
+        if (isSystemTable) {
+            return TFileFormatType.FORMAT_JNI;
+        }
         TFileFormatType type;
         String icebergFormat = source.getFileFormat();
         if (icebergFormat.equalsIgnoreCase("parquet")) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
index 2596df5ad37..3af484abd6d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
@@ -25,11 +25,13 @@ import lombok.Data;
 import org.apache.iceberg.DeleteFile;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 @Data
 public class IcebergSplit extends FileSplit {
+    private static final LocationPath DUMMY_PATH = 
LocationPath.of("/dummyPath");
 
     // Doris will convert the schema in FileSystem to achieve the function of 
natively reading files.
     // For example, s3a:// will be converted to s3://.
@@ -49,6 +51,7 @@ public class IcebergSplit extends FileSplit {
     private String partitionDataJson = null;
     private Long firstRowId = null;
     private Long lastUpdatedSequenceNumber = null;
+    private String serializedSplit;
 
     // File path will be changed if the file is modified, so there's no need 
to get modification time.
     public IcebergSplit(LocationPath file, long start, long length, long 
fileLength, String[] hosts,
@@ -66,4 +69,13 @@ public class IcebergSplit extends FileSplit {
         this.deleteFileFilters = deleteFileFilters;
         this.selfSplitWeight += 
deleteFileFilters.stream().mapToLong(IcebergDeleteFileFilter::getFilesize).sum();
     }
+
+    public static IcebergSplit newSysTableSplit(String serializedSplit, long 
rowCount) {
+        IcebergSplit split = new IcebergSplit(DUMMY_PATH, 0, 0, 0, null, null,
+                Collections.emptyMap(),
+                Collections.emptyList(), 
DUMMY_PATH.toStorageLocation().toString());
+        split.setSerializedSplit(serializedSplit);
+        split.setSelfSplitWeight(Math.max(rowCount, 1L));
+        return split;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSysExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSysExternalTable.java
index db972c6b2b6..b6999b7c50c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSysExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSysExternalTable.java
@@ -70,6 +70,8 @@ public class PaimonSysExternalTable extends ExternalTable {
     private final String sysTableType;
     private volatile Boolean isDataTable;
     private volatile Table paimonSysTable;
+    private volatile List<Column> fullSchema;
+    private volatile SchemaCacheValue schemaCacheValue;
 
     /**
      * Creates a new Paimon system external table.
@@ -93,6 +95,7 @@ public class PaimonSysExternalTable extends ExternalTable {
         return PaimonExternalMetaCache.ENGINE;
     }
 
+    @Override
     protected synchronized void makeSureInitialized() {
         super.makeSureInitialized();
         if (!objectCreated) {
@@ -136,30 +139,7 @@ public class PaimonSysExternalTable extends ExternalTable {
      */
     @Override
     public List<Column> getFullSchema() {
-        Table sysTable = getSysPaimonTable();
-        List<DataField> fields = sysTable.rowType().getFields();
-        List<Column> columns = Lists.newArrayListWithCapacity(fields.size());
-
-        for (DataField field : fields) {
-            Column column = new Column(
-                    field.name().toLowerCase(),
-                    PaimonUtil.paimonTypeToDorisType(
-                            field.type(),
-                            getCatalog().getEnableMappingVarbinary(),
-                            getCatalog().getEnableMappingTimestampTz()),
-                    true,
-                    null,
-                    true,
-                    field.description(),
-                    true,
-                    field.id());
-            PaimonUtil.updatePaimonColumnUniqueId(column, field);
-            if (field.type().getTypeRoot() == 
DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
-                column.setWithTZExtraInfo();
-            }
-            columns.add(column);
-        }
-        return columns;
+        return getOrCreateSchemaCacheValue().getSchema();
     }
 
     public PaimonExternalTable getSourceTable() {
@@ -232,12 +212,12 @@ public class PaimonSysExternalTable extends ExternalTable 
{
 
     @Override
     public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
-        return Optional.of(new SchemaCacheValue(getFullSchema()));
+        return Optional.of(getOrCreateSchemaCacheValue());
     }
 
     @Override
     public Optional<SchemaCacheValue> getSchemaCacheValue() {
-        return Optional.of(new SchemaCacheValue(getFullSchema()));
+        return Optional.of(getOrCreateSchemaCacheValue());
     }
 
     @Override
@@ -253,4 +233,45 @@ public class PaimonSysExternalTable extends ExternalTable {
     public String getComment() {
         return "Paimon system table: " + sysTableType + " for " + 
sourceTable.getName();
     }
+
+    private SchemaCacheValue getOrCreateSchemaCacheValue() {
+        if (schemaCacheValue == null) {
+            synchronized (this) {
+                if (schemaCacheValue == null) {
+                    if (fullSchema == null) {
+                        fullSchema = buildFullSchema();
+                    }
+                    schemaCacheValue = new SchemaCacheValue(fullSchema);
+                }
+            }
+        }
+        return schemaCacheValue;
+    }
+
+    private List<Column> buildFullSchema() {
+        Table sysTable = getSysPaimonTable();
+        List<DataField> fields = sysTable.rowType().getFields();
+        List<Column> columns = Lists.newArrayListWithCapacity(fields.size());
+
+        for (DataField field : fields) {
+            Column column = new Column(
+                    field.name().toLowerCase(),
+                    PaimonUtil.paimonTypeToDorisType(
+                            field.type(),
+                            getCatalog().getEnableMappingVarbinary(),
+                            getCatalog().getEnableMappingTimestampTz()),
+                    true,
+                    null,
+                    true,
+                    field.description(),
+                    true,
+                    field.id());
+            PaimonUtil.updatePaimonColumnUniqueId(column, field);
+            if (field.type().getTypeRoot() == 
DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+                column.setWithTZExtraInfo();
+            }
+            columns.add(column);
+        }
+        return columns;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSysTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSysTable.java
index ed803640cef..72a739e7b9a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSysTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSysTable.java
@@ -17,17 +17,16 @@
 
 package org.apache.doris.datasource.systable;
 
-import org.apache.doris.info.TableValuedFunctionRefInfo;
-import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
-import 
org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
-import org.apache.doris.tablefunction.IcebergTableValuedFunction;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
+import org.apache.doris.nereids.exceptions.AnalysisException;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.iceberg.MetadataTableType;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Locale;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -38,13 +37,10 @@ import java.util.stream.Collectors;
  * <p>Iceberg system tables provide access to table metadata such as
  * snapshots, history, manifests, files, partitions, etc.
  *
- * <p>Iceberg system tables currently use the TVF path (MetadataScanNode).
- *
  * @see org.apache.iceberg.MetadataTableType for all supported system table 
types
  */
-public class IcebergSysTable extends TvfSysTable {
-
-    private static final String TVF_NAME = "iceberg_meta";
+public class IcebergSysTable extends NativeSysTable {
+    public static final String POSITION_DELETES = 
MetadataTableType.POSITION_DELETES.name().toLowerCase(Locale.ROOT);
 
     /**
      * All supported Iceberg system tables.
@@ -52,14 +48,19 @@ public class IcebergSysTable extends TvfSysTable {
      */
     public static final Map<String, SysTable> SUPPORTED_SYS_TABLES = 
Collections.unmodifiableMap(
             Arrays.stream(MetadataTableType.values())
-                    .map(type -> new 
IcebergSysTable(type.name().toLowerCase()))
+                    .filter(type -> type != MetadataTableType.POSITION_DELETES)
+                    .map(type -> new 
IcebergSysTable(type.name().toLowerCase(Locale.ROOT), true))
                     .collect(Collectors.toMap(SysTable::getSysTableName, 
Function.identity())));
+    public static final SysTable UNSUPPORTED_POSITION_DELETES_TABLE =
+            new IcebergSysTable(POSITION_DELETES, false);
 
     private final String tableName;
+    private final boolean supported;
 
-    private IcebergSysTable(String tableName) {
-        super(tableName, TVF_NAME);
+    private IcebergSysTable(String tableName, boolean supported) {
+        super(tableName);
         this.tableName = tableName;
+        this.supported = supported;
     }
 
     @Override
@@ -68,22 +69,14 @@ public class IcebergSysTable extends TvfSysTable {
     }
 
     @Override
-    public TableValuedFunction createFunction(String ctlName, String dbName, 
String sourceNameWithMetaName) {
-        return IcebergMeta.createIcebergMeta(
-                Lists.newArrayList(ctlName, dbName, 
getSourceTableName(sourceNameWithMetaName)),
-                getSysTableName());
-    }
-
-    @Override
-    public TableValuedFunctionRefInfo createFunctionRef(String ctlName, String 
dbName, String sourceNameWithMetaName) {
-        String tableName = String.format("%s.%s.%s", ctlName, dbName, 
getSourceTableName(sourceNameWithMetaName));
-        try {
-            java.util.Map<String, String> params = Maps.newHashMap();
-            params.put(IcebergTableValuedFunction.TABLE, tableName);
-            params.put(IcebergTableValuedFunction.QUERY_TYPE, 
getSysTableName());
-            return new TableValuedFunctionRefInfo(tvfName, null, params);
-        } catch (org.apache.doris.common.AnalysisException e) {
-            throw new RuntimeException("Failed to create iceberg_meta tvf 
ref", e);
+    public ExternalTable createSysExternalTable(ExternalTable sourceTable) {
+        if (!supported) {
+            throw new AnalysisException("SysTable " + tableName + " is not 
supported yet");
+        }
+        if (!(sourceTable instanceof IcebergExternalTable)) {
+            throw new IllegalArgumentException(
+                    "Expected IcebergExternalTable but got " + 
sourceTable.getClass().getSimpleName());
         }
+        return new IcebergSysExternalTable((IcebergExternalTable) sourceTable, 
getSysTableName());
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SysTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SysTable.java
index 0dc6efb609c..2a70843546c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SysTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SysTable.java
@@ -35,7 +35,7 @@ import org.apache.doris.common.Pair;
  * <p>Subclasses should extend one of the specialized abstract classes:
  * <ul>
  *   <li>{@link NativeSysTable} - for tables using native execution path 
(FileQueryScanNode)</li>
- *   <li>{@link TvfSysTable} - for tables using TVF execution path 
(MetadataScanNode)</li>
+ *   <li>{@link TvfSysTable} - for tables still using TVF execution path 
(MetadataScanNode)</li>
  * </ul>
  *
  * @see NativeSysTable
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/TvfSysTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/TvfSysTable.java
index d79bd6ac232..f33b2d54617 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/TvfSysTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/TvfSysTable.java
@@ -27,7 +27,6 @@ import 
org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFun
  * This is the legacy execution path, still used for:
  * <ul>
  *   <li>Hive partition tables (partition_values TVF)</li>
- *   <li>Iceberg metadata tables (iceberg_meta TVF)</li>
  * </ul>
  *
  * <p>Subclasses must implement:
@@ -37,7 +36,6 @@ import 
org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFun
  * </ul>
  *
  * @see PartitionsSysTable
- * @see IcebergSysTable
  */
 public abstract class TvfSysTable extends SysTable {
 
@@ -59,7 +57,7 @@ public abstract class TvfSysTable extends SysTable {
     /**
      * Get the TVF name used for this system table.
      *
-     * @return the TVF name (e.g., "partition_values", "iceberg_meta")
+     * @return the TVF name (e.g., "partition_values")
      */
     public String getTvfName() {
         return tvfName;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index cb2ecdba15b..b1d2b41d31c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -54,6 +54,7 @@ import org.apache.doris.datasource.hive.source.HiveScanNode;
 import org.apache.doris.datasource.hudi.source.HudiScanNode;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergMergeOperation;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
 import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
 import org.apache.doris.datasource.jdbc.JdbcExternalTable;
 import org.apache.doris.datasource.jdbc.sink.JdbcTableSink;
@@ -743,7 +744,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 default:
                     throw new RuntimeException("do not support DLA type " + 
((HMSExternalTable) table).getDlaType());
             }
-        } else if (table instanceof IcebergExternalTable) {
+        } else if (table instanceof IcebergExternalTable || table instanceof 
IcebergSysExternalTable) {
             scanNode = new IcebergScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv,
                     context.getScanContext());
         } else if (table.getType() == TableIf.TableType.PAIMON_EXTERNAL_TABLE) 
{
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/UserAuthentication.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/UserAuthentication.java
index ccba89fece9..1f67afe4fae 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/UserAuthentication.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/UserAuthentication.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
 import org.apache.doris.datasource.paimon.PaimonSysExternalTable;
 import org.apache.doris.mysql.privilege.AccessControllerManager;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -56,6 +57,9 @@ public class UserAuthentication {
         if (table instanceof PaimonSysExternalTable) {
             authTable = ((PaimonSysExternalTable) table).getSourceTable();
             authColumns = Collections.emptySet();
+        } else if (table instanceof IcebergSysExternalTable) {
+            authTable = ((IcebergSysExternalTable) table).getSourceTable();
+            authColumns = Collections.emptySet();
         }
         String tableName = authTable.getName();
         DatabaseIf db = authTable.getDatabase();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java
deleted file mode 100644
index 8be995beaef..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.trees.expressions.functions.table;
-
-import org.apache.doris.catalog.FunctionSignature;
-import org.apache.doris.nereids.exceptions.AnalysisException;
-import org.apache.doris.nereids.trees.expressions.Properties;
-import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
-import org.apache.doris.nereids.types.coercion.AnyDataType;
-import org.apache.doris.tablefunction.IcebergTableValuedFunction;
-import org.apache.doris.tablefunction.TableValuedFunctionIf;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
-
-import java.util.List;
-import java.util.Map;
-
-/** iceberg_meta */
-public class IcebergMeta extends TableValuedFunction {
-    public IcebergMeta(Properties properties) {
-        super("iceberg_meta", properties);
-    }
-
-    public static IcebergMeta createIcebergMeta(List<String> nameParts, String 
queryType) {
-        Map<String, String> prop = Maps.newHashMap();
-        prop.put(IcebergTableValuedFunction.TABLE, 
Joiner.on(".").join(nameParts));
-        prop.put(IcebergTableValuedFunction.QUERY_TYPE, queryType);
-        return new IcebergMeta(new Properties(prop));
-    }
-
-    @Override
-    public FunctionSignature customSignature() {
-        return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, 
getArgumentsTypes());
-    }
-
-    @Override
-    protected TableValuedFunctionIf toCatalogFunction() {
-        try {
-            Map<String, String> arguments = getTVFProperties().getMap();
-            return IcebergTableValuedFunction.create(arguments);
-        } catch (Throwable t) {
-            throw new AnalysisException("Can not build 
IcebergTableValuedFunction by "
-                    + this + ": " + t.getMessage(), t);
-        }
-    }
-
-    @Override
-    public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
-        return visitor.visitIcebergMeta(this, context);
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
index 31b3162e647..8ae391e1f0e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
@@ -28,7 +28,6 @@ import 
org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
 import org.apache.doris.nereids.trees.expressions.functions.table.Http;
 import org.apache.doris.nereids.trees.expressions.functions.table.HttpStream;
 import org.apache.doris.nereids.trees.expressions.functions.table.HudiMeta;
-import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
 import org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
 import org.apache.doris.nereids.trees.expressions.functions.table.Local;
 import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
@@ -109,10 +108,6 @@ public interface TableValuedFunctionVisitor<R, C> {
         return visitTableValuedFunction(hudiMeta, context);
     }
 
-    default R visitIcebergMeta(IcebergMeta icebergMeta, C context) {
-        return visitTableValuedFunction(icebergMeta, context);
-    }
-
     default R visitLocal(Local local, C context) {
         return visitTableValuedFunction(local, context);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowCreateTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowCreateTableCommand.java
index 86332f6a57f..c534d62d01d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowCreateTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowCreateTableCommand.java
@@ -29,11 +29,15 @@ import org.apache.doris.catalog.stream.BaseTableStream;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.datasource.systable.SysTable;
+import org.apache.doris.datasource.systable.SysTableResolver;
 import org.apache.doris.info.TableNameInfo;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.trees.plans.PlanType;
@@ -43,10 +47,12 @@ import org.apache.doris.qe.ShowResultSet;
 import org.apache.doris.qe.ShowResultSetMetaData;
 import org.apache.doris.qe.StmtExecutor;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * Represents the command for SHOW CREATE TABLE.
@@ -90,9 +96,10 @@ public class ShowCreateTableCommand extends ShowCommand {
     private void validate(ConnectContext ctx) throws AnalysisException {
         tblNameInfo.analyze(ctx);
 
-        TableIf tableIf = Env.getCurrentEnv().getCatalogMgr()
+        DatabaseIf db = Env.getCurrentEnv().getCatalogMgr()
                 .getCatalogOrAnalysisException(tblNameInfo.getCtl())
-                
.getDbOrAnalysisException(tblNameInfo.getDb()).getTableOrAnalysisException(tblNameInfo.getTbl());
+                .getDbOrAnalysisException(tblNameInfo.getDb());
+        TableIf tableIf = resolveShowCreateTarget(db);
 
         if (tableIf instanceof MTMV) {
             ErrorReport.reportAnalysisException("not support async 
materialized view, "
@@ -106,12 +113,15 @@ public class ShowCreateTableCommand extends ShowCommand {
             wanted = PrivPredicate.SHOW;
         }
 
+        String authTableName = tableIf instanceof IcebergSysExternalTable
+                ? ((IcebergSysExternalTable) 
tableIf).getSourceTable().getName()
+                : tableIf.getName();
         if 
(!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(),
-                tblNameInfo.getCtl(), tblNameInfo.getDb(), 
tblNameInfo.getTbl(), wanted)) {
+                tblNameInfo.getCtl(), tblNameInfo.getDb(), authTableName, 
wanted)) {
             
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, 
"SHOW CREATE TABLE",
                                                 
ConnectContext.get().getQualifiedUser(),
                                                 
ConnectContext.get().getRemoteIP(),
-                                                tblNameInfo.getDb() + ": " + 
tblNameInfo.getTbl());
+                                                tblNameInfo.getDb() + ": " + 
authTableName);
         }
     }
 
@@ -132,7 +142,10 @@ public class ShowCreateTableCommand extends ShowCommand {
         // Fetch the catalog, database, and table metadata
         DatabaseIf db = 
ctx.getEnv().getCatalogMgr().getCatalogOrAnalysisException(tblNameInfo.getCtl())
                             .getDbOrMetaException(tblNameInfo.getDb());
-        TableIf table = db.getTableOrMetaException(tblNameInfo.getTbl());
+        TableIf table = resolveShowCreateTarget(db);
+        if (table instanceof IcebergSysExternalTable) {
+            table = ((IcebergSysExternalTable) table).getSourceTable();
+        }
 
         List<List<String>> rows = Lists.newArrayList();
 
@@ -173,4 +186,26 @@ public class ShowCreateTableCommand extends ShowCommand {
         }
     }
 
+    private TableIf resolveShowCreateTarget(DatabaseIf db) throws 
AnalysisException {
+        TableIf table = db.getTableNullable(tblNameInfo.getTbl());
+        if (table != null) {
+            return table;
+        }
+        Pair<String, String> tableNameWithSysTableName = 
SysTable.getTableNameWithSysTableName(tblNameInfo.getTbl());
+        if (Strings.isNullOrEmpty(tableNameWithSysTableName.second)) {
+            return db.getTableOrAnalysisException(tblNameInfo.getTbl());
+        }
+        TableIf sourceTable = 
db.getTableOrAnalysisException(tableNameWithSysTableName.first);
+        Optional<SysTableResolver.SysTableDescribe> sysTableDescribeOpt = 
SysTableResolver.resolveForDescribe(
+                sourceTable, tblNameInfo.getCtl(), tblNameInfo.getDb(), 
tblNameInfo.getTbl());
+        if (!sysTableDescribeOpt.isPresent()) {
+            throw new AnalysisException("sys table not found: " + 
tableNameWithSysTableName.second);
+        }
+        SysTableResolver.SysTableDescribe sysTableDescribe = 
sysTableDescribeOpt.get();
+        if (sysTableDescribe.isNative()) {
+            return sysTableDescribe.getSysExternalTable();
+        }
+        return sourceTable;
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
index aa9c6af0b5d..30072b97e70 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.IdGenerator;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
 import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
@@ -228,7 +229,7 @@ public class LogicalFileScan extends LogicalCatalogRelation 
implements SupportPr
     @Override
     public boolean supportPruneNestedColumn() {
         ExternalTable table = getTable();
-        if (table instanceof IcebergExternalTable) {
+        if (table instanceof IcebergExternalTable || table instanceof 
IcebergSysExternalTable) {
             return true;
         } else if (table instanceof HMSExternalTable) {
             HMSExternalTable hmsTable = (HMSExternalTable) table;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
deleted file mode 100644
index 5585805536f..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
+++ /dev/null
@@ -1,168 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.tablefunction;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
-import org.apache.doris.datasource.CatalogIf;
-import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.datasource.ExternalTable;
-import org.apache.doris.datasource.iceberg.IcebergUtils;
-import org.apache.doris.info.TableNameInfo;
-import org.apache.doris.mysql.privilege.PrivPredicate;
-import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.thrift.TMetaScanRange;
-import org.apache.doris.thrift.TMetadataType;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataTableType;
-import org.apache.iceberg.MetadataTableUtils;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.util.SerializationUtil;
-
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-/**
- * The class of table valued function for iceberg metadata.
- * iceberg_meta("table" = "ctl.db.tbl", "query_type" = "snapshots").
- */
-public class IcebergTableValuedFunction extends MetadataTableValuedFunction {
-
-    public static final String NAME = "iceberg_meta";
-    public static final String TABLE = "table";
-    public static final String QUERY_TYPE = "query_type";
-
-    private static final ImmutableSet<String> PROPERTIES_SET = 
ImmutableSet.of(TABLE, QUERY_TYPE);
-
-    private final String queryType;
-    private final Table sysTable;
-    private final List<Column> schema;
-    private final Map<String, String> hadoopProps;
-    private final ExecutionAuthenticator preExecutionAuthenticator;
-
-    public static IcebergTableValuedFunction create(Map<String, String> params)
-            throws AnalysisException {
-        Map<String, String> validParams = Maps.newHashMap();
-        for (String key : params.keySet()) {
-            if (!PROPERTIES_SET.contains(key.toLowerCase())) {
-                throw new AnalysisException("'" + key + "' is invalid 
property");
-            }
-            // check ctl, db, tbl
-            validParams.put(key.toLowerCase(), params.get(key));
-        }
-        String tableName = validParams.get(TABLE);
-        String queryType = validParams.get(QUERY_TYPE);
-        if (tableName == null || queryType == null) {
-            throw new AnalysisException("Invalid iceberg metadata query");
-        }
-        // TODO: support position_deletes in future;
-        if (queryType.equalsIgnoreCase("position_deletes")) {
-            throw new AnalysisException("SysTable " + queryType + " is not 
supported yet");
-        }
-        String[] names = tableName.split("\\.");
-        if (names.length != 3) {
-            throw new AnalysisException("The iceberg table name contains the 
catalogName, databaseName, and tableName");
-        }
-        TableNameInfo icebergTableName = new TableNameInfo(names[0], names[1], 
names[2]);
-        // check auth
-        if (!Env.getCurrentEnv().getAccessManager()
-                .checkTblPriv(ConnectContext.get(), icebergTableName, 
PrivPredicate.SELECT)) {
-            
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, 
"SELECT",
-                    ConnectContext.get().getQualifiedUser(), 
ConnectContext.get().getRemoteIP(),
-                    icebergTableName.getDb() + ": " + 
icebergTableName.getTbl());
-        }
-        return new IcebergTableValuedFunction(icebergTableName, queryType);
-    }
-
-    public IcebergTableValuedFunction(TableNameInfo icebergTableName, String 
queryType)
-            throws AnalysisException {
-        this.queryType = queryType;
-        CatalogIf<?> catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(icebergTableName.getCtl());
-        if (!(catalog instanceof ExternalCatalog)) {
-            throw new AnalysisException("Catalog " + icebergTableName.getCtl() 
+ " is not an external catalog");
-        }
-        ExternalCatalog externalCatalog = (ExternalCatalog) catalog;
-        hadoopProps = 
externalCatalog.getCatalogProperty().getBackendStorageProperties();
-        preExecutionAuthenticator = 
externalCatalog.getExecutionAuthenticator();
-
-        TableIf dorisTable = 
externalCatalog.getDbOrAnalysisException(icebergTableName.getDb())
-                .getTableOrAnalysisException(icebergTableName.getTbl());
-        if (!(dorisTable instanceof ExternalTable)) {
-            throw new AnalysisException("Table " + icebergTableName + " is not 
an iceberg table");
-        }
-        Table icebergTable = IcebergUtils.getIcebergTable((ExternalTable) 
dorisTable);
-        if (icebergTable == null) {
-            throw new AnalysisException("Iceberg table " + icebergTableName + 
" does not exist");
-        }
-        MetadataTableType tableType = MetadataTableType.from(queryType);
-        if (tableType == null) {
-            throw new AnalysisException("Unrecognized queryType for iceberg 
metadata: " + queryType);
-        }
-        this.sysTable = 
MetadataTableUtils.createMetadataTableInstance(icebergTable, tableType);
-        this.schema = IcebergUtils.parseSchema(sysTable.schema(), 
externalCatalog.getEnableMappingVarbinary(),
-                externalCatalog.getEnableMappingTimestampTz());
-    }
-
-    @Override
-    public TMetadataType getMetadataType() {
-        return TMetadataType.ICEBERG;
-    }
-
-    @Override
-    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
-        CloseableIterable<FileScanTask> tasks;
-        try {
-            tasks = preExecutionAuthenticator.execute(() -> {
-                return sysTable.newScan().select(requiredFileds).planFiles();
-            });
-        } catch (Exception e) {
-            throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e));
-        }
-
-        TMetaScanRange tMetaScanRange = new TMetaScanRange();
-        tMetaScanRange.setMetadataType(TMetadataType.ICEBERG);
-        tMetaScanRange.setHadoopProps(hadoopProps);
-        
tMetaScanRange.setSerializedTable(SerializationUtil.serializeToBase64(sysTable));
-        
tMetaScanRange.setSerializedSplits(StreamSupport.stream(tasks.spliterator(), 
false)
-                .map(SerializationUtil::serializeToBase64)
-                .collect(Collectors.toList()));
-        return tMetaScanRange;
-    }
-
-    @Override
-    public String getTableName() {
-        return "IcebergTableValuedFunction<" + queryType + ">";
-    }
-
-    @Override
-    public List<Column> getTableColumns() {
-        return schema;
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index f6655192fbd..6e489d08c93 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -57,8 +57,6 @@ public abstract class TableValuedFunctionIf {
                 return new HttpStreamTableValuedFunction(params);
             case LocalTableValuedFunction.NAME:
                 return new LocalTableValuedFunction(params);
-            case IcebergTableValuedFunction.NAME:
-                return IcebergTableValuedFunction.create(params);
             case HudiTableValuedFunction.NAME:
                 return new HudiTableValuedFunction(params);
             case BackendsTableValuedFunction.NAME:
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/systable/IcebergSysTableResolverTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/systable/IcebergSysTableResolverTest.java
new file mode 100644
index 00000000000..e6c5b0be328
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/systable/IcebergSysTableResolverTest.java
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.systable;
+
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+public class IcebergSysTableResolverTest {
+    @Mocked
+    private IcebergExternalCatalog catalog;
+    @Mocked
+    private IcebergExternalDatabase db;
+
+    @Test
+    public void testSupportedSysTablesExcludePositionDeletes() {
+        
Assertions.assertFalse(IcebergSysTable.SUPPORTED_SYS_TABLES.containsKey(IcebergSysTable.POSITION_DELETES));
+    }
+
+    @Test
+    public void testResolveForPlanAndDescribeUseNativePath() throws Exception {
+        IcebergExternalTable sourceTable = newIcebergTable();
+
+        Optional<SysTableResolver.SysTablePlan> plan = 
SysTableResolver.resolveForPlan(
+                sourceTable, "test_ctl", "test_db", "tbl$snapshots");
+        Assertions.assertTrue(plan.isPresent());
+        Assertions.assertTrue(plan.get().isNative());
+        Assertions.assertTrue(plan.get().getSysExternalTable() instanceof 
IcebergSysExternalTable);
+        Assertions.assertEquals("tbl$snapshots", 
plan.get().getSysExternalTable().getName());
+
+        Optional<SysTableResolver.SysTableDescribe> describe = 
SysTableResolver.resolveForDescribe(
+                sourceTable, "test_ctl", "test_db", "tbl$snapshots");
+        Assertions.assertTrue(describe.isPresent());
+        Assertions.assertTrue(describe.get().isNative());
+        Assertions.assertTrue(describe.get().getSysExternalTable() instanceof 
IcebergSysExternalTable);
+        Assertions.assertEquals("tbl$snapshots", 
describe.get().getSysExternalTable().getName());
+    }
+
+    @Test
+    public void testPositionDeletesKeepsUnsupportedError() throws Exception {
+        IcebergExternalTable sourceTable = newIcebergTable();
+        Assertions.assertThrows(AnalysisException.class, () ->
+                SysTableResolver.resolveForPlan(sourceTable, "test_ctl", 
"test_db", "tbl$position_deletes"));
+    }
+
+    private IcebergExternalTable newIcebergTable() throws Exception {
+        new Expectations() {
+            {
+                catalog.getId();
+                minTimes = 0;
+                result = 1L;
+
+                db.getFullName();
+                minTimes = 0;
+                result = "test_db";
+
+                db.getRemoteName();
+                minTimes = 0;
+                result = "test_db";
+
+                catalog.getDbOrAnalysisException("test_db");
+                minTimes = 0;
+                result = db;
+
+                db.getId();
+                minTimes = 0;
+                result = 2L;
+
+                db.makeSureInitialized();
+                minTimes = 0;
+            }
+        };
+        return new IcebergExternalTable(3L, "tbl", "tbl", catalog, db);
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/UserAuthenticationTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/UserAuthenticationTest.java
index d440d32dd18..547e2b0d2c6 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/UserAuthenticationTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/UserAuthenticationTest.java
@@ -25,6 +25,8 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
 import org.apache.doris.mysql.privilege.AccessControllerManager;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
@@ -55,6 +57,10 @@ public class UserAuthenticationTest {
     private DatabaseIf db;
     @Mocked
     private CatalogIf catalog;
+    @Mocked
+    private IcebergSysExternalTable icebergSysTable;
+    @Mocked
+    private IcebergExternalTable icebergSourceTable;
 
     private String originalMinPrivilege;
 
@@ -345,4 +351,59 @@ public class UserAuthenticationTest {
         Assertions.assertDoesNotThrow(() ->
                 UserAuthentication.checkPermission(table, connectContext, 
null));
     }
+
+    @Test
+    public void testIcebergSysTableUsesSourceTablePrivilege() throws Exception 
{
+        new Expectations() {
+            {
+                icebergSysTable.getSourceTable();
+                minTimes = 0;
+                result = icebergSourceTable;
+
+                connectContext.getSessionVariable();
+                minTimes = 0;
+                result = sessionVariable;
+
+                sessionVariable.isPlayNereidsDump();
+                minTimes = 0;
+                result = false;
+
+                icebergSourceTable.getName();
+                minTimes = 0;
+                result = "source_tbl";
+
+                icebergSourceTable.getDatabase();
+                minTimes = 0;
+                result = db;
+
+                db.getFullName();
+                minTimes = 0;
+                result = "test_db";
+
+                db.getCatalog();
+                minTimes = 0;
+                result = catalog;
+
+                catalog.getName();
+                minTimes = 0;
+                result = "test_ctl";
+
+                connectContext.getEnv();
+                minTimes = 0;
+                result = env;
+
+                env.getAccessManager();
+                minTimes = 0;
+                result = accessControllerManager;
+
+                accessControllerManager.checkTblPriv(connectContext, 
"test_ctl", "test_db",
+                        "source_tbl", PrivPredicate.SELECT);
+                minTimes = 1;
+                result = true;
+            }
+        };
+
+        Assertions.assertDoesNotThrow(() ->
+                UserAuthentication.checkPermission(icebergSysTable, 
connectContext, null));
+    }
 }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 1f4ae4df04a..60c5e01ecf8 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -319,6 +319,7 @@ struct TIcebergFileDesc {
     10: optional i64 first_row_id;
     // Only for format_version >= 3, the sequence number which last updated 
this file.
     11: optional i64 last_updated_sequence_number;
+    12: optional string serialized_split;
 }
 
 struct TPaimonDeletionFileDesc {
diff --git a/regression-test/data/external_table_p2/tvf/test_iceberg_meta.out 
b/regression-test/data/external_table_p2/tvf/test_iceberg_meta.out
deleted file mode 100644
index b62e2d7510a..00000000000
--- a/regression-test/data/external_table_p2/tvf/test_iceberg_meta.out
+++ /dev/null
@@ -1,22 +0,0 @@
--- This file is automatically generated. You should know what you did if you 
want to edit this
--- !q01 --
-2879562
-
--- !q02 --
-1
-11
-3
-5
-6
-7
-8
-
--- !tvf_1 --
-2023-10-16T21:01:06    4012471924714711043     5784892960796156942     append
-2023-10-16T21:01:06    5784892960796156942     -1      append
-2023-10-16T21:01:06    7235593032487457798     4012471924714711043     append
-2023-10-16T21:01:07    1953697979105284524     7235593032487457798     append
-
--- !tvf_2 --
-2023-10-16T21:01:06    7235593032487457798     4012471924714711043     append
-
diff --git 
a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy
 
b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy
index fe0cd29c5e9..cd3f77fe96d 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy
@@ -55,7 +55,7 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external") 
{
     sql """ insert into ${table_name_expire} values (4, 'd') """
     sql """ insert into ${table_name_expire} values (5, 'e') """
 
-    List<List<Object>> snapshots_expire = sql """ select snapshot_id from 
iceberg_meta("table" = 
"${catalog_name}.test_db_retention.${table_name_expire}", "query_type" = 
"snapshots") order by committed_at; """
+    List<List<Object>> snapshots_expire = sql """ select snapshot_id from 
${catalog_name}.test_db_retention.${table_name_expire}\$snapshots order by 
committed_at; """
     String s_expire_0 = snapshots_expire.get(0)[0]
     String s_expire_1 = snapshots_expire.get(1)[0]
     String s_expire_2 = snapshots_expire.get(2)[0]
@@ -73,7 +73,7 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external") 
{
     sql """ insert into ${table_name_expire}@branch(b_expire_test) values (10, 
'j') """
 
     // Get the current snapshot count before expire
-    def snapshot_count_before_expire = sql """ select count(*) from 
iceberg_meta("table" = 
"${catalog_name}.test_db_retention.${table_name_expire}", "query_type" = 
"snapshots") """
+    def snapshot_count_before_expire = sql """ select count(*) from 
${catalog_name}.test_db_retention.${table_name_expire}\$snapshots """
     logger.info("Snapshot count before expire: 
${snapshot_count_before_expire[0][0]}")
 
     // Get branch ref snapshot
@@ -107,7 +107,7 @@ suite("iceberg_branch_retention_and_snapshot", 
"p0,external") {
     sql """ create table ${table_name_retain_count} (id int, name string) """
 
     sql """ insert into ${table_name_retain_count} values (1, 'a') """
-    List<List<Object>> snapshots_retain = sql """ select snapshot_id from 
iceberg_meta("table" = 
"${catalog_name}.test_db_retention.${table_name_retain_count}", "query_type" = 
"snapshots") order by committed_at; """
+    List<List<Object>> snapshots_retain = sql """ select snapshot_id from 
${catalog_name}.test_db_retention.${table_name_retain_count}\$snapshots order 
by committed_at; """
     def s_retain_0 = snapshots_retain[0][0].toString()
 
     // Create branch with snapshot retention count of 3
@@ -124,7 +124,7 @@ suite("iceberg_branch_retention_and_snapshot", 
"p0,external") {
     def branch_snapshot_id = sql """ select snapshot_id from 
${table_name_retain_count}\$refs where name = 'b_retain_count' """
 
     // Count snapshots before expire
-    def snapshot_count_retain_before = sql """ select count(*) from 
iceberg_meta("table" = 
"${catalog_name}.test_db_retention.${table_name_retain_count}", "query_type" = 
"snapshots") """
+    def snapshot_count_retain_before = sql """ select count(*) from 
${catalog_name}.test_db_retention.${table_name_retain_count}\$snapshots """
 
     // Call expire_snapshots - older snapshots beyond retention count may be 
expired, but branch snapshot should be protected
     sql """
@@ -150,11 +150,11 @@ suite("iceberg_branch_retention_and_snapshot", 
"p0,external") {
     sql """ insert into ${table_name_unref} values (2, 'old2') """
     sql """ insert into ${table_name_unref} values (3, 'new') """
 
-    List<List<Object>> snapshots_unref = sql """ select snapshot_id, 
committed_at from iceberg_meta("table" = 
"${catalog_name}.test_db_retention.${table_name_unref}", "query_type" = 
"snapshots") order by committed_at; """
+    List<List<Object>> snapshots_unref = sql """ select snapshot_id, 
committed_at from 
${catalog_name}.test_db_retention.${table_name_unref}\$snapshots order by 
committed_at; """
     def old_snapshot_id = snapshots_unref[0][0]
 
     // Create a tag pointing to the newest snapshot (not the old ones)
-    List<List<Object>> latest_snapshot = sql """ select snapshot_id from 
iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_unref}", 
"query_type" = "snapshots") order by committed_at desc limit 1; """
+    List<List<Object>> latest_snapshot = sql """ select snapshot_id from 
${catalog_name}.test_db_retention.${table_name_unref}\$snapshots order by 
committed_at desc limit 1; """
     def latest_snap_id = latest_snapshot[0][0]
     sql """ alter table ${table_name_unref} create tag t_latest AS OF VERSION 
${latest_snap_id} """
 
@@ -162,7 +162,7 @@ suite("iceberg_branch_retention_and_snapshot", 
"p0,external") {
     qt_unref_tag_before_expire """ select * from 
${table_name_unref}@tag(t_latest) order by id """ // Should have 1, old2, 3 rows
 
     // Count snapshots before expire
-    def snapshot_count_unref_before = sql """ select count(*) from 
iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_unref}", 
"query_type" = "snapshots") """
+    def snapshot_count_unref_before = sql """ select count(*) from 
${catalog_name}.test_db_retention.${table_name_unref}\$snapshots """
     logger.info("Snapshot count before expire: 
${snapshot_count_unref_before[0][0]}")
 
     // Call expire_snapshots - old unreferenced snapshots should be expired
@@ -172,7 +172,7 @@ suite("iceberg_branch_retention_and_snapshot", 
"p0,external") {
     """
 
     // Count snapshots after expire
-    def snapshot_count_unref_after = sql """ select count(*) from 
iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_unref}", 
"query_type" = "snapshots") """
+    def snapshot_count_unref_after = sql """ select count(*) from 
${catalog_name}.test_db_retention.${table_name_unref}\$snapshots """
 
     // Refresh catalog to ensure we see the latest state after expire_snapshots
     sql """refresh catalog ${catalog_name}"""
@@ -181,7 +181,7 @@ suite("iceberg_branch_retention_and_snapshot", 
"p0,external") {
     qt_unref_tag_accessible """ select * from 
${table_name_unref}@tag(t_latest) order by id """ // Should have data
 
     // Verify old snapshot is no longer accessible if it was expired
-    def old_snapshot_exists = sql """ select count(*) from 
iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_unref}", 
"query_type" = "snapshots") where snapshot_id = '${old_snapshot_id}' """
+    def old_snapshot_exists = sql """ select count(*) from 
${catalog_name}.test_db_retention.${table_name_unref}\$snapshots where 
snapshot_id = '${old_snapshot_id}' """
     logger.info("Old snapshot exists after expire: 
${old_snapshot_exists[0][0]}")
 
     // The tag-protected snapshot should still be in refs
diff --git 
a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_tag_retention_and_consistency.groovy
 
b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_tag_retention_and_consistency.groovy
index 3c37ff7ba04..242a4f7fcf7 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_tag_retention_and_consistency.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_tag_retention_and_consistency.groovy
@@ -55,7 +55,7 @@ suite("iceberg_tag_retention_and_consistency", "p0,external") 
{
     sql """ insert into ${table_name_expire} values (4, 'd', 'snapshot4') """
     sql """ insert into ${table_name_expire} values (5, 'e', 'snapshot5') """
 
-    List<List<Object>> snapshots_expire = sql """ select snapshot_id from 
iceberg_meta("table" = 
"${catalog_name}.test_db_tag_retention.${table_name_expire}", "query_type" = 
"snapshots") order by committed_at; """
+    List<List<Object>> snapshots_expire = sql """ select snapshot_id from 
${catalog_name}.test_db_tag_retention.${table_name_expire}\$snapshots order by 
committed_at; """
     String s_expire_0 = snapshots_expire.get(0)[0]
     String s_expire_1 = snapshots_expire.get(1)[0]
     String s_expire_2 = snapshots_expire.get(2)[0]
@@ -69,7 +69,7 @@ suite("iceberg_tag_retention_and_consistency", "p0,external") 
{
     logger.info("Created tags t_early (snapshot ${s_expire_1}) and t_middle 
(snapshot ${s_expire_3})")
 
     // Get snapshot count before expire
-    def snapshot_count_before_expire = sql """ select count(*) from 
iceberg_meta("table" = 
"${catalog_name}.test_db_tag_retention.${table_name_expire}", "query_type" = 
"snapshots") """
+    def snapshot_count_before_expire = sql """ select count(*) from 
${catalog_name}.test_db_tag_retention.${table_name_expire}\$snapshots """
     logger.info("Snapshot count before expire: 
${snapshot_count_before_expire[0][0]}")
 
     // Call expire_snapshots via Spark - should not delete snapshots 
referenced by tags
@@ -95,7 +95,7 @@ suite("iceberg_tag_retention_and_consistency", "p0,external") 
{
     sql """ insert into ${table_name_multi_tag} values (2, 'second') """
     sql """ insert into ${table_name_multi_tag} values (3, 'third') """
 
-    List<List<Object>> snapshots_multi = sql """ select snapshot_id from 
iceberg_meta("table" = 
"${catalog_name}.test_db_tag_retention.${table_name_multi_tag}", "query_type" = 
"snapshots") order by committed_at; """
+    List<List<Object>> snapshots_multi = sql """ select snapshot_id from 
${catalog_name}.test_db_tag_retention.${table_name_multi_tag}\$snapshots order 
by committed_at; """
     String s_multi_1 = snapshots_multi.get(1)[0]
 
     // Create multiple tags pointing to the same snapshot
@@ -126,7 +126,7 @@ suite("iceberg_tag_retention_and_consistency", 
"p0,external") {
     sql """ insert into ${table_name_immutable} values (1, 'version1', 1000) 
"""
     sql """ insert into ${table_name_immutable} values (2, 'version2', 2000) 
"""
 
-    List<List<Object>> snapshots_imm = sql """ select snapshot_id from 
iceberg_meta("table" = 
"${catalog_name}.test_db_tag_retention.${table_name_immutable}", "query_type" = 
"snapshots") order by committed_at; """
+    List<List<Object>> snapshots_imm = sql """ select snapshot_id from 
${catalog_name}.test_db_tag_retention.${table_name_immutable}\$snapshots order 
by committed_at; """
     String s_imm_v1 = snapshots_imm.get(1)[0]
 
     // Create a tag at this point
@@ -157,11 +157,11 @@ suite("iceberg_tag_retention_and_consistency", 
"p0,external") {
     sql """ create table ${table_name_replace} (id int, status string) """
 
     sql """ insert into ${table_name_replace} values (1, 'pending') """
-    List<List<Object>> snapshots_replace = sql """ select snapshot_id from 
iceberg_meta("table" = 
"${catalog_name}.test_db_tag_retention.${table_name_replace}", "query_type" = 
"snapshots") order by committed_at; """
+    List<List<Object>> snapshots_replace = sql """ select snapshot_id from 
${catalog_name}.test_db_tag_retention.${table_name_replace}\$snapshots order by 
committed_at; """
     String s_rep_v1 = snapshots_replace.get(0)[0]
 
     sql """ insert into ${table_name_replace} values (2, 'approved') """
-    List<List<Object>> snapshots_replace_2 = sql """ select snapshot_id from 
iceberg_meta("table" = 
"${catalog_name}.test_db_tag_retention.${table_name_replace}", "query_type" = 
"snapshots") order by committed_at; """
+    List<List<Object>> snapshots_replace_2 = sql """ select snapshot_id from 
${catalog_name}.test_db_tag_retention.${table_name_replace}\$snapshots order by 
committed_at; """
     String s_rep_v2 = snapshots_replace_2.get(1)[0]
 
     // Create initial tag
@@ -185,7 +185,7 @@ suite("iceberg_tag_retention_and_consistency", 
"p0,external") {
     sql """ insert into ${table_name_time_travel} values (2, 'v1.0', 
'2024-01-01') """
     sql """ insert into ${table_name_time_travel} values (3, 'v1.0', 
'2024-01-01') """
 
-    List<List<Object>> snapshots_tt = sql """ select snapshot_id from 
iceberg_meta("table" = 
"${catalog_name}.test_db_tag_retention.${table_name_time_travel}", "query_type" 
= "snapshots") order by committed_at; """
+    List<List<Object>> snapshots_tt = sql """ select snapshot_id from 
${catalog_name}.test_db_tag_retention.${table_name_time_travel}\$snapshots 
order by committed_at; """
     String s_tt_1 = snapshots_tt.get(0)[0]
     String s_tt_2 = snapshots_tt.get(1)[0]
     String s_tt_3 = snapshots_tt.get(2)[0]
@@ -219,7 +219,7 @@ suite("iceberg_tag_retention_and_consistency", 
"p0,external") {
     sql """ insert into ${table_name_agg} values (3, 'A', 150) """
     sql """ insert into ${table_name_agg} values (4, 'C', 300) """
 
-    List<List<Object>> snapshots_agg = sql """ select snapshot_id from 
iceberg_meta("table" = 
"${catalog_name}.test_db_tag_retention.${table_name_agg}", "query_type" = 
"snapshots") order by committed_at; """
+    List<List<Object>> snapshots_agg = sql """ select snapshot_id from 
${catalog_name}.test_db_tag_retention.${table_name_agg}\$snapshots order by 
committed_at; """
     String s_agg_all = snapshots_agg.get(3)[0]
 
     // Create tag for aggregate queries
@@ -245,7 +245,7 @@ suite("iceberg_tag_retention_and_consistency", 
"p0,external") {
     sql """ insert into ${table_name_interaction} values (1, 'item1', 'main') 
"""
     sql """ insert into ${table_name_interaction} values (2, 'item2', 'main') 
"""
 
-    List<List<Object>> snapshots_inter = sql """ select snapshot_id from 
iceberg_meta("table" = 
"${catalog_name}.test_db_tag_retention.${table_name_interaction}", "query_type" 
= "snapshots") order by committed_at; """
+    List<List<Object>> snapshots_inter = sql """ select snapshot_id from 
${catalog_name}.test_db_tag_retention.${table_name_interaction}\$snapshots 
order by committed_at; """
     String s_inter_baseline = snapshots_inter.get(1)[0]
 
     // Create a tag for baseline
@@ -276,7 +276,7 @@ suite("iceberg_tag_retention_and_consistency", 
"p0,external") {
     sql """ create table ${table_name_write_fail} (id int, value string) """
 
     sql """ insert into ${table_name_write_fail} values (1, 'data1') """
-    List<List<Object>> snapshots_fail = sql """ select snapshot_id from 
iceberg_meta("table" = 
"${catalog_name}.test_db_tag_retention.${table_name_write_fail}", "query_type" 
= "snapshots") order by committed_at; """
+    List<List<Object>> snapshots_fail = sql """ select snapshot_id from 
${catalog_name}.test_db_tag_retention.${table_name_write_fail}\$snapshots order 
by committed_at; """
     String s_fail = snapshots_fail.get(0)[0]
 
     // Create a tag
diff --git 
a/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy
 
b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy
index 53b4581f604..4bf1d347950 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy
@@ -88,7 +88,7 @@ suite("iceberg_branch_tag_operate", "p0,external") {
     assertTrue(update_time2 > update_time1);
     sleep(1000)
 
-    List<List<Object>> snapshots = sql """ select snapshot_id from 
iceberg_meta("table" = "${catalog_name}.test_db.${table_name}", "query_type" = 
"snapshots") order by committed_at; """
+    List<List<Object>> snapshots = sql """ select snapshot_id from 
${catalog_name}.test_db.${table_name}\$snapshots order by committed_at; """
     String s0 = snapshots.get(0)[0]
     String s1 = snapshots.get(1)[0]
     String s2 = snapshots.get(2)[0]
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
index f014ed6ecb7..4ecf4aef86e 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
@@ -48,7 +48,7 @@ suite("iceberg_schema_change_with_timetravel", "p0,external") 
{
     sql """ use test_db;""" 
 
     def executeTimeTravelingQueries = { String tableName ->
-        def snapshots = sql """ select snapshot_id from iceberg_meta("table" = 
"${catalog_name}.test_db.${tableName}", "query_type" = "snapshots") order by 
committed_at; """
+        def snapshots = sql """ select snapshot_id from 
${catalog_name}.test_db.${tableName}\$snapshots order by committed_at; """
         def snapshotIds = [
             s0: snapshots.get(0)[0],
             s1: snapshots.get(1)[0],
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table.groovy
index ba3a4621224..de4d582a83a 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table.groovy
@@ -43,48 +43,13 @@ suite("test_iceberg_sys_table", "p0,external") {
     sql """switch ${catalog_name}"""
     sql """use ${db_name}"""
 
-    def test_iceberg_systable = { tblName, systableType ->
-        def systableName = "${tblName}\$${systableType}"
-
-        order_qt_desc_systable1 """desc ${systableName}"""
-        order_qt_desc_systable2 """desc ${db_name}.${systableName}"""
-        order_qt_desc_systable3 """desc 
${catalog_name}.${db_name}.${systableName}"""
-
-        List<List<Object>> schema = sql """desc ${systableName}"""
-        String key = String.valueOf(schema[1][0])
-
-        order_qt_tbl1_systable """select * from ${systableName}"""
-        order_qt_tbl1_systable_select """select ${key} from ${systableName}"""
-        order_qt_tbl1_systable_count """select count(*) from ${systableName}"""
-        order_qt_tbl1_systable_count_select """select count(${key}) from 
${systableName}"""
-
-        List<List<Object>> res1 = sql """select ${key} from ${systableName} 
order by ${key}"""
-        List<List<Object>> res2 = sql """select ${key} from iceberg_meta(
-            "table" = "${catalog_name}.${db_name}.${tblName}",
-            "query_type" = "${systableType}") order by ${key};
-        """
-        assertEquals(res1.size(), res2.size());
-        for (int i = 0; i < res1.size(); i++) {
-            for (int j = 0; j < res1[i].size(); j++) {
-                assertEquals(res1[i][j], res2[i][j]);
-            }
-        }
-
-        if (res1.isEmpty()) {
-            return
-        }
-
-        String value = String.valueOf(res1[0][0])
-        order_qt_tbl1_systable_where """
-            select * from ${systableName} where ${key}="${value}";
-        """
-        order_qt_tbl1_systable_where_count """
-            select count(*) from ${systableName} where ${key}="${value}";
-        """
-        order_qt_systable_where2 """select * from iceberg_meta(
-            "table" = "${catalog_name}.${db_name}.${tblName}",
-            "query_type" = "${systableType}") where ${key}="${value}";
-        """
+    def assertQueryRowsMatchCount = { String countSql, String querySql, String 
label ->
+        List<List<Object>> countResult = sql countSql
+        assertEquals(1, countResult.size())
+        long expectedRows = ((Number) countResult[0][0]).longValue()
+        List<List<Object>> queryResult = sql querySql
+        assertNotNull(queryResult, "${label} result should not be null")
+        assertEquals(expectedRows, (long) queryResult.size(), "${label} row 
count should match count query")
     }
 
     def test_systable_entries = { table, systableType ->
@@ -103,10 +68,16 @@ suite("test_iceberg_sys_table", "p0,external") {
             }
         }
 
-        order_qt_select_entries """select status, sequence_number, 
file_sequence_number from ${systableName}"""
         order_qt_select_entries_count """select count(*) from 
${systableName}"""
-        order_qt_select_entries_where """select status, sequence_number, 
file_sequence_number from ${systableName} where status="0";"""
         order_qt_select_entries_where_count """select count(status) from 
${systableName} where status="0";"""
+        assertQueryRowsMatchCount(
+                """select count(*) from ${systableName}""",
+                """select status, sequence_number, file_sequence_number from 
${systableName}""",
+                systableName)
+        assertQueryRowsMatchCount(
+                """select count(*) from ${systableName} where status="0";""",
+                """select status, sequence_number, file_sequence_number from 
${systableName} where status="0";""",
+                "${systableName} filtered by status")
     }
 
     def test_systable_files = { table, systableType ->
@@ -125,10 +96,16 @@ suite("test_iceberg_sys_table", "p0,external") {
             }
         }
 
-        order_qt_select_files """select content, file_format, record_count, 
lower_bounds, upper_bounds from ${systableName}"""
         order_qt_select_files_count """select count(*) from ${systableName}"""
-        order_qt_select_files_where """select content, file_format, 
record_count, lower_bounds, upper_bounds from ${systableName} where 
content="0";"""
         order_qt_select_files_where_count """select count(content) from 
${systableName} where content="0";"""
+        assertQueryRowsMatchCount(
+                """select count(*) from ${systableName}""",
+                """select content, file_format, record_count, lower_bounds, 
upper_bounds from ${systableName}""",
+                systableName)
+        assertQueryRowsMatchCount(
+                """select count(*) from ${systableName} where content="0";""",
+                """select content, file_format, record_count, lower_bounds, 
upper_bounds from ${systableName} where content="0";""",
+                "${systableName} filtered by content")
     }
 
     def test_systable_history = { table ->
@@ -148,17 +125,10 @@ suite("test_iceberg_sys_table", "p0,external") {
         }
 
         order_qt_select_history_count """select count(*) from 
${systableName}"""
-
-        List<List<Object>> res1 = sql """select * from ${systableName} order 
by snapshot_id"""
-        List<List<Object>> res2 = sql """select * from iceberg_meta(
-            "table" = "${catalog_name}.${db_name}.${table}",
-            "query_type" = "history") order by snapshot_id"""
-        assertEquals(res1.size(), res2.size());
-        for (int i = 0; i < res1.size(); i++) {
-            for (int j = 0; j < res1[i].size(); j++) {
-                assertEquals(res1[i][j], res2[i][j]);
-            }
-        }
+        assertQueryRowsMatchCount(
+                """select count(*) from ${systableName}""",
+                """select * from ${systableName} order by snapshot_id""",
+                systableName)
     }
 
     def test_systable_metadata_log_entries = { table ->
@@ -178,17 +148,10 @@ suite("test_iceberg_sys_table", "p0,external") {
         }
 
         order_qt_select_metadata_log_entries_count """select count(*) from 
${systableName}"""
-
-        List<List<Object>> res1 = sql """select * from ${systableName} order 
by timestamp"""
-        List<List<Object>> res2 = sql """select * from iceberg_meta(
-            "table" = "${catalog_name}.${db_name}.${table}",
-            "query_type" = "metadata_log_entries") order by timestamp"""
-        assertEquals(res1.size(), res2.size());
-        for (int i = 0; i < res1.size(); i++) {
-            for (int j = 0; j < res1[i].size(); j++) {
-                assertEquals(res1[i][j], res2[i][j]);
-            }
-        }
+        assertQueryRowsMatchCount(
+                """select count(*) from ${systableName}""",
+                """select * from ${systableName} order by timestamp""",
+                systableName)
     }
 
     def test_systable_snapshots = { table ->
@@ -207,19 +170,15 @@ suite("test_iceberg_sys_table", "p0,external") {
             }
         }
 
-        order_qt_select_snapshots """select operation from ${systableName}"""
         order_qt_select_snapshots_count """select count(*) from 
${systableName}"""
-
-        List<List<Object>> res1 = sql """select * from ${systableName} order 
by committed_at"""
-        List<List<Object>> res2 = sql """select * from iceberg_meta(
-            "table" = "${catalog_name}.${db_name}.${table}",
-            "query_type" = "snapshots") order by committed_at"""
-        assertEquals(res1.size(), res2.size());
-        for (int i = 0; i < res1.size(); i++) {
-            for (int j = 0; j < res1[i].size(); j++) {
-                assertEquals(res1[i][j], res2[i][j]);
-            }
-        }
+        assertQueryRowsMatchCount(
+                """select count(*) from ${systableName}""",
+                """select operation from ${systableName}""",
+                "${systableName} projected query")
+        assertQueryRowsMatchCount(
+                """select count(*) from ${systableName}""",
+                """select * from ${systableName} order by committed_at""",
+                systableName)
     }
 
     def test_systable_refs = { table ->
@@ -238,19 +197,15 @@ suite("test_iceberg_sys_table", "p0,external") {
             }
         }
 
-        order_qt_select_refs """select name, type from ${systableName}"""
         order_qt_select_refs_count """select count(*) from ${systableName}"""
-
-        List<List<Object>> res1 = sql """select * from ${systableName} order 
by snapshot_id"""
-        List<List<Object>> res2 = sql """select * from iceberg_meta(
-            "table" = "${catalog_name}.${db_name}.${table}",
-            "query_type" = "refs") order by snapshot_id"""
-        assertEquals(res1.size(), res2.size());
-        for (int i = 0; i < res1.size(); i++) {
-            for (int j = 0; j < res1[i].size(); j++) {
-                assertEquals(res1[i][j], res2[i][j]);
-            }
-        }
+        assertQueryRowsMatchCount(
+                """select count(*) from ${systableName}""",
+                """select name, type from ${systableName}""",
+                "${systableName} projected query")
+        assertQueryRowsMatchCount(
+                """select count(*) from ${systableName}""",
+                """select * from ${systableName} order by snapshot_id""",
+                systableName)
     }
 
     def test_systable_manifests = { table, systableType ->
@@ -272,27 +227,15 @@ suite("test_iceberg_sys_table", "p0,external") {
         order_qt_select_manifests_count """select count(*) from 
${systableName}"""
 
         if (systableType.equals("manifests")) {
-            List<List<Object>> res1 = sql """select * from ${systableName} 
order by path"""
-            List<List<Object>> res2 = sql """select * from iceberg_meta(
-                "table" = "${catalog_name}.${db_name}.${table}",
-                "query_type" = "${systableType}") order by path"""
-            assertEquals(res1.size(), res2.size());
-            for (int i = 0; i < res1.size(); i++) {
-                for (int j = 0; j < res1[i].size(); j++) {
-                    assertEquals(res1[i][j], res2[i][j]);
-                }
-            }
+            assertQueryRowsMatchCount(
+                    """select count(*) from ${systableName}""",
+                    """select * from ${systableName} order by path""",
+                    systableName)
         } else {
-            List<List<Object>> res1 = sql """select * from ${systableName} 
order by path, reference_snapshot_id"""
-            List<List<Object>> res2 = sql """select * from iceberg_meta(
-                "table" = "${catalog_name}.${db_name}.${table}",
-                "query_type" = "${systableType}") order by path, 
reference_snapshot_id"""
-            assertEquals(res1.size(), res2.size());
-            for (int i = 0; i < res1.size(); i++) {
-                for (int j = 0; j < res1[i].size(); j++) {
-                    assertEquals(res1[i][j], res2[i][j]);
-                }
-            }
+            assertQueryRowsMatchCount(
+                    """select count(*) from ${systableName}""",
+                    """select * from ${systableName} order by path, 
reference_snapshot_id""",
+                    systableName)
         }
     }
 
@@ -313,14 +256,10 @@ suite("test_iceberg_sys_table", "p0,external") {
         }
 
         order_qt_select_partitions_count """select count(*) from 
${systableName}"""
-
-        
-        List<List<Object>> res1 = sql """select * from ${systableName};"""
-        List<List<Object>> res2 = sql """select * from iceberg_meta(
-            "table" = "${catalog_name}.${db_name}.${table}",
-            "query_type" = "partitions");"""
-        assertEquals(res1.size(), res2.size());
-        // just test can be selected successully
+        assertQueryRowsMatchCount(
+                """select count(*) from ${systableName}""",
+                """select * from ${systableName};""",
+                systableName)
     }
 
     def test_table_systables = { table ->
@@ -374,14 +313,6 @@ suite("test_iceberg_sys_table", "p0,external") {
     sql """create database if not exists internal.regression_test"""
     sql """grant select_priv on internal.regression_test.* to ${user}""" 
     connect(user, "${pwd}", context.config.jdbcUrl) {
-        test {
-              sql """
-                 select committed_at, snapshot_id, parent_id, operation from 
iceberg_meta(
-                                             "table" = 
"${catalog_name}.${db_name}.test_iceberg_systable_tbl1",
-                                             "query_type" = "snapshots");
-              """
-              exception "denied"
-        }
         test {
               sql """
                  select committed_at, snapshot_id, parent_id, operation from 
${catalog_name}.${db_name}.test_iceberg_systable_tbl1\$snapshots
@@ -391,11 +322,6 @@ suite("test_iceberg_sys_table", "p0,external") {
     }
     sql """grant select_priv on 
${catalog_name}.${db_name}.test_iceberg_systable_tbl1 to ${user}"""
     connect(user, "${pwd}", context.config.jdbcUrl) {
-        sql """
-           select committed_at, snapshot_id, parent_id, operation from 
iceberg_meta(
-                                       "table" = 
"${catalog_name}.${db_name}.test_iceberg_systable_tbl1",
-                                       "query_type" = "snapshots");
-        """
         sql """select committed_at, snapshot_id, parent_id, operation from 
${catalog_name}.${db_name}.test_iceberg_systable_tbl1\$snapshots"""
     }
     try_sql("DROP USER ${user}")
@@ -417,5 +343,13 @@ suite("test_iceberg_sys_table", "p0,external") {
     sql """use ${db_name}"""
 
     order_qt_varbinary_sys_table_desc """desc 
test_iceberg_systable_unpartitioned\$files"""
-    order_qt_varbinary_sys_table_select """select content, file_format, 
record_count, lower_bounds, upper_bounds from 
test_iceberg_systable_unpartitioned\$files;"""
+    List<List<Object>> varbinaryRows = sql """
+        select content, file_format, record_count, lower_bounds, upper_bounds
+        from test_iceberg_systable_unpartitioned\$files;
+    """
+    assertTrue(varbinaryRows.size() > 0, "Varbinary system table query should 
return data")
+    assertTrue(String.valueOf(varbinaryRows[0][3]).contains("0x"),
+            "Expected lower_bounds to use varbinary hex output")
+    assertTrue(String.valueOf(varbinaryRows[0][4]).contains("0x"),
+            "Expected upper_bounds to use varbinary hex output")
 }
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table_auth.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table_auth.groovy
index e2b059b78cb..99f035f7674 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table_auth.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table_auth.groovy
@@ -70,15 +70,6 @@ suite("test_iceberg_sys_table_auth", "p0,external") {
 
     // Test that user without table permission cannot query system tables
     connect(user, "${pwd}", context.config.jdbcUrl) {
-        // Test snapshots system table via iceberg_meta function
-        test {
-              sql """
-                 select committed_at, snapshot_id, parent_id, operation from 
iceberg_meta(
-                                             "table" = 
"${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1",
-                                             "query_type" = "snapshots");
-              """
-              exception "denied"
-        }
         // Test snapshots system table via direct access
         test {
               sql """
@@ -86,15 +77,6 @@ suite("test_iceberg_sys_table_auth", "p0,external") {
               """
               exception "denied"
         }
-        // Test files system table via iceberg_meta function
-        test {
-              sql """
-                 select * from iceberg_meta(
-                                             "table" = 
"${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1",
-                                             "query_type" = "files");
-              """
-              exception "denied"
-        }
         // Test files system table via direct access
         test {
               sql """
@@ -102,15 +84,6 @@ suite("test_iceberg_sys_table_auth", "p0,external") {
               """
               exception "denied"
         }
-        // Test entries system table via iceberg_meta function
-        test {
-              sql """
-                 select * from iceberg_meta(
-                                             "table" = 
"${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1",
-                                             "query_type" = "entries");
-              """
-              exception "denied"
-        }
         // Test entries system table via direct access
         test {
               sql """
@@ -118,15 +91,6 @@ suite("test_iceberg_sys_table_auth", "p0,external") {
               """
               exception "denied"
         }
-        // Test history system table via iceberg_meta function
-        test {
-              sql """
-                 select * from iceberg_meta(
-                                             "table" = 
"${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1",
-                                             "query_type" = "history");
-              """
-              exception "denied"
-        }
         // Test history system table via direct access
         test {
               sql """
@@ -139,38 +103,10 @@ suite("test_iceberg_sys_table_auth", "p0,external") {
     // Grant permission and verify user can query system tables
     sql """GRANT SELECT_PRIV ON 
${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1 TO '${user}'@'%'"""
     connect(user, "${pwd}", context.config.jdbcUrl) {
-        // Test snapshots system table with permission
-        sql """
-           select committed_at, snapshot_id, parent_id, operation from 
iceberg_meta(
-                                       "table" = 
"${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1",
-                                       "query_type" = "snapshots");
-        """
         sql """select committed_at, snapshot_id, parent_id, operation from 
${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1\$snapshots"""
-        
-        // Test files system table with permission
-        sql """
-           select * from iceberg_meta(
-                                       "table" = 
"${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1",
-                                       "query_type" = "files");
-        """
         sql """select * from 
${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1\$files"""
-        
-        // Test entries system table with permission
-        sql """
-           select * from iceberg_meta(
-                                       "table" = 
"${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1",
-                                       "query_type" = "entries");
-        """
         sql """select * from 
${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1\$entries"""
-        
-        // Test history system table with permission
-        sql """
-           select * from iceberg_meta(
-                                       "table" = 
"${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1",
-                                       "query_type" = "history");
-        """
         sql """select * from 
${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1\$history"""
     }
     try_sql("DROP USER '${user}'@'%'")
 }
-
diff --git 
a/regression-test/suites/external_table_p0/polaris/test_polaris.groovy 
b/regression-test/suites/external_table_p0/polaris/test_polaris.groovy
index b1bb71a31c4..e8fe34af37e 100644
--- a/regression-test/suites/external_table_p0/polaris/test_polaris.groovy
+++ b/regression-test/suites/external_table_p0/polaris/test_polaris.groovy
@@ -158,10 +158,7 @@ suite("test_polaris", "p0,external") {
 
         // =======  TIME TRAVEL TEST  =======
         def iceberg_meta_result = sql """
-        SELECT snapshot_id FROM iceberg_meta(
-                'table' = '${catalog_name}.${db_name}.${table_name}',
-                'query_type' = 'snapshots'
-        ) order by committed_at desc;
+        SELECT snapshot_id FROM 
${catalog_name}.${db_name}.${table_name}\$snapshots order by committed_at desc;
         """
         def first_snapshot_id = iceberg_meta_result.get(0).get(0);
         def time_travel = sql """
diff --git 
a/regression-test/suites/external_table_p0/refactor_storage_param/iceberg_rest_on_hdfs.groovy
 
b/regression-test/suites/external_table_p0/refactor_storage_param/iceberg_rest_on_hdfs.groovy
index d2b6cee8965..0bfba69e9a5 100644
--- 
a/regression-test/suites/external_table_p0/refactor_storage_param/iceberg_rest_on_hdfs.groovy
+++ 
b/regression-test/suites/external_table_p0/refactor_storage_param/iceberg_rest_on_hdfs.groovy
@@ -158,10 +158,7 @@ suite("iceberg_rest_on_hdfs", "p0,external") {
 
         // =======  TIME TRAVEL TEST  =======
         def iceberg_meta_result = sql """
-        SELECT snapshot_id FROM iceberg_meta(
-                'table' = '${catalog_name}.${db_name}.${table_name}',
-                'query_type' = 'snapshots'
-        ) order by committed_at desc;
+        SELECT snapshot_id FROM 
${catalog_name}.${db_name}.${table_name}\$snapshots order by committed_at desc;
         """
         def first_snapshot_id = iceberg_meta_result.get(0).get(0);
         def time_travel = sql """
diff --git 
a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_on_hms_and_filesystem_and_dlf.groovy
 
b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_on_hms_and_filesystem_and_dlf.groovy
index 8032a42b2a3..45215c76d12 100644
--- 
a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_on_hms_and_filesystem_and_dlf.groovy
+++ 
b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_on_hms_and_filesystem_and_dlf.groovy
@@ -128,10 +128,7 @@ suite("iceberg_on_hms_and_filesystem_and_dlf", 
"p2,external") {
             println "iceberg_meta_result SUCCESS" + catalog_name
 
             def iceberg_meta_result = sql """
-        SELECT snapshot_id FROM iceberg_meta(
-                'table' = '${catalog_name}.${db_name}.${table_name}',
-                'query_type' = 'snapshots'
-        ) order by committed_at desc;
+        SELECT snapshot_id FROM 
${catalog_name}.${db_name}.${table_name}\$snapshots order by committed_at desc;
         
         """
             def first_snapshot_id = iceberg_meta_result.get(0).get(0);
@@ -291,10 +288,7 @@ suite("iceberg_on_hms_and_filesystem_and_dlf", 
"p2,external") {
             println "iceberg_meta_result SUCCESS" + catalog_name
 
             def iceberg_meta_result = sql """
-        SELECT snapshot_id FROM iceberg_meta(
-                'table' = '${catalog_name}.${db_name}.${table_name}',
-                'query_type' = 'snapshots'
-        ) order by committed_at desc;
+        SELECT snapshot_id FROM 
${catalog_name}.${db_name}.${table_name}\$snapshots order by committed_at desc;
         
         """
             def first_snapshot_id = iceberg_meta_result.get(0).get(0);
diff --git 
a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_rest_s3_storage_vended_test.groovy
 
b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_rest_s3_storage_vended_test.groovy
index 7584e388ff2..3954bfdadb4 100644
--- 
a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_rest_s3_storage_vended_test.groovy
+++ 
b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_rest_s3_storage_vended_test.groovy
@@ -160,10 +160,7 @@ suite("iceberg_rest_s3_storage_vended_test", 
"p2,external") {
 
         // =======  TIME TRAVEL TEST  =======
         def iceberg_meta_result = sql """
-        SELECT snapshot_id FROM iceberg_meta(
-                'table' = '${catalog_name}.${db_name}.${table_name}',
-                'query_type' = 'snapshots'
-        ) order by committed_at desc;
+        SELECT snapshot_id FROM 
${catalog_name}.${db_name}.${table_name}\$snapshots order by committed_at desc;
         """
         def first_snapshot_id = iceberg_meta_result.get(0).get(0);
         def time_travel = sql """
diff --git 
a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_rest_storage_test.groovy
 
b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_rest_storage_test.groovy
index d4fc6b1be44..955637db101 100644
--- 
a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_rest_storage_test.groovy
+++ 
b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_rest_storage_test.groovy
@@ -158,10 +158,7 @@ suite("iceberg_rest_storage_test", "p2,external") {
 
         // =======  TIME TRAVEL TEST  =======
         def iceberg_meta_result = sql """
-        SELECT snapshot_id FROM iceberg_meta(
-                'table' = '${catalog_name}.${db_name}.${table_name}',
-                'query_type' = 'snapshots'
-        ) order by committed_at desc;
+        SELECT snapshot_id FROM 
${catalog_name}.${db_name}.${table_name}\$snapshots order by committed_at desc;
         """
         def first_snapshot_id = iceberg_meta_result.get(0).get(0);
         def time_travel = sql """
diff --git 
a/regression-test/suites/external_table_p2/tvf/test_iceberg_meta.groovy 
b/regression-test/suites/external_table_p2/tvf/test_iceberg_meta.groovy
deleted file mode 100644
index c6f16fe2395..00000000000
--- a/regression-test/suites/external_table_p2/tvf/test_iceberg_meta.groovy
+++ /dev/null
@@ -1,90 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-suite("test_iceberg_meta", "p2,external") {
-    String suiteName = "test_iceberg_meta"
-    Boolean ignoreP2 = true;
-    if (ignoreP2) {
-        logger.info("disable p2 test");
-        return;
-    }
-
-    String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
-    if (enabled != null && enabled.equalsIgnoreCase("true")) {
-        String iceberg_catalog_name = "test_iceberg_meta_tvf"
-        String extHiveHmsHost = 
context.config.otherConfigs.get("extHiveHmsHost")
-        String extHdfsPort = context.config.otherConfigs.get("extHdfsPort")
-        String db = "multi_catalog"
-        sql """drop catalog if exists ${iceberg_catalog_name};"""
-        sql """
-            create catalog if not exists ${iceberg_catalog_name} properties (
-                'type'='iceberg',
-                'iceberg.catalog.type'='hadoop',
-                'warehouse' = 
'hdfs://${extHiveHmsHost}:${extHdfsPort}/usr/hive/warehouse/hadoop_catalog'
-            );
-        """
-
-        sql """switch ${iceberg_catalog_name};"""
-        sql """ use `${db}`; """
-
-        order_qt_q01 """ select count(*) from iceberg_hadoop_catalog """
-        order_qt_q02 """ select c_custkey from iceberg_hadoop_catalog group by 
c_custkey order by c_custkey limit 7 """
-
-        order_qt_tvf_1 """ select committed_at, snapshot_id, parent_id, 
operation from iceberg_meta(
-                            "table" = 
"${iceberg_catalog_name}.${db}.multi_partition",
-                            "query_type" = "snapshots");
-                        """
-
-        order_qt_tvf_2 """ select committed_at, snapshot_id, parent_id, 
operation from iceberg_meta(
-                            "table" = 
"${iceberg_catalog_name}.${db}.multi_partition",
-                            "query_type" = "snapshots")
-                            where snapshot_id = 7235593032487457798;
-                        """
-         String user = "${suiteName}_user"
-         String pwd = 'C123_567p'
-         try_sql("DROP USER ${user}")
-         sql """CREATE USER '${user}' IDENTIFIED BY '${pwd}'"""
-         //cloud-mode
-         if (isCloudMode()) {
-             def clusters = sql " SHOW CLUSTERS; "
-             assertTrue(!clusters.isEmpty())
-             def validCluster = clusters[0][0]
-             sql """GRANT USAGE_PRIV ON CLUSTER `${validCluster}` TO 
${user}""";
-         }
-
-         sql """grant select_priv on regression_test to ${user}"""
-         connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
-             test {
-                   sql """
-                      select committed_at, snapshot_id, parent_id, operation 
from iceberg_meta(
-                                                  "table" = 
"${iceberg_catalog_name}.${db}.multi_partition",
-                                                  "query_type" = "snapshots");
-                   """
-                   exception "denied"
-             }
-         }
-         sql """grant select_priv on 
${iceberg_catalog_name}.${db}.multi_partition to ${user}"""
-         connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
-           sql """
-              select committed_at, snapshot_id, parent_id, operation from 
iceberg_meta(
-                                          "table" = 
"${iceberg_catalog_name}.${db}.multi_partition",
-                                          "query_type" = "snapshots");
-           """
-         }
-         try_sql("DROP USER ${user}")
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to