This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 94f4c3db5ed branch-4.1: [Feature](iceberg) Implement rewrite_manifests 
procedure for Iceberg #59487 (#64391)
94f4c3db5ed is described below

commit 94f4c3db5ed38d6352de7a726504eb06a4811a5e
Author: Socrates <[email protected]>
AuthorDate: Tue Jun 16 19:25:20 2026 +0800

    branch-4.1: [Feature](iceberg) Implement rewrite_manifests procedure for 
Iceberg #59487 (#64391)
    
    ### What problem does this PR solve?
    
    Pick #59487 to branch-4.1.
    
    Original PR: https://github.com/apache/doris/pull/59487
    
    This pick implements the Iceberg rewrite_manifests procedure on
    branch-4.1 and includes the corresponding external Iceberg regression
    test.
    
    ### Release note
    
    Feature: implement the rewrite_manifests procedure for Iceberg tables.
    
    ### Check List (For Author)
    
    - Test: Unit Test
        - ./run-fe-ut.sh --run org.apache.doris.datasource.iceberg.IcebergUtilsTest
    - Test: Static check
        - git diff --check origin/branch-4.1...HEAD
    - Behavior changed: Yes. Iceberg tables support ALTER TABLE ... EXECUTE
    rewrite_manifests().
    - Does this need documentation: No
    
    ### Pick Info
    
    - Source PR: #59487
    - Source merge commit: df9a8e66d2a124556a50485fd0148a0dcbc62c5e
    - Pick commit: f3f2ceeb2d7
    
    Co-authored-by: Lemon <[email protected]>
    Co-authored-by: weiqiang <[email protected]>
---
 .../action/IcebergExecuteActionFactory.java        |   7 +-
 .../action/IcebergRewriteManifestsAction.java      | 109 ++++++
 .../iceberg/rewrite/RewriteManifestExecutor.java   | 125 +++++++
 .../action/test_iceberg_rewrite_manifests.out      |  43 +++
 .../action/test_iceberg_rewrite_manifests.groovy   | 364 +++++++++++++++++++++
 5 files changed, 647 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
index 7c208cb7db6..0d09a9ef35c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
@@ -40,6 +40,7 @@ public class IcebergExecuteActionFactory {
     public static final String EXPIRE_SNAPSHOTS = "expire_snapshots";
     public static final String REWRITE_DATA_FILES = "rewrite_data_files";
     public static final String PUBLISH_CHANGES = "publish_changes";
+    public static final String REWRITE_MANIFESTS = "rewrite_manifests";
 
     /**
      * Create an Iceberg-specific ExecuteAction instance.
@@ -84,6 +85,9 @@ public class IcebergExecuteActionFactory {
             case PUBLISH_CHANGES:
                 return new IcebergPublishChangesAction(properties, 
partitionNamesInfo,
                         whereCondition);
+            case REWRITE_MANIFESTS:
+                return new IcebergRewriteManifestsAction(properties, 
partitionNamesInfo,
+                        whereCondition);
             default:
                 throw new DdlException("Unsupported Iceberg procedure: " + 
actionType
                         + ". Supported procedures: " + String.join(", ", 
getSupportedActions()));
@@ -104,7 +108,8 @@ public class IcebergExecuteActionFactory {
                 FAST_FORWARD,
                 EXPIRE_SNAPSHOTS,
                 REWRITE_DATA_FILES,
-                PUBLISH_CHANGES
+                PUBLISH_CHANGES,
+                REWRITE_MANIFESTS
         };
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRewriteManifestsAction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRewriteManifestsAction.java
new file mode 100644
index 00000000000..430e9fe9d5e
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRewriteManifestsAction.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.action;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.ArgumentParsers;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.rewrite.RewriteManifestExecutor;
+import org.apache.doris.info.PartitionNamesInfo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+
+import com.google.common.collect.Lists;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Action for rewriting Iceberg manifest files to optimize metadata layout
+ */
+public class IcebergRewriteManifestsAction extends BaseIcebergAction {
+    private static final Logger LOG = 
LogManager.getLogger(IcebergRewriteManifestsAction.class);
+    public static final String SPEC_ID = "spec_id";
+
+    public IcebergRewriteManifestsAction(Map<String, String> properties,
+            Optional<PartitionNamesInfo> partitionNamesInfo,
+            Optional<Expression> whereCondition) {
+        super("rewrite_manifests", properties, partitionNamesInfo, 
whereCondition);
+    }
+
+    @Override
+    protected void registerIcebergArguments() {
+        namedArguments.registerOptionalArgument(SPEC_ID,
+                "Spec id of the manifests to rewrite (defaults to current spec 
id)",
+                null,
+                ArgumentParsers.intRange(SPEC_ID, 0, Integer.MAX_VALUE));
+    }
+
+    @Override
+    protected void validateIcebergAction() throws UserException {
+        validateNoPartitions();
+        validateNoWhereCondition();
+    }
+
+    @Override
+    protected List<String> executeAction(TableIf table) throws UserException {
+        try {
+            Table icebergTable = ((IcebergExternalTable) 
table).getIcebergTable();
+            Snapshot current = icebergTable.currentSnapshot();
+            if (current == null) {
+                // No current snapshot means the table is empty, no manifests 
to rewrite
+                return Lists.newArrayList("0", "0");
+            }
+
+            // Get optional spec_id parameter
+            Integer specId = namedArguments.getInt(SPEC_ID);
+
+            // Execute rewrite operation
+            RewriteManifestExecutor executor = new RewriteManifestExecutor();
+            RewriteManifestExecutor.Result result = executor.execute(
+                    icebergTable,
+                    (ExternalTable) table,
+                    specId);
+
+            return result.toStringList();
+        } catch (Exception e) {
+            LOG.warn("Failed to rewrite manifests for table: {}", 
table.getName(), e);
+            throw new UserException("Rewrite manifests failed: " + 
e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected List<Column> getResultSchema() {
+        return Lists.newArrayList(
+                new Column("rewritten_manifests_count", Type.INT, false,
+                        "Number of manifests which were re-written by this 
command"),
+                new Column("added_manifests_count", Type.INT, false,
+                        "Number of new manifest files which were written by 
this command")
+        );
+    }
+
+    @Override
+    public String getDescription() {
+        return "Rewrite Iceberg manifest files to optimize metadata layout";
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteManifestExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteManifestExecutor.java
new file mode 100644
index 00000000000..f2e5ab77adb
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteManifestExecutor.java
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.rewrite;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
+
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.RewriteManifests;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+/**
+ * Executor for manifest rewrite operations
+ */
+public class RewriteManifestExecutor {
+    private static final Logger LOG = 
LogManager.getLogger(RewriteManifestExecutor.class);
+
+    public static class Result {
+        private final int rewrittenCount;
+        private final int addedCount;
+
+        public Result(int rewrittenCount, int addedCount) {
+            this.rewrittenCount = rewrittenCount;
+            this.addedCount = addedCount;
+        }
+
+        public java.util.List<String> toStringList() {
+            return java.util.Arrays.asList(String.valueOf(rewrittenCount),
+                    String.valueOf(addedCount));
+        }
+    }
+
+    /**
+     * Execute manifest rewrite using Iceberg RewriteManifests API
+     */
+    public Result execute(Table table, ExternalTable extTable, Integer specId) 
throws UserException {
+        try {
+            // Get current snapshot and return early if table is empty
+            Snapshot currentSnapshot = table.currentSnapshot();
+            if (currentSnapshot == null) {
+                return new Result(0, 0);
+            }
+
+            // Collect manifests before rewrite and filter by specId if 
provided
+            List<ManifestFile> manifestsBefore = 
currentSnapshot.dataManifests(table.io());
+            List<ManifestFile> manifestsBeforeTargeted = 
filterBySpecId(manifestsBefore, specId);
+
+            int rewrittenCount = manifestsBeforeTargeted.size();
+
+            if (rewrittenCount == 0) {
+                return new Result(0, 0);
+            }
+
+            // Configure rewrite operation, optionally restricting manifests 
by specId
+            RewriteManifests rm = table.rewriteManifests();
+
+            if (specId != null) {
+                final int targetSpecId = specId;
+                rm.rewriteIf(manifest -> manifest.partitionSpecId() == 
targetSpecId);
+            }
+
+            // Commit manifest rewrite
+            rm.commit();
+
+            // Refresh snapshot after rewrite
+            Snapshot snapshotAfter = table.currentSnapshot();
+            if (snapshotAfter == null) {
+                return new Result(rewrittenCount, 0);
+            }
+
+            // Collect manifests after rewrite and filter by specId
+            List<ManifestFile> manifestsAfter = 
snapshotAfter.dataManifests(table.io());
+            List<ManifestFile> manifestsAfterTargeted = 
filterBySpecId(manifestsAfter, specId);
+
+            // Compute addedCount as newly produced manifests (path not in 
before set)
+            java.util.Set<String> beforePaths = 
manifestsBeforeTargeted.stream()
+                    .map(ManifestFile::path)
+                    .collect(java.util.stream.Collectors.toSet());
+
+            int addedCount = (int) manifestsAfterTargeted.stream()
+                    .map(ManifestFile::path)
+                    .filter(path -> !beforePaths.contains(path))
+                    .count();
+
+            // Invalidate table cache to ensure metadata is refreshed
+            
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(extTable);
+
+            return new Result(rewrittenCount, addedCount);
+        } catch (Exception e) {
+            LOG.warn("Failed to execute manifest rewrite for table: {}", 
extTable.getName(), e);
+            throw new UserException("Failed to rewrite manifests: " + 
e.getMessage(), e);
+        }
+    }
+
+    private List<ManifestFile> filterBySpecId(List<ManifestFile> manifests, 
Integer specId) {
+        if (specId == null) {
+            return manifests;
+        }
+        final int targetSpecId = specId;
+        return manifests.stream()
+                .filter(manifest -> manifest.partitionSpecId() == targetSpecId)
+                .collect(java.util.stream.Collectors.toList());
+    }
+}
diff --git 
a/regression-test/data/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.out
 
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.out
new file mode 100644
index 00000000000..68934a31334
--- /dev/null
+++ 
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.out
@@ -0,0 +1,43 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !before_basic_rewrite --
+1      item1   electronics     100     2024-01-01
+2      item2   electronics     200     2024-01-02
+3      item3   books   300     2024-01-03
+4      item4   books   400     2024-01-04
+5      item5   clothing        500     2024-01-05
+6      item6   clothing        600     2024-01-06
+7      item7   electronics     700     2024-01-07
+8      item8   electronics     800     2024-01-08
+
+-- !after_basic_rewrite --
+1      item1   electronics     100     2024-01-01
+2      item2   electronics     200     2024-01-02
+3      item3   books   300     2024-01-03
+4      item4   books   400     2024-01-04
+5      item5   clothing        500     2024-01-05
+6      item6   clothing        600     2024-01-06
+7      item7   electronics     700     2024-01-07
+8      item8   electronics     800     2024-01-08
+
+-- !before_partitioned_rewrite --
+1      item1   100     2024    1
+2      item2   200     2024    1
+3      item3   300     2024    2
+4      item4   400     2024    2
+5      item5   500     2024    3
+6      item6   600     2024    3
+
+-- !after_partitioned_rewrite --
+1      item1   100     2024    1
+2      item2   200     2024    1
+3      item3   300     2024    2
+4      item4   400     2024    2
+5      item5   500     2024    3
+6      item6   600     2024    3
+
+-- !after_spec_id_rewrite --
+1      item1   100     2024    1       15
+2      item2   200     2024    1       16
+3      item3   300     2024    2       17
+4      item4   400     2024    3       18
+5      item5   500     2024    3       19
diff --git 
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.groovy
 
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.groovy
new file mode 100644
index 00000000000..2e3af2e307a
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.groovy
@@ -0,0 +1,364 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_rewrite_manifests", 
"p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String catalog_name = "test_iceberg_rewrite_manifests"
+    String db_name = "test_db"
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+        "s3.region" = "us-east-1"
+    );"""
+
+    sql """switch ${catalog_name}"""
+    sql """CREATE DATABASE IF NOT EXISTS ${db_name} """
+    sql """use ${db_name}"""
+
+    // 
=====================================================================================
+    // Test Case 1: Basic rewrite_manifests operation
+    // Tests the ability to rewrite multiple manifest files into fewer, 
optimized files
+    // 
=====================================================================================
+    logger.info("Starting basic rewrite_manifests test case")
+
+    def table_name = "test_rewrite_manifests_basic"
+
+    // Clean up if table exists
+    sql """DROP TABLE IF EXISTS ${db_name}.${table_name}"""
+
+    // Create a test table
+    sql """
+        CREATE TABLE ${db_name}.${table_name} (
+            id BIGINT,
+            name STRING,
+            category STRING,
+            value INT,
+            created_date DATE
+        ) ENGINE=iceberg
+    """
+    logger.info("Created test table: ${table_name}")
+
+    // Insert data in multiple single-row operations to create multiple 
manifest files
+    // Each INSERT operation typically creates a new manifest file
+    sql """INSERT INTO ${db_name}.${table_name} VALUES (1, 'item1', 
'electronics', 100, '2024-01-01')"""
+    sql """INSERT INTO ${db_name}.${table_name} VALUES (2, 'item2', 
'electronics', 200, '2024-01-02')"""
+    sql """INSERT INTO ${db_name}.${table_name} VALUES (3, 'item3', 'books', 
300, '2024-01-03')"""
+    sql """INSERT INTO ${db_name}.${table_name} VALUES (4, 'item4', 'books', 
400, '2024-01-04')"""
+    sql """INSERT INTO ${db_name}.${table_name} VALUES (5, 'item5', 
'clothing', 500, '2024-01-05')"""
+    sql """INSERT INTO ${db_name}.${table_name} VALUES (6, 'item6', 
'clothing', 600, '2024-01-06')"""
+    sql """INSERT INTO ${db_name}.${table_name} VALUES (7, 'item7', 
'electronics', 700, '2024-01-07')"""
+    sql """INSERT INTO ${db_name}.${table_name} VALUES (8, 'item8', 
'electronics', 800, '2024-01-08')"""
+
+    // Verify data before rewrite
+    qt_before_basic_rewrite """SELECT * FROM ${table_name} ORDER BY id"""
+
+    // Check manifest count before rewrite
+    List<List<Object>> manifestsBefore = sql """
+        SELECT COUNT(*) as manifest_count FROM ${table_name}\$manifests
+    """
+    logger.info("Manifest count before rewrite: ${manifestsBefore}")
+
+    // Execute basic rewrite_manifests operation (no parameters - rewrite all 
manifests)
+    List<List<Object>> rewriteResult = sql """
+        ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+        EXECUTE rewrite_manifests()
+    """
+    logger.info("Basic rewrite_manifests result: ${rewriteResult}")
+
+    // Verify the result structure
+    assertTrue(rewriteResult.size() == 1, "Expected exactly 1 result row")
+    assertTrue(rewriteResult[0].size() == 2, "Expected 2 columns in result")
+
+    // Extract rewritten and added manifest counts
+    int rewrittenCount = rewriteResult[0][0] as int
+    int addedCount = rewriteResult[0][1] as int
+
+    logger.info("Rewritten manifests: ${rewrittenCount}, Added manifests: 
${addedCount}")
+    assertTrue(rewrittenCount > 0, "Should have rewritten at least 1 manifest")
+    assertTrue(addedCount >= 0, "Added count should be non-negative")
+    // Note: addedCount can be 0 if Iceberg determines manifests are already 
optimal
+    // or if it reuses existing manifest files
+    if (addedCount > 0) {
+        assertTrue(addedCount <= rewrittenCount, "Added count should be <= 
rewritten count (consolidation)")
+    }
+
+    // Verify data integrity after rewrite
+    qt_after_basic_rewrite """SELECT * FROM ${table_name} ORDER BY id"""
+
+    // Check manifest count after rewrite (should be fewer or equal)
+    List<List<Object>> manifestsAfter = sql """
+        SELECT COUNT(*) as manifest_count FROM ${table_name}\$manifests
+    """
+    logger.info("Manifest count after rewrite: ${manifestsAfter}")
+    assertTrue(manifestsAfter[0][0] as int <= manifestsBefore[0][0] as int,
+        "Manifest count after rewrite should be <= count before")
+
+    // 
=====================================================================================
+    // Test Case 2: rewrite_manifests on partitioned table
+    // Tests manifest rewriting on a table with partition specifications
+    // 
=====================================================================================
+    logger.info("Starting rewrite_manifests on partitioned table test case")
+
+    def partitioned_table = "test_rewrite_manifests_partitioned"
+
+    // Clean up if table exists
+    sql """DROP TABLE IF EXISTS ${db_name}.${partitioned_table}"""
+
+    // Create a partitioned table
+    sql """
+        CREATE TABLE ${db_name}.${partitioned_table} (
+            id BIGINT,
+            name STRING,
+            value INT,
+            year INT,
+            month INT
+        ) ENGINE=iceberg
+        PARTITION BY (year, month)()
+    """
+    logger.info("Created partitioned test table: ${partitioned_table}")
+
+    // Insert data into different partitions to create multiple manifest files
+    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (1, 'item1', 
100, 2024, 1)"""
+    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (2, 'item2', 
200, 2024, 1)"""
+    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (3, 'item3', 
300, 2024, 2)"""
+    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (4, 'item4', 
400, 2024, 2)"""
+    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (5, 'item5', 
500, 2024, 3)"""
+    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (6, 'item6', 
600, 2024, 3)"""
+
+    // Verify data before rewrite
+    qt_before_partitioned_rewrite """SELECT * FROM ${partitioned_table} ORDER 
BY id"""
+
+    // Check manifest count before rewrite
+    List<List<Object>> partitionedManifestsBefore = sql """
+        SELECT COUNT(*) as manifest_count FROM ${partitioned_table}\$manifests
+    """
+    logger.info("Partitioned table manifest count before rewrite: 
${partitionedManifestsBefore}")
+
+    // Execute rewrite_manifests on partitioned table
+    List<List<Object>> partitionedRewriteResult = sql """
+        ALTER TABLE ${catalog_name}.${db_name}.${partitioned_table}
+        EXECUTE rewrite_manifests()
+    """
+    logger.info("Partitioned table rewrite_manifests result: 
${partitionedRewriteResult}")
+
+    // Verify result structure
+    assertTrue(partitionedRewriteResult.size() == 1, "Expected exactly 1 
result row for partitioned table")
+    assertTrue(partitionedRewriteResult[0].size() == 2, "Expected 2 columns in 
result for partitioned table")
+
+    int partitionedRewrittenCount = partitionedRewriteResult[0][0] as int
+    int partitionedAddedCount = partitionedRewriteResult[0][1] as int
+
+    logger.info("Partitioned table - Rewritten manifests: 
${partitionedRewrittenCount}, Added manifests: ${partitionedAddedCount}")
+    assertTrue(partitionedRewrittenCount > 0, "Partitioned table should have 
rewritten at least 1 manifest")
+    assertTrue(partitionedAddedCount >= 0, "Partitioned table added count 
should be non-negative")
+
+    // Verify data integrity after rewrite
+    qt_after_partitioned_rewrite """SELECT * FROM ${partitioned_table} ORDER 
BY id"""
+
+    // Check manifest count after rewrite
+    List<List<Object>> partitionedManifestsAfter = sql """
+        SELECT COUNT(*) as manifest_count FROM ${partitioned_table}\$manifests
+    """
+    logger.info("Partitioned table manifest count after rewrite: 
${partitionedManifestsAfter}")
+    assertTrue(partitionedManifestsAfter[0][0] as int <= 
partitionedManifestsBefore[0][0] as int,
+        "Partitioned table manifest count after rewrite should be <= count 
before")
+
+    // 
=====================================================================================
+    // Test Case 3: rewrite_manifests with spec_id parameter
+    // Tests manifest rewriting for a specific partition spec
+    // 
=====================================================================================
+    logger.info("Starting rewrite_manifests with spec_id test case")
+
+    def spec_id_table = "test_rewrite_manifests_spec_id"
+
+    // Clean up if table exists
+    sql """DROP TABLE IF EXISTS ${db_name}.${spec_id_table}"""
+
+    // Create a partitioned table (this will have spec_id = 0)
+    sql """
+        CREATE TABLE ${db_name}.${spec_id_table} (
+            id BIGINT,
+            name STRING,
+            value INT,
+            year INT,
+            month INT,
+            day INT
+        ) ENGINE=iceberg
+        PARTITION BY (year, month)()
+    """
+    logger.info("Created spec_id test table: ${spec_id_table}")
+
+    // Insert data to create manifests with spec_id 0
+    sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (1, 'item1', 100, 
2024, 1, 15)"""
+    sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (2, 'item2', 200, 
2024, 1, 16)"""
+    sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (3, 'item3', 300, 
2024, 2, 17)"""
+
+    // Check initial spec_id and manifest count
+    List<List<Object>> initialSpecs = sql """
+        SELECT partition_spec_id, COUNT(*) as manifest_count
+        FROM ${spec_id_table}\$manifests
+        GROUP BY partition_spec_id
+        ORDER BY partition_spec_id
+    """
+    logger.info("Initial spec IDs and manifest counts: ${initialSpecs}")
+
+    // Add day as a new partition field to create spec_id = 1
+    sql """ALTER TABLE ${catalog_name}.${db_name}.${spec_id_table} ADD 
PARTITION KEY day"""
+
+    // Insert more data to create manifests with spec_id 1
+    sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (4, 'item4', 400, 
2024, 3, 18)"""
+    sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (5, 'item5', 500, 
2024, 3, 19)"""
+
+    // Check spec_ids after adding new partition field
+    List<List<Object>> allSpecs = sql """
+        SELECT partition_spec_id, COUNT(*) as manifest_count
+        FROM ${spec_id_table}\$manifests
+        GROUP BY partition_spec_id
+        ORDER BY partition_spec_id
+    """
+    logger.info("All spec IDs and manifest counts: ${allSpecs}")
+
+    if (allSpecs.size() > 0) {
+        int targetSpec = allSpecs[0][0] as int
+        int targetCount = allSpecs[0][1] as int
+        List<List<Object>> specResult = sql """
+            ALTER TABLE ${catalog_name}.${db_name}.${spec_id_table}
+            EXECUTE rewrite_manifests('spec_id' = '${targetSpec}')
+        """
+        int specRewritten = specResult[0][0] as int
+        assertTrue(specRewritten == targetCount,
+            "Should rewrite exactly ${targetCount} manifests for 
spec_id=${targetSpec}, got ${specRewritten}")
+        qt_after_spec_id_rewrite """SELECT * FROM ${spec_id_table} ORDER BY 
id"""
+        logger.info("spec_id filtering test completed successfully")
+    } else {
+        logger.warn("Could not create spec_id, skipping spec_id filtering 
test")
+    }
+
+    // 
=====================================================================================
+    // Test Case 4: rewrite_manifests on empty table (no current snapshot)
+    // Tests that rewrite_manifests handles tables with no current snapshot 
gracefully
+    // 
=====================================================================================
+    logger.info("Starting rewrite_manifests on empty table test case")
+
+    def empty_table = "test_empty_table"
+    sql """DROP TABLE IF EXISTS ${db_name}.${empty_table}"""
+    sql """
+        CREATE TABLE ${db_name}.${empty_table} (
+            id BIGINT,
+            name STRING
+        ) ENGINE=iceberg
+    """
+    logger.info("Created empty test table: ${empty_table}")
+
+    // Execute rewrite_manifests on empty table (no current snapshot)
+    List<List<Object>> emptyTableResult = sql """
+        ALTER TABLE ${catalog_name}.${db_name}.${empty_table}
+        EXECUTE rewrite_manifests()
+    """
+    logger.info("Empty table rewrite_manifests result: ${emptyTableResult}")
+
+    // Verify result structure
+    assertTrue(emptyTableResult.size() == 1, "Expected exactly 1 result row 
for empty table")
+    assertTrue(emptyTableResult[0].size() == 2, "Expected 2 columns in result 
for empty table")
+
+    // Should return 0 rewritten manifests and 0 added manifests for empty 
table
+    int emptyRewrittenCount = emptyTableResult[0][0] as int
+    int emptyAddedCount = emptyTableResult[0][1] as int
+
+    assertTrue(emptyRewrittenCount == 0, "Empty table should have 0 rewritten 
manifests, got: ${emptyRewrittenCount}")
+    assertTrue(emptyAddedCount == 0, "Empty table should have 0 added 
manifests, got: ${emptyAddedCount}")
+
+    logger.info("Empty table test completed: rewritten=${emptyRewrittenCount}, 
added=${emptyAddedCount}")
+
+    // 
=====================================================================================
+    // Negative Test Cases: Parameter validation and error handling
+    // 
=====================================================================================
+    logger.info("Starting negative test cases for rewrite_manifests")
+
+    // Test with invalid spec_id format
+
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+            EXECUTE rewrite_manifests('spec_id' = 'not-a-number')
+        """
+        exception "Invalid"
+    }
+
+    // Test with non-existent spec_id (on spec_id table)
+
+    // Test with non-existent spec_id (very large number unlikely to exist) on 
spec_id table
+    List<List<Object>> nonExistentSpecOnSpecTable = sql """
+        ALTER TABLE ${catalog_name}.${db_name}.${spec_id_table}
+        EXECUTE rewrite_manifests('spec_id' = '99999')
+    """
+    assertTrue(nonExistentSpecOnSpecTable[0][0] as int == 0, "Non-existent 
spec_id on spec_id_table should return 0 rewritten")
+    assertTrue(nonExistentSpecOnSpecTable[0][1] as int == 0, "Non-existent 
spec_id on spec_id_table should return 0 added")
+
+    // Test with unknown parameter
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+            EXECUTE rewrite_manifests('unknown-parameter' = 'value')
+        """
+        exception "Unknown argument: unknown-parameter"
+    }
+
+    // Test rewrite_manifests with partition specification (should fail)
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+            EXECUTE rewrite_manifests() PARTITIONS (part1)
+        """
+        exception "Action 'rewrite_manifests' does not support partition 
specification"
+    }
+
+    // Test rewrite_manifests with WHERE condition (should fail)
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+            EXECUTE rewrite_manifests() WHERE id > 0
+        """
+        exception "Action 'rewrite_manifests' does not support WHERE condition"
+    }
+
+    // Test on non-existent table
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.non_existent_table
+            EXECUTE rewrite_manifests()
+        """
+        exception "Table non_existent_table does not exist"
+    }
+
+    logger.info("All rewrite_manifests test cases completed successfully")
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to