This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e3edd69e45 [core] Introduce RollbackToWatermarkProcedure for rollback 
(#4687)
e3edd69e45 is described below

commit e3edd69e45434facf2f84b978f60018a69a335d9
Author: xuzifu666 <[email protected]>
AuthorDate: Wed Dec 11 21:48:22 2024 +0800

    [core] Introduce RollbackToWatermarkProcedure for rollback (#4687)
---
 docs/content/flink/procedures.md                   |  22 +++++
 docs/content/spark/procedures.md                   |  11 +++
 .../org/apache/paimon/utils/SnapshotManager.java   |  59 ++++++++++++
 .../procedure/RollbackToWatermarkProcedure.java    |  59 ++++++++++++
 .../procedure/RollbackToWatermarkProcedure.java    |  66 +++++++++++++
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../RollbackToWatermarkProcedureITCase.java        |  79 ++++++++++++++++
 .../org/apache/paimon/spark/SparkProcedures.java   |   2 +
 .../procedure/RollbackToWatermarkProcedure.java    | 105 +++++++++++++++++++++
 9 files changed, 404 insertions(+)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 59b02f82bf..7a9b238073 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -412,6 +412,28 @@ All available procedures are listed below.
          CALL sys.rollback_to_timestamp(`table` => 'default.T', timestamp => 
1730292023000)
       </td>
    </tr>
+   <tr>
+      <td>rollback_to_watermark</td>
+      <td>
+         -- for Flink 1.18<br/>
+         -- roll back to the snapshot whose watermark is less than or equal to the given watermark.<br/>
+         CALL sys.rollback_to_watermark('identifier', watermark)<br/><br/>
+         -- for Flink 1.19 and later<br/>
+         -- roll back to the snapshot whose watermark is less than or equal to the given watermark.<br/>
+         CALL sys.rollback_to_watermark(`table` => 'default.T', `watermark` => watermark)<br/><br/>
+      </td>
+      <td>
+         To roll back to the snapshot whose watermark is less than or equal to the given watermark. Arguments:
+            <li>identifier: the target table identifier. Cannot be empty.</li>
+            <li>watermark (Long): roll back to the snapshot whose watermark is less than or equal to this value.</li>
+      </td>
+      <td>
+         -- for Flink 1.18<br/>
+         CALL sys.rollback_to_watermark('default.T', 1730292023000)<br/><br/>
+         -- for Flink 1.19 and later<br/>
+         CALL sys.rollback_to_watermark(`table` => 'default.T', watermark => 1730292023000)
+      </td>
+   </tr>
    <tr>
       <td>expire_snapshots</td>
       <td>
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 88d46fabbb..5b0efd5f90 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -179,6 +179,17 @@ This section introduce all available spark procedures 
about paimon.
           CALL sys.rollback_to_timestamp(table => 'default.T', timestamp => 
1730292023000)<br/><br/>
       </td>
     </tr>
+    <tr>
+      <td>rollback_to_watermark</td>
+      <td>
+         To roll back to the snapshot whose watermark is less than or equal to the given watermark. Arguments:
+            <li>table: the target table identifier. Cannot be empty.</li>
+            <li>watermark: roll back to the snapshot whose watermark is less than or equal to this value.</li>
+      </td>
+      <td>
+          CALL sys.rollback_to_watermark(table => 'default.T', watermark => 
1730292023000)<br/><br/>
+      </td>
+    </tr>
     <tr>
       <td>migrate_database</td>
       <td>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index cbe33ffaf4..eb7333366f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -366,6 +366,65 @@ public class SnapshotManager implements Serializable {
         return finalSnapshot;
     }
 
+    /**
+     * Returns the latest snapshot whose watermark is less than or equal to {@code watermark}.
+     *
+     * <p>Snapshots without a watermark are skipped. The binary search assumes that snapshot
+     * watermarks do not decrease as snapshot ids grow.
+     *
+     * @param watermark inclusive upper bound for the watermark of the returned snapshot
+     * @return the matching snapshot, or {@code null} if the table has no snapshots, the latest
+     *     snapshot has no usable watermark, or every watermarked snapshot is later than {@code
+     *     watermark}
+     */
+    public @Nullable Snapshot earlierOrEqualWatermark(long watermark) {
+        Long earliest = earliestSnapshotId();
+        Long latest = latestSnapshotId();
+        if (earliest == null || latest == null) {
+            return null;
+        }
+        // If the latest snapshot has no watermark (null or Long.MIN_VALUE), skip the search to
+        // avoid reading further snapshot files. The null check must come first: comparing a
+        // null Long against Long.MIN_VALUE would unbox it and throw a NullPointerException.
+        Long latestWatermark = snapshot(latest).watermark();
+        if (latestWatermark == null || latestWatermark == Long.MIN_VALUE) {
+            return null;
+        }
+
+        // Find the first snapshot that carries a watermark.
+        long low = earliest;
+        Snapshot candidate = snapshot(low);
+        while (candidate.watermark() == null && low < latest) {
+            low++;
+            candidate = snapshot(low);
+        }
+        Long firstWatermark = candidate.watermark();
+        if (firstWatermark == null || firstWatermark > watermark) {
+            // Even the first watermarked snapshot is later than the requested watermark, so no
+            // snapshot qualifies as "earlier or equal".
+            return null;
+        }
+
+        // Binary search for the last snapshot in [low, latest] whose watermark is <= watermark.
+        // Invariant: candidate == snapshot(low) and its watermark is <= watermark.
+        long high = latest;
+        while (low < high) {
+            // Round the midpoint up so that "low = mid" always makes progress.
+            long mid = low + (high - low + 1) / 2;
+            Snapshot midSnapshot = snapshot(mid);
+            Long midWatermark = midSnapshot.watermark();
+            if (midWatermark != null && midWatermark <= watermark) {
+                low = mid;
+                candidate = midSnapshot;
+            } else {
+                // Later than requested (or no watermark): the answer lies to the left.
+                high = mid - 1;
+            }
+        }
+        return candidate;
+    }
+
     public @Nullable Snapshot laterOrEqualWatermark(long watermark) {
         Long earliest = earliestSnapshotId();
         Long latest = latestSnapshotId();
diff --git 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java
 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java
new file mode 100644
index 0000000000..da0b38f16b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Rollback to watermark procedure. Usage:
+ *
+ * <pre><code>
+ *  -- rollback to the snapshot which earlier or equal than watermark.
+ *  CALL sys.rollback_to_watermark('tableId', watermark)
+ * </code></pre>
+ */
+public class RollbackToWatermarkProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "rollback_to_watermark";
+
+    public String[] call(ProcedureContext procedureContext, String tableId, 
long watermark)
+            throws Catalog.TableNotExistException {
+        Preconditions.checkNotNull(tableId, "table can not be empty");
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        Snapshot snapshot = 
fileStoreTable.snapshotManager().earlierOrEqualWatermark(watermark);
+        Preconditions.checkNotNull(
+                snapshot, String.format("count not find snapshot earlier than 
%s", watermark));
+        long snapshotId = snapshot.id();
+        fileStoreTable.rollbackTo(snapshotId);
+        return new String[] {String.format("Success roll back to snapshot: %s 
.", snapshotId)};
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java
new file mode 100644
index 0000000000..ab1ea8080d
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Rollback to watermark procedure. Usage:
+ *
+ * <pre><code>
+ *  -- rollback to the snapshot which earlier or equal than watermark.
+ *  CALL sys.rollback_to_watermark(`table` => 'tableId', watermark => 
watermark)
+ * </code></pre>
+ */
+public class RollbackToWatermarkProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "rollback_to_watermark";
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "watermark", type = 
@DataTypeHint("BIGINT"))
+            })
+    public String[] call(ProcedureContext procedureContext, String tableId, 
Long watermark)
+            throws Catalog.TableNotExistException {
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        Snapshot snapshot = 
fileStoreTable.snapshotManager().earlierOrEqualWatermark(watermark);
+        Preconditions.checkNotNull(
+                snapshot, String.format("count not find snapshot earlier than 
%s", watermark));
+        long snapshotId = snapshot.id();
+        fileStoreTable.rollbackTo(snapshotId);
+        return new String[] {String.format("Success roll back to snapshot: %s 
.", snapshotId)};
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 0ff3ac1f1e..6c3b0e7664 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -61,6 +61,7 @@ org.apache.paimon.flink.procedure.MergeIntoProcedure
 org.apache.paimon.flink.procedure.ResetConsumerProcedure
 org.apache.paimon.flink.procedure.RollbackToProcedure
 org.apache.paimon.flink.procedure.RollbackToTimestampProcedure
+org.apache.paimon.flink.procedure.RollbackToWatermarkProcedure
 org.apache.paimon.flink.procedure.MigrateTableProcedure
 org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
 org.apache.paimon.flink.procedure.MigrateFileProcedure
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedureITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedureITCase.java
new file mode 100644
index 0000000000..f87ecd2475
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedureITCase.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for {@link RollbackToWatermarkProcedure}. */
+public class RollbackToWatermarkProcedureITCase extends CatalogITCaseBase {
+
+    @Test
+    public void testCreateTagsFromSnapshotsWatermark() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " k STRING,"
+                        + " dt STRING,"
+                        + " PRIMARY KEY (k, dt) NOT ENFORCED"
+                        + ") PARTITIONED BY (dt) WITH ("
+                        + " 'bucket' = '1'"
+                        + ")");
+
+        // create snapshot 1 with watermark 1000.
+        sql(
+                "insert into T/*+ OPTIONS('end-input.watermark'= '1000') */ 
values('k1', '2024-12-02')");
+        // create snapshot 2 with watermark 2000.
+        sql(
+                "insert into T/*+ OPTIONS('end-input.watermark'= '2000') */ 
values('k2', '2024-12-02')");
+        // create snapshot 3 with watermark 3000.
+        sql(
+                "insert into T/*+ OPTIONS('end-input.watermark'= '3000') */ 
values('k3', '2024-12-02')");
+
+        FileStoreTable table = paimonTable("T");
+
+        long watermark1 = table.snapshotManager().snapshot(1).watermark();
+        long watermark2 = table.snapshotManager().snapshot(2).watermark();
+        long watermark3 = table.snapshotManager().snapshot(3).watermark();
+
+        assertThat(watermark1 == 1000).isTrue();
+        assertThat(watermark2 == 2000).isTrue();
+        assertThat(watermark3 == 3000).isTrue();
+
+        assertThat(sql("select * from T").stream().map(Row::toString))
+                .containsExactlyInAnyOrder(
+                        "+I[k1, 2024-12-02]", "+I[k2, 2024-12-02]", "+I[k3, 
2024-12-02]");
+
+        sql("CALL sys.rollback_to_watermark(`table` => 'default.T',`watermark` 
=> 2001)");
+
+        // check for snapshot 2
+        assertThat(sql("select * from T").stream().map(Row::toString))
+                .containsExactlyInAnyOrder("+I[k1, 2024-12-02]", "+I[k2, 
2024-12-02]");
+
+        sql("CALL sys.rollback_to_watermark(`table` => 'default.T',`watermark` 
=> 1001)");
+
+        // check for snapshot 1
+        assertThat(sql("select * from T").stream().map(Row::toString))
+                .containsExactlyInAnyOrder("+I[k1, 2024-12-02]");
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 21f14e5d7a..b2fa66a150 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -43,6 +43,7 @@ import org.apache.paimon.spark.procedure.ReplaceTagProcedure;
 import org.apache.paimon.spark.procedure.ResetConsumerProcedure;
 import org.apache.paimon.spark.procedure.RollbackProcedure;
 import org.apache.paimon.spark.procedure.RollbackToTimestampProcedure;
+import org.apache.paimon.spark.procedure.RollbackToWatermarkProcedure;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
@@ -72,6 +73,7 @@ public class SparkProcedures {
                 ImmutableMap.builder();
         procedureBuilders.put("rollback", RollbackProcedure::builder);
         procedureBuilders.put("rollback_to_timestamp", 
RollbackToTimestampProcedure::builder);
+        procedureBuilders.put("rollback_to_watermark", 
RollbackToWatermarkProcedure::builder);
         procedureBuilders.put("create_tag", CreateTagProcedure::builder);
         procedureBuilders.put("replace_tag", ReplaceTagProcedure::builder);
         procedureBuilders.put("rename_tag", RenameTagProcedure::builder);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java
new file mode 100644
index 0000000000..09185f02c9
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import static org.apache.spark.sql.types.DataTypes.LongType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/** A procedure to rollback to a watermark. */
+public class RollbackToWatermarkProcedure extends BaseProcedure {
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {
+                ProcedureParameter.required("table", StringType),
+                // watermark value
+                ProcedureParameter.required("watermark", LongType)
+            };
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", StringType, true, 
Metadata.empty())
+                    });
+
+    private RollbackToWatermarkProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
+        Long watermark = args.getLong(1);
+
+        return modifyPaimonTable(
+                tableIdent,
+                table -> {
+                    FileStoreTable fileStoreTable = (FileStoreTable) table;
+                    Snapshot snapshot =
+                            
fileStoreTable.snapshotManager().earlierOrEqualWatermark(watermark);
+                    Preconditions.checkNotNull(
+                            snapshot,
+                            String.format("count not find snapshot earlier 
than %s", watermark));
+                    long snapshotId = snapshot.id();
+                    fileStoreTable.rollbackTo(snapshotId);
+                    InternalRow outputRow =
+                            newInternalRow(
+                                    UTF8String.fromString(
+                                            String.format(
+                                                    "Success roll back to 
snapshot: %s .",
+                                                    snapshotId)));
+                    return new InternalRow[] {outputRow};
+                });
+    }
+
+    public static ProcedureBuilder builder() {
+        return new BaseProcedure.Builder<RollbackToWatermarkProcedure>() {
+            @Override
+            public RollbackToWatermarkProcedure doBuild() {
+                return new RollbackToWatermarkProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "RollbackToWatermarkProcedure";
+    }
+}

Reply via email to