This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6c3c551944c [Feature](Iceberg) Implement publish_changes procedure for
Iceberg tables (#58755)
6c3c551944c is described below
commit 6c3c551944c8672d59f5ba765879735007cfb8ba
Author: xylaaaaa <[email protected]>
AuthorDate: Tue Dec 9 22:31:02 2025 +0800
[Feature](Iceberg) Implement publish_changes procedure for Iceberg tables
(#58755)
### What problem does this PR solve?
- **Issue Number**: part of #58199
- **Related PR**: N/A
Problem Summary:
This PR implements the `publish_changes` action for Iceberg tables. This
action serves as the "Publish" step in the Write-Audit-Publish (WAP)
pattern. The procedure locates a snapshot tagged with a specific
`wap.id` property and cherry-picks it into the current table state. This
allows users to atomically make "staged" data visible after validation.
Syntax:
```sql
EXECUTE TABLE catalog.db.table_name publish_changes("wap_id" = "batch_123");
```
Output:
Returns `previous_snapshot_id` (STRING) and `current_snapshot_id`
(STRING) indicating the state transition.
Use cases:
1. Implement Write-Audit-Publish (WAP) workflows.
2. Atomically publish validated data to the main branch.
3. Manage staged snapshots based on custom WAP IDs.
Co-authored-by: Chenjunwei <[email protected]>
---
.../create_preinstalled_scripts/iceberg/run23.sql | 40 +++++++
.../action/IcebergExecuteActionFactory.java | 7 +-
.../action/IcebergPublishChangesAction.java | 128 +++++++++++++++++++++
.../action/test_iceberg_execute_actions.out | 6 +
.../action/test_iceberg_execute_actions.groovy | 94 +++++++++++++++
5 files changed, 274 insertions(+), 1 deletion(-)
diff --git
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run23.sql
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run23.sql
new file mode 100644
index 00000000000..313766e7b59
--- /dev/null
+++
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run23.sql
@@ -0,0 +1,40 @@
+
+CREATE DATABASE IF NOT EXISTS demo.wap_test;
+
+USE demo.wap_test;
+
+DROP TABLE IF EXISTS orders_wap;
+
+-- WAP-enabled orders table. The regression test expects the main branch to be
+-- empty until publish_changes is executed with wap_id = test_wap_001.
+CREATE TABLE orders_wap (
+ order_id INT,
+ customer_id INT,
+ amount DECIMAL(10, 2),
+ order_date STRING
+)
+USING iceberg;
+ALTER TABLE wap_test.orders_wap SET TBLPROPERTIES ('write.wap.enabled'='true');
+
+-- Stage the following insert under WAP id test_wap_001.
+SET spark.wap.id = test_wap_001;
+
+INSERT INTO orders_wap VALUES
+ (1, 103, 150.00, '2025-12-03'),
+ (2, 104, 320.25, '2025-12-04');
+
+-- Clear the session-level WAP id so no later statement in this session is
+-- written under it (the non-WAP fixture below must not carry a wap.id).
+RESET spark.wap.id;
+
+DROP TABLE IF EXISTS orders_non_wap;
+-- Non WAP-enabled orders table (used by the negative publish_changes test)
+CREATE TABLE orders_non_wap (
+ order_id INT,
+ customer_id INT,
+ amount DECIMAL(10, 2),
+ order_date STRING
+)
+USING iceberg;
+
+INSERT INTO orders_non_wap VALUES
+(1, 201, 10.00, '2025-12-01');
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
index 94be847700c..7c208cb7db6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
@@ -39,6 +39,7 @@ public class IcebergExecuteActionFactory {
public static final String FAST_FORWARD = "fast_forward";
public static final String EXPIRE_SNAPSHOTS = "expire_snapshots";
public static final String REWRITE_DATA_FILES = "rewrite_data_files";
+ public static final String PUBLISH_CHANGES = "publish_changes";
/**
* Create an Iceberg-specific ExecuteAction instance.
@@ -80,6 +81,9 @@ public class IcebergExecuteActionFactory {
case REWRITE_DATA_FILES:
return new IcebergRewriteDataFilesAction(properties,
partitionNamesInfo,
whereCondition);
+ case PUBLISH_CHANGES:
+ return new IcebergPublishChangesAction(properties,
partitionNamesInfo,
+ whereCondition);
default:
throw new DdlException("Unsupported Iceberg procedure: " +
actionType
+ ". Supported procedures: " + String.join(", ",
getSupportedActions()));
@@ -99,7 +103,8 @@ public class IcebergExecuteActionFactory {
CHERRYPICK_SNAPSHOT,
FAST_FORWARD,
EXPIRE_SNAPSHOTS,
- REWRITE_DATA_FILES
+ REWRITE_DATA_FILES,
+ PUBLISH_CHANGES
};
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergPublishChangesAction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergPublishChangesAction.java
new file mode 100644
index 00000000000..e1bf8cbdad4
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergPublishChangesAction.java
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.action;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.ArgumentParsers;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.info.PartitionNamesInfo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+
+import com.google.common.collect.Lists;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Implements Iceberg's publish_changes action (Core of the WAP pattern).
+ * This action finds a snapshot tagged with a specific 'wap.id' and
cherry-picks it
+ * into the current table state.
+ * Corresponds to Spark syntax: CALL catalog.system.publish_changes('table',
'wap_id_123')
+ */
+public class IcebergPublishChangesAction extends BaseIcebergAction {
+ public static final String WAP_ID = "wap_id";
+ private static final String WAP_ID_PROP = "wap.id";
+
+ public IcebergPublishChangesAction(Map<String, String> properties,
+ Optional<PartitionNamesInfo> partitionNamesInfo,
Optional<Expression> whereCondition) {
+ super("publish_changes", properties, partitionNamesInfo,
whereCondition);
+ }
+
+ @Override
+ protected void registerIcebergArguments() {
+ namedArguments.registerRequiredArgument(WAP_ID,
+ "The WAP ID matching the snapshot to publish",
+ ArgumentParsers.nonEmptyString(WAP_ID));
+ }
+
+ @Override
+ protected void validateIcebergAction() throws UserException {
+ validateNoPartitions();
+ validateNoWhereCondition();
+ }
+
+ @Override
+ protected List<String> executeAction(TableIf table) throws UserException {
+ Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
+ String targetWapId = namedArguments.getString(WAP_ID);
+
+ // Find the target WAP snapshot
+ Snapshot wapSnapshot = null;
+ for (Snapshot snapshot : icebergTable.snapshots()) {
+ if (targetWapId.equals(snapshot.summary().get(WAP_ID_PROP))) {
+ wapSnapshot = snapshot;
+ break;
+ }
+ }
+
+ if (wapSnapshot == null) {
+ throw new UserException("Cannot find snapshot with " + WAP_ID_PROP
+ " = " + targetWapId);
+ }
+
+ long wapSnapshotId = wapSnapshot.snapshotId();
+
+ try {
+ // Get previous snapshot ID for result
+ Snapshot previousSnapshot = icebergTable.currentSnapshot();
+ Long previousSnapshotId = previousSnapshot != null ?
previousSnapshot.snapshotId() : null;
+
+ // Execute Cherry-pick
+ icebergTable.manageSnapshots().cherrypick(wapSnapshotId).commit();
+
+ // Get current snapshot ID after commit
+ Snapshot currentSnapshot = icebergTable.currentSnapshot();
+ Long currentSnapshotId = currentSnapshot != null ?
currentSnapshot.snapshotId() : null;
+
+ // Invalidate iceberg catalog table cache
+
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable)
table);
+
+ String previousSnapshotIdString = previousSnapshotId != null ?
String.valueOf(previousSnapshotId) : "null";
+ String currentSnapshotIdString = currentSnapshotId != null ?
String.valueOf(currentSnapshotId) : "null";
+
+ return Lists.newArrayList(
+ previousSnapshotIdString,
+ currentSnapshotIdString
+ );
+
+ } catch (Exception e) {
+ throw new UserException("Failed to publish changes for wap.id " +
targetWapId + ": " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ protected List<Column> getResultSchema() {
+ return Lists.newArrayList(
+ new Column("previous_snapshot_id", Type.STRING, false,
+ "ID of the snapshot before the publish operation"),
+ new Column("current_snapshot_id", Type.STRING, false,
+ "ID of the new snapshot created as a result of the
publish operation"));
+ }
+
+ @Override
+ public String getDescription() {
+ return "Publish a WAP snapshot by cherry-picking it to the current
table state";
+ }
+}
diff --git
a/regression-test/data/external_table_p0/iceberg/action/test_iceberg_execute_actions.out
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_execute_actions.out
index a815666d92a..bac2b4e6bf7 100644
---
a/regression-test/data/external_table_p0/iceberg/action/test_iceberg_execute_actions.out
+++
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_execute_actions.out
@@ -66,3 +66,9 @@
2 record2 200
3 record3 300
+-- !wap_before_publish --
+
+-- !wap_after_publish --
+1 103 150.00 2025-12-03
+2 104 320.25 2025-12-04
+
diff --git
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy
index f84e05de167..00906456633 100644
---
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy
@@ -636,4 +636,98 @@ suite("test_iceberg_optimize_actions_ddl",
"p0,external,doris,external_docker,ex
"""
exception "Action 'expire_snapshots' does not support partition
specification"
}
+
+ //
=====================================================================================
+// Test Case 6: publish_changes action with WAP (Write-Audit-Publish) pattern
+// Simplified workflow:
+//
+// - Main branch is initially empty (0 rows)
+// - A WAP snapshot exists with wap.id = "test_wap_001" and 2 rows
+// - publish_changes should cherry-pick the WAP snapshot into the main branch
+//
=====================================================================================
+
+logger.info("Starting simplified WAP (Write-Audit-Publish) workflow
verification test")
+
+// WAP test database and table
+String wap_db = "wap_test"
+String wap_table = "orders_wap"
+
+// Step 1: Verify no data is visible before publish_changes
+logger.info("Step 1: Verifying table is empty before publish_changes")
+qt_wap_before_publish """
+ SELECT order_id, customer_id, amount, order_date
+ FROM ${catalog_name}.${wap_db}.${wap_table}
+ ORDER BY order_id
+"""
+
+// Step 2: Publish the WAP changes with wap_id = "test_wap_001"
+logger.info("Step 2: Publishing WAP changes with wap_id=test_wap_001")
+sql """
+ ALTER TABLE ${catalog_name}.${wap_db}.${wap_table}
+ EXECUTE publish_changes("wap_id" = "test_wap_001")
+"""
+logger.info("Publish changes executed successfully")
+
+// Step 3: Verify WAP data is visible after publish_changes
+logger.info("Step 3: Verifying WAP data is visible after publish_changes")
+qt_wap_after_publish """
+ SELECT order_id, customer_id, amount, order_date
+ FROM ${catalog_name}.${wap_db}.${wap_table}
+ ORDER BY order_id
+"""
+
+logger.info("Simplified WAP (Write-Audit-Publish) workflow verification
completed successfully")
+
+// Negative tests for publish_changes
+
+// publish_changes on table without write.wap.enabled = true (should fail)
+test {
+ String nonWapDb = "wap_test"
+ String nonWapTable = "orders_non_wap"
+
+ sql """
+ ALTER TABLE ${catalog_name}.${nonWapDb}.${nonWapTable}
+ EXECUTE publish_changes("wap_id" = "test_wap_001")
+ """
+ exception "Cannot find snapshot with wap.id = test_wap_001"
+}
+
+
+// publish_changes with missing wap_id (should fail)
+test {
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+ EXECUTE publish_changes ()
+ """
+ exception "Missing required argument: wap_id"
+}
+
+// publish_changes with invalid wap_id (should fail)
+test {
+ sql """
+ ALTER TABLE ${catalog_name}.${wap_db}.${wap_table}
+ EXECUTE publish_changes("wap_id" = "non_existing_wap_id")
+ """
+ exception "Cannot find snapshot with wap.id = non_existing_wap_id"
+}
+
+// publish_changes with partition specification (should fail)
+test {
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+ EXECUTE publish_changes ("wap_id" = "test_wap_001") PARTITIONS (part1)
+ """
+ exception "Action 'publish_changes' does not support partition
specification"
+}
+
+// publish_changes with WHERE condition (should fail)
+test {
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+ EXECUTE publish_changes ("wap_id" = "test_wap_001") WHERE id > 0
+ """
+ exception "Action 'publish_changes' does not support WHERE condition"
+}
+
+
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]