This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.0 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit d9efecf68a7ff8c735b289064234213a2a5d0f13 Author: xuzifu666 <[email protected]> AuthorDate: Wed Jan 22 11:06:33 2025 +0800 [spark] Fix rollback not correctly identify tag or snapshot (#4947) --- docs/content/spark/procedures.md | 10 ++-- .../paimon/spark/procedure/RollbackProcedure.java | 36 +++++++++++-- .../spark/procedure/RollbackProcedureTest.scala | 60 ++++++++++++++++++++++ 3 files changed, 98 insertions(+), 8 deletions(-) diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index bf7b8ae2d5..13d24d4d33 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -159,13 +159,17 @@ This section introduce all available spark procedures about paimon. <tr> <td>rollback</td> <td> - To rollback to a specific version of target table. Argument: + To rollback to a specific version of target table, note version/snapshot/tag must set one of them. Argument: <li>table: the target table identifier. Cannot be empty.</li> - <li>version: id of the snapshot or name of tag that will roll back to.</li> + <li>version: id of the snapshot or name of tag that will roll back to, version would be Deprecated.</li> + <li>snapshot: snapshot that will roll back to.</li> + <li>tag: tag that will roll back to.</li> </td> <td> CALL sys.rollback(table => 'default.T', version => 'my_tag')<br/><br/> - CALL sys.rollback(table => 'default.T', version => 10) + CALL sys.rollback(table => 'default.T', version => 10)<br/><br/> + CALL sys.rollback(table => 'default.T', tag => 'tag1') + CALL sys.rollback(table => 'default.T', snapshot => 2) </td> </tr> <tr> diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java index 6d004e9466..d9a8876332 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java @@ -18,6 +18,9 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.StringUtils; + import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -26,6 +29,7 @@ import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import static org.apache.spark.sql.types.DataTypes.LongType; import static org.apache.spark.sql.types.DataTypes.StringType; /** A procedure to rollback to a snapshot or a tag. */ @@ -35,7 +39,9 @@ public class RollbackProcedure extends BaseProcedure { new ProcedureParameter[] { ProcedureParameter.required("table", StringType), // snapshot id or tag name - ProcedureParameter.required("version", StringType) + ProcedureParameter.optional("version", StringType), + ProcedureParameter.optional("snapshot", LongType), + ProcedureParameter.optional("tag", StringType) }; private static final StructType OUTPUT_TYPE = @@ -61,15 +67,35 @@ public class RollbackProcedure extends BaseProcedure { @Override public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - String version = args.getString(1); + String version = args.isNullAt(1) ? null : args.getString(1); return modifyPaimonTable( tableIdent, table -> { - if (version.chars().allMatch(Character::isDigit)) { - table.rollbackTo(Long.parseLong(version)); + Long snapshot = null; + String tag = null; + if (!StringUtils.isNullOrWhitespaceOnly(version)) { + Preconditions.checkState( + args.isNullAt(2) && args.isNullAt(3), + "only can set one of version/snapshot/tag in RollbackProcedure."); + if (version.chars().allMatch(Character::isDigit)) { + snapshot = Long.parseLong(version); + } else { + tag = version; + } + } else { + Preconditions.checkState( + (args.isNullAt(2) && !args.isNullAt(3) + || !args.isNullAt(2) && args.isNullAt(3)), + "only can set one of version/snapshot/tag in RollbackProcedure."); + snapshot = args.isNullAt(2) ? null : args.getLong(2); + tag = args.isNullAt(3) ? null : args.getString(3); + } + + if (snapshot != null) { + table.rollbackTo(snapshot); } else { - table.rollbackTo(version); + table.rollbackTo(tag); } InternalRow outputRow = newInternalRow(true); return new InternalRow[] {outputRow}; diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala index 5f5facc57a..dde0af3d22 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala @@ -94,6 +94,66 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { } } + test("Paimon Procedure: rollback to tag check test") { + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3', 'file.format'='orc') + |""".stripMargin) + + val query = () => spark.sql("SELECT * FROM T ORDER BY a") + + // snapshot-1 + spark.sql("insert into T select 1, 'a'") + checkAnswer(query(), Row(1, "a") :: Nil) + + checkAnswer( + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '20250122', snapshot => 1)"), + Row(true) :: Nil) + + // snapshot-2 + spark.sql("insert into T select 2, 'b'") + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + // snapshot-3 + spark.sql("insert into T select 3, 'c'") + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil) + + // snapshot-4 + spark.sql("insert into T select 4, 'd'") + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Row(4, "d") :: Nil) + + assertThrows[RuntimeException] { + spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '4')") + } + // rollback to snapshot + checkAnswer( + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '3')"), + Row(true) :: Nil) + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil) + + // version/snapshot/tag can only set one of them + assertThrows[RuntimeException] { + spark.sql( + "CALL paimon.sys.rollback(table => 'test.T', version => '20250122', tag => '20250122')") + } + + assertThrows[RuntimeException] { + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '20250122', snapshot => 1)") + } + + assertThrows[RuntimeException] { + spark.sql("CALL paimon.sys.rollback(table => 'test.T', tag => '20250122', snapshot => 1)") + } + + // rollback to snapshot + spark.sql("CALL paimon.sys.rollback(table => 'test.T', snapshot => 2)") + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + // rollback to tag + spark.sql("CALL paimon.sys.rollback(table => 'test.T', tag => '20250122')") + checkAnswer(query(), Row(1, "a") :: Nil) + } + test("Paimon Procedure: rollback to timestamp") { failAfter(streamingTimeout) { withTempDir {
