This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit dd7b60976557973cd2b45159114825f4f69bc8ba
Author: Zouxxyy <[email protected]>
AuthorDate: Thu May 1 08:56:58 2025 +0800

    [spark] Fix NPE when replace tag if tag exists (#5561)
---
 .../main/java/org/apache/paimon/table/Table.java   |  8 +++--
 .../spark/execution/CreateOrReplaceTagExec.scala   | 42 +++++++++++-----------
 .../paimon/spark/sql/PaimonTagDdlTestBase.scala    | 30 +++++++++++++++-
 3 files changed, 54 insertions(+), 26 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java 
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index e72073317c..9c1f07f3aa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -32,6 +32,8 @@ import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SimpleFileReader;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.Arrays;
@@ -121,21 +123,21 @@ public interface Table extends Serializable {
     void createTag(String tagName, long fromSnapshotId);
 
     @Experimental
-    void createTag(String tagName, long fromSnapshotId, Duration timeRetained);
+    void createTag(String tagName, long fromSnapshotId, @Nullable Duration 
timeRetained);
 
     /** Create a tag from latest snapshot. */
     @Experimental
     void createTag(String tagName);
 
     @Experimental
-    void createTag(String tagName, Duration timeRetained);
+    void createTag(String tagName, @Nullable Duration timeRetained);
 
     @Experimental
     void renameTag(String tagName, String targetTagName);
 
     /** Replace a tag with new snapshot id and new time retained. */
     @Experimental
-    void replaceTag(String tagName, Long fromSnapshotId, Duration 
timeRetained);
+    void replaceTag(String tagName, @Nullable Long fromSnapshotId, @Nullable 
Duration timeRetained);
 
     /** Delete a tag by name. */
     @Experimental
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateOrReplaceTagExec.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateOrReplaceTagExec.scala
index 0506ed42f1..5a1f356d85 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateOrReplaceTagExec.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateOrReplaceTagExec.scala
@@ -23,6 +23,7 @@ import 
org.apache.paimon.spark.catalyst.plans.logical.TagOptions
 import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
 import org.apache.paimon.table.FileStoreTable
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
@@ -35,40 +36,37 @@ case class CreateOrReplaceTagExec(
     create: Boolean,
     replace: Boolean,
     ifNotExists: Boolean)
-  extends PaimonLeafV2CommandExec {
+  extends PaimonLeafV2CommandExec
+  with Logging {
 
   override protected def run(): Seq[InternalRow] = {
-    val table = catalog.loadTable(ident)
-    assert(table.isInstanceOf[SparkTable])
-
-    table.asInstanceOf[SparkTable].getTable match {
-      case paimonTable: FileStoreTable =>
-        val tagIsExists = paimonTable.tagManager().tagExists(tagName)
+    catalog.loadTable(ident) match {
+      case SparkTable(paimonTable: FileStoreTable) =>
+        val tagExists = paimonTable.tagManager().tagExists(tagName)
         val timeRetained = tagOptions.timeRetained.orNull
-        val snapshotId = tagOptions.snapshotId
+        val snapshotIdOpt = tagOptions.snapshotId
 
-        if (create && replace && !tagIsExists) {
-          if (snapshotId.isEmpty) {
-            paimonTable.createTag(tagName, timeRetained)
+        if ((create || replace) && !tagExists) {
+          if (snapshotIdOpt.isDefined) {
+            paimonTable.createTag(tagName, snapshotIdOpt.get, timeRetained)
           } else {
-            paimonTable.createTag(tagName, snapshotId.get, timeRetained)
+            paimonTable.createTag(tagName, timeRetained)
           }
         } else if (replace) {
-          paimonTable.replaceTag(tagName, snapshotId.get, timeRetained)
-        } else {
-          if (tagIsExists && ifNotExists) {
-            return Nil
+          if (snapshotIdOpt.isDefined) {
+            paimonTable.replaceTag(tagName, snapshotIdOpt.get, timeRetained)
+          } else {
+            paimonTable.replaceTag(tagName, null, timeRetained)
           }
-
-          if (snapshotId.isEmpty) {
-            paimonTable.createTag(tagName, timeRetained)
+        } else {
+          if (ifNotExists) {
+            logInfo(s"Tag $tagName is exists, skip creating tag.")
           } else {
-            paimonTable.createTag(tagName, snapshotId.get, timeRetained)
+            throw new RuntimeException(s"Tag $tagName is exists.")
           }
         }
       case t =>
-        throw new UnsupportedOperationException(
-          s"Can not create tag for non-paimon FileStoreTable: $t")
+        throw new UnsupportedOperationException(s"Unsupported table : $t")
     }
     Nil
   }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
index 047a222c4a..27cb52cafe 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
@@ -23,6 +23,7 @@ import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.spark.sql.Row
 
 abstract class PaimonTagDdlTestBase extends PaimonSparkTestBase {
+
   test("Tag ddl: show tags syntax") {
     spark.sql("""CREATE TABLE T (id INT, name STRING)
                 |USING PAIMON
@@ -39,7 +40,7 @@ abstract class PaimonTagDdlTestBase extends 
PaimonSparkTestBase {
       Row("2024-10-11") :: Row("2024-10-12") :: Row("2024-10-13") :: Nil)
   }
 
-  test("Tag ddl: alter table t crete tag syntax") {
+  test("Tag ddl: alter table t create tag syntax") {
     spark.sql("""CREATE TABLE T (id INT, name STRING)
                 |USING PAIMON
                 |TBLPROPERTIES ('primary-key'='id')""".stripMargin)
@@ -100,6 +101,33 @@ abstract class PaimonTagDdlTestBase extends 
PaimonSparkTestBase {
       Row("tag-2", 2, "PT1H") :: Nil)
   }
 
+  test("Tag ddl: alter table t create or replace tag twice") {
+    sql("""
+          |CREATE TABLE T (id INT, name STRING)
+          |USING PAIMON
+          |TBLPROPERTIES ('primary-key'='id')
+          |""".stripMargin)
+
+    sql("INSERT INTO T VALUES (1, 'a')")
+
+    sql("ALTER TABLE  T CREATE TAG `tag-1`")
+    checkAnswer(sql("SHOW TAGS T"), Row("tag-1"))
+
+    // create tag
+    val t = intercept[Throwable] {
+      sql("ALTER TABLE T CREATE TAG `tag-1`")
+    }
+    assert(t.getMessage.contains("Tag tag-1 is exists."))
+    sql("ALTER TABLE T CREATE TAG IF NOT EXISTS `tag-1`")
+    sql("ALTER TABLE T CREATE TAG IF NOT EXISTS `tag-1` AS OF VERSION 1")
+    sql("ALTER TABLE T CREATE TAG IF NOT EXISTS `tag-1` AS OF VERSION 1 RETAIN 
1 HOURS")
+
+    // replace tag
+    sql("ALTER TABLE T CREATE OR REPLACE TAG `tag-1`")
+    sql("ALTER TABLE T CREATE OR REPLACE TAG `tag-1` AS OF VERSION 1")
+    sql("ALTER TABLE T CREATE OR REPLACE TAG `tag-1` AS OF VERSION 1 RETAIN 1 
HOURS")
+  }
+
   test("Tag ddl: alter table t delete tag syntax") {
     spark.sql("""CREATE TABLE T (id INT, name STRING)
                 |USING PAIMON

Reply via email to