This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d9883e5c0f [Feature](Iceberg) Implement expire_snapshots procedure for Iceberg tables (#59979)
6d9883e5c0f is described below

commit 6d9883e5c0fea605d685d87a93613339e4c0a996
Author: Socrates <[email protected]>
AuthorDate: Wed Feb 4 23:50:28 2026 +0800

    [Feature](Iceberg) Implement expire_snapshots procedure for Iceberg tables (#59979)
    
    ### What problem does this PR solve?
    
    - Issue Number:  #58199
    
    ## Summary
    
    This PR implements the `expire_snapshots` procedure for Iceberg tables,
    following the Apache Iceberg Spark procedure specification. This
    procedure removes old snapshots from Iceberg tables to free up storage
    space and improve metadata performance.
    
    ## Changes
    
    ### Main Implementation
    - **File:** `fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExpireSnapshotsAction.java`
    - Implemented `executeAction()` to expire snapshots using Iceberg's `ExpireSnapshots` API
    - Added `getResultSchema()`, returning a 6-column output matching Spark's schema
    - Added a `parseTimestamp()` helper supporting both ISO datetime and millisecond formats
    - Updated validation to allow `snapshot_ids` as a standalone parameter
    - Fixed `retain_last` behavior: when specified alone, `expireOlderThan` is automatically set to the current time (see the sketch below)
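
    In Iceberg API terms, the `retain_last`-only case reduces to the following call sequence (a minimal sketch, assuming an already-resolved `org.apache.iceberg.Table`; the wrapper class and method names are illustrative, not part of this PR):

    ```java
    import org.apache.iceberg.ExpireSnapshots;
    import org.apache.iceberg.Table;

    class ExpireSnapshotsSketch {
        // retain_last alone: pass "now" as the cutoff, because Iceberg's
        // retainLast(n) only bounds snapshots already selected for
        // expiration by expireOlderThan(...).
        static void expireKeepingLast(Table table, int retainLast) {
            ExpireSnapshots expire = table.expireSnapshots()
                    .expireOlderThan(System.currentTimeMillis())
                    .retainLast(retainLast);
            expire.commit();
        }
    }
    ```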
    
    ### Supported Parameters
    | Parameter | Description |
    |-----------|-------------|
    | `older_than` | Timestamp before which snapshots will be removed (ISO datetime or milliseconds) |
    | `retain_last` | Number of ancestor snapshots to preserve |
    | `snapshot_ids` | Comma-separated list of specific snapshot IDs to expire |
    | `max_concurrent_deletes` | Size of the thread pool used for delete operations (0 disables it; ignored for FileIOs that support bulk deletes) |
    | `clean_expired_metadata` | When true, cleans up unused partition specs and schemas |
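
    Both `older_than` forms are normalized to epoch milliseconds; a minimal sketch of that conversion (mirroring the `parseTimestamp()` helper added in this PR; the wrapper class name is illustrative):

    ```java
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.format.DateTimeFormatter;
    import java.time.format.DateTimeParseException;

    class OlderThanSketch {
        // Accepts ISO local datetime ("2024-01-01T00:00:00") or epoch millis ("1640995200000").
        static long toEpochMillis(String olderThan) {
            try {
                return LocalDateTime.parse(olderThan, DateTimeFormatter.ISO_LOCAL_DATE_TIME)
                        .atZone(ZoneId.systemDefault())
                        .toInstant()
                        .toEpochMilli();
            } catch (DateTimeParseException e) {
                // Fall back to interpreting the value as raw epoch milliseconds.
                return Long.parseLong(olderThan);
            }
        }
    }
    ```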
    
    ### Output Schema
    The procedure returns a single row with 6 columns:
    - `deleted_data_files_count`
    - `deleted_position_delete_files_count`
    - `deleted_equality_delete_files_count`
    - `deleted_manifest_files_count`
    - `deleted_manifest_lists_count`
    - `deleted_statistics_files_count`
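
    For example, the regression test added in this PR expects the row `0 0 0 0 2 0` after `expire_snapshots("retain_last" = "2")` on a table with four append snapshots: two manifest-list files are deleted, while every data file remains reachable from the retained snapshots.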
    
    ### Test Updates
    - **File:** `regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy` (the `expire_snapshots` cases now live in the new dedicated `test_iceberg_expire_snapshots.groovy` suite)
    - Added functional tests for `expire_snapshots` with the `retain_last` parameter
    - Added validation tests for the `snapshot_ids` parameter
    - Updated error message expectations
    
    ## Usage Example
    
    ```sql
    -- Expire snapshots, keeping only the last 2
    ALTER TABLE catalog.db.table EXECUTE expire_snapshots("retain_last" = "2");
    
    -- Expire snapshots older than a specific timestamp
    ALTER TABLE catalog.db.table EXECUTE expire_snapshots("older_than" = "2024-01-01T00:00:00");
    
    -- Expire specific snapshots by ID
    ALTER TABLE catalog.db.table EXECUTE expire_snapshots("snapshot_ids" = "123456789,987654321");
    
    -- Combine parameters
    ALTER TABLE catalog.db.table EXECUTE expire_snapshots("older_than" = "2024-06-01T00:00:00", "retain_last" = "5");
    ```
---
 .../action/IcebergExpireSnapshotsAction.java       | 221 ++++++++++++++-
 .../action/test_iceberg_expire_snapshots.out       |  10 +
 .../action/test_iceberg_execute_actions.groovy     |  99 -------
 .../action/test_iceberg_expire_snapshots.groovy    | 299 +++++++++++++++++++++
 .../iceberg_branch_retention_and_snapshot.groovy   |  18 +-
 5 files changed, 532 insertions(+), 115 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExpireSnapshotsAction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExpireSnapshotsAction.java
index 77ea784eadc..0937af8ba4c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExpireSnapshotsAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExpireSnapshotsAction.java
@@ -17,20 +17,41 @@
 
 package org.apache.doris.datasource.iceberg.action;
 
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ArgumentParsers;
-import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.info.PartitionNamesInfo;
 import org.apache.doris.nereids.trees.expressions.Expression;
 
+import com.google.common.collect.Lists;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeParseException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Iceberg expire snapshots action implementation.
@@ -39,10 +60,10 @@ import java.util.Optional;
  * and improve metadata performance.
  */
 public class IcebergExpireSnapshotsAction extends BaseIcebergAction {
+    private static final Logger LOG = LogManager.getLogger(IcebergExpireSnapshotsAction.class);
     public static final String OLDER_THAN = "older_than";
     public static final String RETAIN_LAST = "retain_last";
     public static final String MAX_CONCURRENT_DELETES = "max_concurrent_deletes";
-    public static final String STREAM_RESULTS = "stream_results";
     public static final String SNAPSHOT_IDS = "snapshot_ids";
     public static final String CLEAN_EXPIRED_METADATA = "clean_expired_metadata";
 
@@ -62,11 +83,9 @@ public class IcebergExpireSnapshotsAction extends BaseIcebergAction {
                 "Number of ancestor snapshots to preserve regardless of older_than",
                 null, ArgumentParsers.positiveInt(RETAIN_LAST));
         namedArguments.registerOptionalArgument(MAX_CONCURRENT_DELETES,
-                "Size of the thread pool used for delete file actions",
-                null, ArgumentParsers.positiveInt(MAX_CONCURRENT_DELETES));
-        namedArguments.registerOptionalArgument(STREAM_RESULTS,
-                "When true, deletion files will be sent to Spark driver by RDD partition",
-                null, ArgumentParsers.booleanValue(STREAM_RESULTS));
+                "Size of the thread pool used for delete file actions (0 disables, "
+                        + "ignored for FileIOs that support bulk deletes)",
+                0, ArgumentParsers.intRange(MAX_CONCURRENT_DELETES, 0, Integer.MAX_VALUE));
         namedArguments.registerOptionalArgument(SNAPSHOT_IDS,
                 "Array of snapshot IDs to expire",
                 null, ArgumentParsers.nonEmptyString(SNAPSHOT_IDS));
@@ -103,9 +122,24 @@ public class IcebergExpireSnapshotsAction extends BaseIcebergAction {
             throw new AnalysisException("retain_last must be at least 1");
         }
 
-        // At least one of older_than or retain_last must be specified for validation
-        if (olderThan == null && retainLast == null) {
-            throw new AnalysisException("At least one of 'older_than' or 'retain_last' must be specified");
+        // Get snapshot_ids for validation
+        String snapshotIds = namedArguments.getString(SNAPSHOT_IDS);
+
+        // Validate snapshot_ids format if provided
+        if (snapshotIds != null) {
+            for (String idStr : snapshotIds.split(",")) {
+                try {
+                    Long.parseLong(idStr.trim());
+                } catch (NumberFormatException e) {
+                    throw new AnalysisException("Invalid snapshot_id format: " + idStr.trim());
+                }
+            }
+        }
+
+        // At least one of older_than, retain_last, or snapshot_ids must be specified
+        if (olderThan == null && retainLast == null && snapshotIds == null) {
+            throw new AnalysisException("At least one of 'older_than', 'retain_last', or "
+                    + "'snapshot_ids' must be specified");
         }
 
         // Iceberg procedures don't support partitions or where conditions
@@ -115,7 +149,172 @@ public class IcebergExpireSnapshotsAction extends BaseIcebergAction {
 
     @Override
     protected List<String> executeAction(TableIf table) throws UserException {
-        throw new DdlException("Iceberg expire_snapshots procedure is not implemented yet");
+        Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
+
+        // Parse parameters
+        String olderThan = namedArguments.getString(OLDER_THAN);
+        Integer retainLast = namedArguments.getInt(RETAIN_LAST);
+        String snapshotIdsStr = namedArguments.getString(SNAPSHOT_IDS);
+        Boolean cleanExpiredMetadata = namedArguments.getBoolean(CLEAN_EXPIRED_METADATA);
+        Integer maxConcurrentDeletes = namedArguments.getInt(MAX_CONCURRENT_DELETES);
+
+        // Track deleted file counts using callbacks (matching Spark's 6-column schema)
+        AtomicLong deletedDataFilesCount = new AtomicLong(0);
+        AtomicLong deletedPositionDeleteFilesCount = new AtomicLong(0);
+        AtomicLong deletedEqualityDeleteFilesCount = new AtomicLong(0);
+        AtomicLong deletedManifestFilesCount = new AtomicLong(0);
+        AtomicLong deletedManifestListsCount = new AtomicLong(0);
+        AtomicLong deletedStatisticsFilesCount = new AtomicLong(0);
+
+        ExecutorService deleteExecutor = null;
+        try {
+            Map<String, FileContent> deleteFileContentByPath =
+                    buildDeleteFileContentMap(icebergTable);
+            ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();
+
+            // Configure older_than timestamp
+            // If retain_last is specified without older_than, use current time as the cutoff
+            // This is because Iceberg's retainLast only works in conjunction with expireOlderThan
+            if (olderThan != null) {
+                long timestampMillis = parseTimestamp(olderThan);
+                expireSnapshots.expireOlderThan(timestampMillis);
+            } else if (retainLast != null && snapshotIdsStr == null) {
+                // When only retain_last is specified, expire all snapshots older than now
+                // but keep at least retain_last snapshots
+                expireSnapshots.expireOlderThan(System.currentTimeMillis());
+            }
+
+            // Configure retain_last
+            if (retainLast != null) {
+                expireSnapshots.retainLast(retainLast);
+            }
+
+            // Configure specific snapshot IDs to expire
+            if (snapshotIdsStr != null) {
+                for (String idStr : snapshotIdsStr.split(",")) {
+                    expireSnapshots.expireSnapshotId(Long.parseLong(idStr.trim()));
+                }
+            }
+
+            // Configure clean expired metadata
+            if (cleanExpiredMetadata != null) {
+                expireSnapshots.cleanExpiredMetadata(cleanExpiredMetadata);
+            }
+
+            // Set up ExecutorService for concurrent deletes if specified
+            if (maxConcurrentDeletes > 0) {
+                if (icebergTable.io() instanceof SupportsBulkOperations) {
+                    LOG.warn("max_concurrent_deletes only works with FileIOs that do not support "
+                            + "bulk deletes. This table is currently using {} which supports bulk deletes "
+                            + "so the parameter will be ignored.",
+                            icebergTable.io().getClass().getName());
+                } else {
+                    deleteExecutor = Executors.newFixedThreadPool(maxConcurrentDeletes);
+                    expireSnapshots.executeDeleteWith(deleteExecutor);
+                }
+            }
+
+            // Set up delete callback to count files by type
+            expireSnapshots.deleteWith(path -> {
+                FileContent deleteContent = deleteFileContentByPath.get(path);
+                if (deleteContent == FileContent.POSITION_DELETES) {
+                    deletedPositionDeleteFilesCount.incrementAndGet();
+                } else if (deleteContent == FileContent.EQUALITY_DELETES) {
+                    deletedEqualityDeleteFilesCount.incrementAndGet();
+                } else if (path.contains("-m-") && path.endsWith(".avro")) {
+                    deletedManifestFilesCount.incrementAndGet();
+                } else if (path.contains("snap-") && path.endsWith(".avro")) {
+                    deletedManifestListsCount.incrementAndGet();
+                } else if (path.endsWith(".stats") || path.contains("statistics")) {
+                    deletedStatisticsFilesCount.incrementAndGet();
+                } else {
+                    deletedDataFilesCount.incrementAndGet();
+                }
+                icebergTable.io().deleteFile(path);
+            });
+
+            // Execute and commit
+            expireSnapshots.commit();
+
+            // Invalidate cache
+            Env.getCurrentEnv().getExtMetaCacheMgr()
+                .invalidateTableCache((ExternalTable) table);
+
+            return Lists.newArrayList(
+                String.valueOf(deletedDataFilesCount.get()),
+                String.valueOf(deletedPositionDeleteFilesCount.get()),
+                String.valueOf(deletedEqualityDeleteFilesCount.get()),
+                String.valueOf(deletedManifestFilesCount.get()),
+                String.valueOf(deletedManifestListsCount.get()),
+                String.valueOf(deletedStatisticsFilesCount.get())
+            );
+        } catch (Exception e) {
+            throw new UserException("Failed to expire snapshots: " + 
e.getMessage(), e);
+        } finally {
+            // Shutdown executor if created
+            if (deleteExecutor != null) {
+                deleteExecutor.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Parse timestamp string to milliseconds since epoch.
+     * Supports ISO datetime format (yyyy-MM-ddTHH:mm:ss) or milliseconds.
+     */
+    private long parseTimestamp(String timestamp) {
+        try {
+            // Try ISO datetime format
+            LocalDateTime dateTime = LocalDateTime.parse(timestamp,
+                    DateTimeFormatter.ISO_LOCAL_DATE_TIME);
+            return dateTime.atZone(ZoneId.systemDefault())
+                .toInstant().toEpochMilli();
+        } catch (DateTimeParseException e) {
+            // Try as milliseconds
+            return Long.parseLong(timestamp);
+        }
+    }
+
+    private Map<String, FileContent> buildDeleteFileContentMap(Table icebergTable) throws UserException {
+        Map<String, FileContent> deleteFileContentByPath = new HashMap<>();
+        try {
+            for (org.apache.iceberg.Snapshot snapshot : icebergTable.snapshots()) {
+                List<ManifestFile> deleteManifests = snapshot.deleteManifests(icebergTable.io());
+                if (deleteManifests == null || deleteManifests.isEmpty()) {
+                    continue;
+                }
+                for (ManifestFile manifest : deleteManifests) {
+                    try (CloseableIterable<DeleteFile> deleteFiles = ManifestFiles.readDeleteManifest(
+                            manifest, icebergTable.io(), icebergTable.specs())) {
+                        for (DeleteFile deleteFile : deleteFiles) {
+                            deleteFileContentByPath.putIfAbsent(
+                                    deleteFile.location(), deleteFile.content());
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new UserException("Failed to build delete file content map: 
" + e.getMessage(), e);
+        }
+        return deleteFileContentByPath;
+    }
+
+    @Override
+    protected List<Column> getResultSchema() {
+        return Lists.newArrayList(
+            new Column("deleted_data_files_count", Type.BIGINT, false,
+                "Number of data files deleted"),
+            new Column("deleted_position_delete_files_count", Type.BIGINT, 
false,
+                "Number of position delete files deleted"),
+            new Column("deleted_equality_delete_files_count", Type.BIGINT, 
false,
+                "Number of equality delete files deleted"),
+            new Column("deleted_manifest_files_count", Type.BIGINT, false,
+                "Number of manifest files deleted"),
+            new Column("deleted_manifest_lists_count", Type.BIGINT, false,
+                "Number of manifest list files deleted"),
+            new Column("deleted_statistics_files_count", Type.BIGINT, false,
+                "Number of statistics files deleted")
+        );
     }
 
     @Override
diff --git a/regression-test/data/external_table_p0/iceberg/action/test_iceberg_expire_snapshots.out b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_expire_snapshots.out
new file mode 100644
index 00000000000..4ef68442a19
--- /dev/null
+++ b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_expire_snapshots.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !expire_snapshots_result --
+0      0       0       0       2       0
+
+-- !after_expire_snapshots --
+1      data1
+2      data2
+3      data3
+4      data4
+
diff --git a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy
index 00906456633..223d6f78297 100644
--- a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy
+++ b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy
@@ -395,15 +395,6 @@ suite("test_iceberg_optimize_actions_ddl", "p0,external,doris,external_docker,ex
     qt_after_fast_forword_branch """SELECT * FROM test_fast_forward@branch(feature_branch) ORDER BY id"""
 
 
-    // Test expire_snapshots action
-    test {
-        sql """
-            ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE expire_snapshots
-            ("older_than" = "2024-01-01T00:00:00")
-        """
-        exception "Iceberg expire_snapshots procedure is not implemented yet"
-    }
-
     // Test validation - missing required property
     test {
         sql """
@@ -449,87 +440,6 @@ suite("test_iceberg_optimize_actions_ddl", "p0,external,doris,external_docker,ex
         exception "Missing required argument: timestamp"
     }
 
-    // Test expire_snapshots with invalid older_than timestamp
-    test {
-        sql """
-            ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE expire_snapshots
-            ("older_than" = "not-a-timestamp")
-        """
-        exception "Invalid older_than format"
-    }
-
-    // Test expire_snapshots with negative timestamp
-    test {
-        sql """
-            ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE expire_snapshots
-            ("older_than" = "-1000")
-        """
-        exception "older_than timestamp must be non-negative"
-    }
-
-    // Test validation - retain_last must be at least 1
-    test {
-        sql """
-            ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE expire_snapshots
-            ("retain_last" = "0")
-        """
-        exception "retain_last must be positive, got: 0"
-    }
-
-    // Test expire_snapshots with invalid retain_last format
-    test {
-        sql """
-            ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE expire_snapshots
-            ("retain_last" = "not-a-number")
-        """
-        exception "Invalid retain_last format: not-a-number"
-    }
-
-    // Test expire_snapshots with negative retain_last
-    test {
-        sql """
-            ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE expire_snapshots
-            ("retain_last" = "-5")
-        """
-        exception "retain_last must be positive, got: -5"
-    }
-
-    // Test expire_snapshots with neither older_than nor retain_last
-    test {
-        sql """
-            ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE expire_snapshots
-            ()
-        """
-        exception "At least one of 'older_than' or 'retain_last' must be 
specified"
-    }
-
-    // Test expire_snapshots with valid timestamp format (milliseconds)
-    test {
-        sql """
-            ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE expire_snapshots
-            ("older_than" = "1640995200000")
-        """
-        exception "Iceberg expire_snapshots procedure is not implemented yet"
-    }
-
-    // Test expire_snapshots with valid ISO datetime
-    test {
-        sql """
-            ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE expire_snapshots
-            ("older_than" = "2024-01-01T12:30:45")
-        """
-        exception "Iceberg expire_snapshots procedure is not implemented yet"
-    }
-
-    // Test expire_snapshots with valid retain_last and older_than
-    test {
-        sql """
-            ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE expire_snapshots
-            ("older_than" = "2024-01-01T00:00:00", "retain_last" = "5")
-        """
-        exception "Iceberg expire_snapshots procedure is not implemented yet"
-    }
-
     // Test unknown action
     test {
         sql """
@@ -628,15 +538,6 @@ suite("test_iceberg_optimize_actions_ddl", "p0,external,doris,external_docker,ex
         exception "Snapshot 123456789 not found in table"
     }
 
-    // Test with multiple partitions
-    test {
-        sql """
-            ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE expire_snapshots
-            ("older_than" = "2024-01-01T00:00:00") PARTITIONS (p1, p2, p3)
-        """
-        exception "Action 'expire_snapshots' does not support partition 
specification"
-    }
-
     // =====================================================================================
 // Test Case 6: publish_changes action with WAP (Write-Audit-Publish) pattern
 // Simplified workflow:
diff --git a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_expire_snapshots.groovy b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_expire_snapshots.groovy
new file mode 100644
index 00000000000..2470dc16b39
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_expire_snapshots.groovy
@@ -0,0 +1,299 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider
+import com.amazonaws.auth.BasicAWSCredentials
+import com.amazonaws.client.builder.AwsClientBuilder
+import com.amazonaws.services.s3.AmazonS3
+import com.amazonaws.services.s3.AmazonS3ClientBuilder
+import groovy.json.JsonSlurper
+
+suite("test_iceberg_expire_snapshots", 
"p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String catalog_name = "test_iceberg_expire_snapshots"
+    String db_name = "test_db"
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}";,
+        "s3.region" = "us-east-1"
+    );"""
+
+    sql """switch ${catalog_name}"""
+    sql """CREATE DATABASE IF NOT EXISTS ${db_name} """
+    sql """use ${db_name}"""
+
+    def buildIcebergS3Client = { ->
+        def credentials = new BasicAWSCredentials("admin", "password")
+        def endpoint = "http://${externalEnvIp}:${minio_port}";
+        return AmazonS3ClientBuilder.standard()
+                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, "us-east-1"))
+                .withCredentials(new AWSStaticCredentialsProvider(credentials))
+                .withPathStyleAccessEnabled(true)
+                .build()
+    }
+
+    def parseS3Path = { String path ->
+        int schemeIdx = path.indexOf("://")
+        assertTrue(schemeIdx > 0, "Unexpected file path: ${path}")
+        String withoutScheme = path.substring(schemeIdx + 3)
+        withoutScheme = withoutScheme.split("\\?")[0].split("#")[0]
+        int slashIdx = withoutScheme.indexOf("/")
+        String bucket = slashIdx > 0 ? withoutScheme.substring(0, slashIdx) : withoutScheme
+        String key = slashIdx > 0 ? withoutScheme.substring(slashIdx + 1) : ""
+        return [bucket, key]
+    }
+
+    def readMetadataJson = { AmazonS3 client, String metadataPath ->
+        def (bucket, key) = parseS3Path(metadataPath)
+        def obj = client.getObject(bucket, key)
+        try {
+            String text = obj.objectContent.getText("UTF-8")
+            return new JsonSlurper().parseText(text)
+        } finally {
+            obj.objectContent.close()
+        }
+    }
+
+    // =====================================================================================
+    // Test Case 1: expire_snapshots action with retain_last parameter
+    // Tests the ability to expire old snapshots from Iceberg tables
+    // =====================================================================================
+    logger.info("Starting expire_snapshots test case")
+
+    // Create test table for expire_snapshots
+    sql """DROP TABLE IF EXISTS ${db_name}.test_expire_snapshots"""
+    sql """
+        CREATE TABLE ${db_name}.test_expire_snapshots (
+            id BIGINT,
+            data STRING
+        ) ENGINE=iceberg
+    """
+
+    // Insert data to create multiple snapshots
+    sql """INSERT INTO ${db_name}.test_expire_snapshots VALUES (1, 'data1')"""
+    sql """INSERT INTO ${db_name}.test_expire_snapshots VALUES (2, 'data2')"""
+    sql """INSERT INTO ${db_name}.test_expire_snapshots VALUES (3, 'data3')"""
+    sql """INSERT INTO ${db_name}.test_expire_snapshots VALUES (4, 'data4')"""
+
+    // Verify 4 snapshots exist
+    List<List<Object>> snapshotsBefore = sql """
+        SELECT snapshot_id FROM test_expire_snapshots\$snapshots ORDER BY committed_at
+    """
+    assertTrue(snapshotsBefore.size() == 4, "Expected 4 snapshots before expiration")
+    logger.info("Snapshots before expire: ${snapshotsBefore}")
+
+    // Test 1: expire_snapshots with retain_last=2
+    qt_expire_snapshots_result """
+        ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots
+        EXECUTE expire_snapshots("retain_last" = "2")
+    """
+
+    // Verify only 2 snapshots remain
+    List<List<Object>> snapshotsAfter = sql """
+        SELECT snapshot_id FROM test_expire_snapshots\$snapshots ORDER BY committed_at
+    """
+    assertTrue(snapshotsAfter.size() == 2, "Expected 2 snapshots after expiration with retain_last=2")
+    logger.info("Snapshots after expire: ${snapshotsAfter}")
+
+    // Test 2: Verify data is still accessible
+    qt_after_expire_snapshots """SELECT * FROM test_expire_snapshots ORDER BY id"""
+
+    logger.info("expire_snapshots test case completed successfully")
+
+    // =====================================================================================
+    // Test Case 2: expire_snapshots with older_than parameter
+    // =====================================================================================
+    // Test expire_snapshots with older_than (should work, but not expire snapshots that are recent)
+    List<List<Object>> expireOlderThanResult = sql """
+        ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots
+        EXECUTE expire_snapshots("older_than" = "2024-01-01T00:00:00")
+    """
+    logger.info("Expire older_than result: ${expireOlderThanResult}")
+
+    // =====================================================================================
+    // Test Case 3: expire_snapshots should remove expired snapshots and clear old files from metadata
+    // =====================================================================================
+    String delete_files_table = "test_expire_snapshots_delete_files"
+    sql """DROP TABLE IF EXISTS ${db_name}.${delete_files_table}"""
+    sql """
+        CREATE TABLE ${db_name}.${delete_files_table} (
+            id BIGINT,
+            data STRING
+        ) ENGINE=iceberg
+    """
+    sql """INSERT INTO ${db_name}.${delete_files_table} VALUES (1, 'data1')"""
+
+    List<List<Object>> allDataFilesBefore = sql """SELECT file_path FROM ${delete_files_table}\$all_data_files"""
+    assertTrue(allDataFilesBefore.size() > 0, "Expected data files after initial insert")
+    def oldFiles = allDataFilesBefore.collect { String.valueOf(it[0]) } as Set
+
+    // Overwrite table via Doris
+    sql """INSERT OVERWRITE TABLE ${db_name}.${delete_files_table} VALUES (3, 
'data3')"""
+
+    List<List<Object>> snapshotsBeforeExpire = sql """
+        SELECT snapshot_id FROM ${delete_files_table}\$snapshots ORDER BY committed_at
+    """
+    assertTrue(snapshotsBeforeExpire.size() >= 2, "Expected multiple snapshots before expiration")
+    List<List<Object>> allDataFilesAfterOverwrite = sql """SELECT file_path FROM ${delete_files_table}\$all_data_files"""
+    def allFilesAfterOverwrite = allDataFilesAfterOverwrite.collect { String.valueOf(it[0]) } as Set
+    assertTrue(allFilesAfterOverwrite.containsAll(oldFiles),
+            "Expected old files still visible in all_data_files before expiration")
+    long expireMillis = System.currentTimeMillis()
+    sql """
+        ALTER TABLE ${catalog_name}.${db_name}.${delete_files_table}
+        EXECUTE expire_snapshots("older_than" = "${expireMillis}", 
"retain_last" = "1")
+    """
+    List<List<Object>> snapshotsAfterExpire = sql """
+        SELECT snapshot_id FROM ${delete_files_table}\$snapshots ORDER BY 
committed_at
+    """
+    assertTrue(snapshotsAfterExpire.size() == 1,
+            "Expected 1 snapshot after expiration, got: 
${snapshotsAfterExpire.size()}")
+    List<List<Object>> allDataFilesAfterExpire = sql """SELECT file_path FROM 
${delete_files_table}\$all_data_files"""
+    def allFilesAfterExpire = allDataFilesAfterExpire.collect { 
String.valueOf(it[0]) } as Set
+    assertTrue(oldFiles.intersect(allFilesAfterExpire).isEmpty(),
+            "Expected old files removed from all_data_files after expiration")
+
+    // =====================================================================================
+    // Test Case 4: expire_snapshots should clean expired metadata when enabled
+    // =====================================================================================
+    String clean_metadata_table = "test_expire_snapshots_clean_metadata"
+    sql """DROP TABLE IF EXISTS ${db_name}.${clean_metadata_table}"""
+    sql """
+        CREATE TABLE ${db_name}.${clean_metadata_table} (
+            id BIGINT,
+            data STRING
+        ) ENGINE=iceberg
+    """
+    sql """INSERT INTO ${db_name}.${clean_metadata_table} VALUES (1, 
'data1')"""
+    sql """ALTER TABLE ${db_name}.${clean_metadata_table} ADD COLUMN extra_col 
INT"""
+    sql """INSERT INTO ${db_name}.${clean_metadata_table} VALUES (2, 'data2', 
10)"""
+
+    AmazonS3 icebergS3Client = buildIcebergS3Client()
+    String metadataBeforePath = String.valueOf((sql """
+        SELECT file FROM ${clean_metadata_table}\$metadata_log_entries
+        ORDER BY timestamp DESC LIMIT 1
+    """)[0][0])
+    def metadataBefore = readMetadataJson(icebergS3Client, metadataBeforePath)
+    long schemasBefore = ((List) metadataBefore.schemas).size()
+    assertTrue(schemasBefore >= 2, "Expected multiple schemas before cleanup")
+
+    long expireMillis2 = System.currentTimeMillis()
+    sql """
+        ALTER TABLE ${catalog_name}.${db_name}.${clean_metadata_table}
+        EXECUTE expire_snapshots("older_than" = "${expireMillis2}", 
"retain_last" = "1",
+            "clean_expired_metadata" = "true")
+    """
+
+    String metadataAfterPath = String.valueOf((sql """
+        SELECT file FROM ${clean_metadata_table}\$metadata_log_entries
+        ORDER BY timestamp DESC LIMIT 1
+    """)[0][0])
+    def metadataAfter = readMetadataJson(icebergS3Client, metadataAfterPath)
+    long schemasAfter = ((List) metadataAfter.schemas).size()
+    assertTrue(schemasAfter < schemasBefore,
+        "Expected schemas cleaned up, before=${schemasBefore}, 
after=${schemasAfter}")
+
+    // =====================================================================================
+    // Negative Test Cases for expire_snapshots
+    // =====================================================================================
+
+    // Test validation - missing required property (neither older_than, retain_last, nor snapshot_ids)
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots EXECUTE expire_snapshots
+            ()
+        """
+        exception "At least one of 'older_than', 'retain_last', or 
'snapshot_ids' must be specified"
+    }
+
+    // Test expire_snapshots with invalid older_than timestamp
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots EXECUTE expire_snapshots
+            ("older_than" = "not-a-timestamp")
+        """
+        exception "Invalid older_than format"
+    }
+
+    // Test expire_snapshots with negative timestamp
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots EXECUTE expire_snapshots
+            ("older_than" = "-1000")
+        """
+        exception "older_than timestamp must be non-negative"
+    }
+
+    // Test validation - retain_last must be at least 1
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots EXECUTE expire_snapshots
+            ("retain_last" = "0")
+        """
+        exception "retain_last must be positive, got: 0"
+    }
+
+    // Test expire_snapshots with invalid retain_last format
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots EXECUTE expire_snapshots
+            ("retain_last" = "not-a-number")
+        """
+        exception "Invalid retain_last format: not-a-number"
+    }
+
+    // Test expire_snapshots with negative retain_last
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots EXECUTE expire_snapshots
+            ("retain_last" = "-5")
+        """
+        exception "retain_last must be positive, got: -5"
+    }
+
+    // Test expire_snapshots with invalid snapshot_ids format
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots EXECUTE expire_snapshots
+            ("snapshot_ids" = "not-a-number")
+        """
+        exception "Invalid snapshot_id format: not-a-number"
+    }
+
+    // Test expire_snapshots with partition specification (should fail)
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots EXECUTE expire_snapshots
+            ("older_than" = "2024-01-01T00:00:00") PARTITIONS (p1, p2, p3)
+        """
+        exception "Action 'expire_snapshots' does not support partition 
specification"
+    }
+}
diff --git a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy
index 489715d1ff1..ddf12b8f940 100644
--- a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy
+++ b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy
@@ -82,9 +82,12 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external,doris,external_docke
     // Create tags to protect additional snapshots
     sql """ alter table ${table_name_expire} create tag t_expire_protect AS OF 
VERSION ${s_expire_1} """
 
-    // Call expire_snapshots via Spark - should not delete snapshots referenced by branch/tag
+    // Call expire_snapshots via Doris - should not delete snapshots referenced by branch/tag
     // Using a timestamp that would expire old snapshots but not those referenced by branch/tag
-    spark_iceberg """CALL demo.system.expire_snapshots(table => 'test_db_retention.${table_name_expire}', older_than => TIMESTAMP '2020-01-01 00:00:00')"""
+    sql """
+        ALTER TABLE ${catalog_name}.test_db_retention.${table_name_expire}
+        EXECUTE expire_snapshots("older_than" = "2020-01-01T00:00:00")
+    """
 
     // Verify snapshots are still accessible after expire_snapshots
     qt_expire_branch_still_accessible """ select count(*) from ${table_name_expire}@branch(b_expire_test) """ // Should still have data
@@ -124,7 +127,10 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external,doris,external_docke
     def snapshot_count_retain_before = sql """ select count(*) from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_retain_count}", "query_type" = "snapshots") """
 
     // Call expire_snapshots - older snapshots beyond retention count may be expired, but branch snapshot should be protected
-    spark_iceberg """CALL demo.system.expire_snapshots(table => 'test_db_retention.${table_name_retain_count}', older_than => TIMESTAMP '2020-01-01 00:00:00')"""
+    sql """
+        ALTER TABLE ${catalog_name}.test_db_retention.${table_name_retain_count}
+        EXECUTE expire_snapshots("older_than" = "2020-01-01T00:00:00")
+    """
 
     // Verify branch is still accessible and has data
     qt_retain_count_branch_accessible """ select count(*) from ${table_name_retain_count}@branch(b_retain_count) """ // Should have data
@@ -160,7 +166,10 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external,doris,external_docke
     logger.info("Snapshot count before expire: ${snapshot_count_unref_before[0][0]}")
 
     // Call expire_snapshots - old unreferenced snapshots should be expired
-    spark_iceberg """CALL demo.system.expire_snapshots(table => 
'test_db_retention.${table_name_unref}', retain_last => 1)"""
+    sql """
+        ALTER TABLE ${catalog_name}.test_db_retention.${table_name_unref}
+        EXECUTE expire_snapshots("retain_last" = "1")
+    """
 
     // Count snapshots after expire
     def snapshot_count_unref_after = sql """ select count(*) from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_unref}", "query_type" = "snapshots") """
@@ -180,4 +189,3 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external,doris,external_docke
     assertEquals(tag_ref_unref_after[0][0], 1)
 
 }
-


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

