This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.1 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit dd7b60976557973cd2b45159114825f4f69bc8ba
Author: Zouxxyy <[email protected]>
AuthorDate: Thu May 1 08:56:58 2025 +0800

    [spark] Fix NPE when replace tag if tag exists (#5561)
---
 .../main/java/org/apache/paimon/table/Table.java   |  8 +++--
 .../spark/execution/CreateOrReplaceTagExec.scala   | 42 +++++++++++-----------
 .../paimon/spark/sql/PaimonTagDdlTestBase.scala    | 30 +++++++++++++++-
 3 files changed, 54 insertions(+), 26 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index e72073317c..9c1f07f3aa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -32,6 +32,8 @@ import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SimpleFileReader;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.Arrays;
@@ -121,21 +123,21 @@ public interface Table extends Serializable {
     void createTag(String tagName, long fromSnapshotId);
 
     @Experimental
-    void createTag(String tagName, long fromSnapshotId, Duration timeRetained);
+    void createTag(String tagName, long fromSnapshotId, @Nullable Duration timeRetained);
 
     /** Create a tag from latest snapshot. */
     @Experimental
     void createTag(String tagName);
 
     @Experimental
-    void createTag(String tagName, Duration timeRetained);
+    void createTag(String tagName, @Nullable Duration timeRetained);
 
     @Experimental
     void renameTag(String tagName, String targetTagName);
 
     /** Replace a tag with new snapshot id and new time retained. */
     @Experimental
-    void replaceTag(String tagName, Long fromSnapshotId, Duration timeRetained);
+    void replaceTag(String tagName, @Nullable Long fromSnapshotId, @Nullable Duration timeRetained);
 
     /** Delete a tag by name. */
     @Experimental
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateOrReplaceTagExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateOrReplaceTagExec.scala
index 0506ed42f1..5a1f356d85 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateOrReplaceTagExec.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateOrReplaceTagExec.scala
@@ -23,6 +23,7 @@ import org.apache.paimon.spark.catalyst.plans.logical.TagOptions
 import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
 import org.apache.paimon.table.FileStoreTable
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
@@ -35,40 +36,37 @@ case class CreateOrReplaceTagExec(
     create: Boolean,
     replace: Boolean,
     ifNotExists: Boolean)
-  extends PaimonLeafV2CommandExec {
+  extends PaimonLeafV2CommandExec
+  with Logging {
 
   override protected def run(): Seq[InternalRow] = {
-    val table = catalog.loadTable(ident)
-    assert(table.isInstanceOf[SparkTable])
-
-    table.asInstanceOf[SparkTable].getTable match {
-      case paimonTable: FileStoreTable =>
-        val tagIsExists = paimonTable.tagManager().tagExists(tagName)
+    catalog.loadTable(ident) match {
+      case SparkTable(paimonTable: FileStoreTable) =>
+        val tagExists = paimonTable.tagManager().tagExists(tagName)
         val timeRetained = tagOptions.timeRetained.orNull
-        val snapshotId = tagOptions.snapshotId
+        val snapshotIdOpt = tagOptions.snapshotId
 
-        if (create && replace && !tagIsExists) {
-          if (snapshotId.isEmpty) {
-            paimonTable.createTag(tagName, timeRetained)
+        if ((create || replace) && !tagExists) {
+          if (snapshotIdOpt.isDefined) {
+            paimonTable.createTag(tagName, snapshotIdOpt.get, timeRetained)
           } else {
-            paimonTable.createTag(tagName, snapshotId.get, timeRetained)
+            paimonTable.createTag(tagName, timeRetained)
           }
         } else if (replace) {
-          paimonTable.replaceTag(tagName, snapshotId.get, timeRetained)
-        } else {
-          if (tagIsExists && ifNotExists) {
-            return Nil
+          if (snapshotIdOpt.isDefined) {
+            paimonTable.replaceTag(tagName, snapshotIdOpt.get, timeRetained)
+          } else {
+            paimonTable.replaceTag(tagName, null, timeRetained)
           }
-
-          if (snapshotId.isEmpty) {
-            paimonTable.createTag(tagName, timeRetained)
+        } else {
+          if (ifNotExists) {
+            logInfo(s"Tag $tagName is exists, skip creating tag.")
           } else {
-            paimonTable.createTag(tagName, snapshotId.get, timeRetained)
+            throw new RuntimeException(s"Tag $tagName is exists.")
           }
         }
       case t =>
-        throw new UnsupportedOperationException(
-          s"Can not create tag for non-paimon FileStoreTable: $t")
+        throw new UnsupportedOperationException(s"Unsupported table : $t")
     }
     Nil
   }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
index 047a222c4a..27cb52cafe 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
@@ -23,6 +23,7 @@ import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.spark.sql.Row
 
 abstract class PaimonTagDdlTestBase extends PaimonSparkTestBase {
+
   test("Tag ddl: show tags syntax") {
     spark.sql("""CREATE TABLE T (id INT, name STRING)
                 |USING PAIMON
@@ -39,7 +40,7 @@ abstract class PaimonTagDdlTestBase extends PaimonSparkTestBase {
       Row("2024-10-11") :: Row("2024-10-12") :: Row("2024-10-13") :: Nil)
   }
 
-  test("Tag ddl: alter table t crete tag syntax") {
+  test("Tag ddl: alter table t create tag syntax") {
     spark.sql("""CREATE TABLE T (id INT, name STRING)
                 |USING PAIMON
                 |TBLPROPERTIES ('primary-key'='id')""".stripMargin)
@@ -100,6 +101,33 @@ abstract class PaimonTagDdlTestBase extends PaimonSparkTestBase {
       Row("tag-2", 2, "PT1H") :: Nil)
   }
 
+  test("Tag ddl: alter table t create or replace tag twice") {
+    sql("""
+          |CREATE TABLE T (id INT, name STRING)
+          |USING PAIMON
+          |TBLPROPERTIES ('primary-key'='id')
+          |""".stripMargin)
+
+    sql("INSERT INTO T VALUES (1, 'a')")
+
+    sql("ALTER TABLE T CREATE TAG `tag-1`")
+    checkAnswer(sql("SHOW TAGS T"), Row("tag-1"))
+
+    // create tag
+    val t = intercept[Throwable] {
+      sql("ALTER TABLE T CREATE TAG `tag-1`")
+    }
+    assert(t.getMessage.contains("Tag tag-1 is exists."))
+    sql("ALTER TABLE T CREATE TAG IF NOT EXISTS `tag-1`")
+    sql("ALTER TABLE T CREATE TAG IF NOT EXISTS `tag-1` AS OF VERSION 1")
+    sql("ALTER TABLE T CREATE TAG IF NOT EXISTS `tag-1` AS OF VERSION 1 RETAIN 1 HOURS")
+
+    // replace tag
+    sql("ALTER TABLE T CREATE OR REPLACE TAG `tag-1`")
+    sql("ALTER TABLE T CREATE OR REPLACE TAG `tag-1` AS OF VERSION 1")
+    sql("ALTER TABLE T CREATE OR REPLACE TAG `tag-1` AS OF VERSION 1 RETAIN 1 HOURS")
+  }
+
   test("Tag ddl: alter table t delete tag syntax") {
     spark.sql("""CREATE TABLE T (id INT, name STRING)
                 |USING PAIMON
