This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new f9272456b8 [KYUUBI #6973][AUTHZ] Support Paimon DELETE FROM / UPDATE / 
MERGE INTO commands
f9272456b8 is described below

commit f9272456b8320b9180ad12579ce2b5f591be0824
Author: davidyuan <[email protected]>
AuthorDate: Wed Mar 12 13:29:17 2025 +0800

    [KYUUBI #6973][AUTHZ] Support Paimon DELETE FROM / UPDATE / MERGE INTO 
commands
    
    ### Why are the changes needed?
    
    Support ranger check with paimon Update & Delete & MergeInti Table Command
    
    ### How was this patch tested?
    
    Test Paimon Update & Delete Table Command with ranger check
    #6973
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    This patch had conflicts when merged, resolved by
    Committer: Cheng Pan <[email protected]>
    
    Closes #6974 from davidyuan1223/update_table.
    
    Closes #6973
    
    3bd607300 [davidyuan] update
    1d68494ce [davidyuan] Test MergeInto
    a27ea633f [davidyuan] Test MergeInto
    56638f47c [davidyuan] Merge branch 'master' into update_table
    1c3464df5 [davidyuan] Test Table Update & Delete
    
    Authored-by: davidyuan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../src/main/resources/table_command_spec.json     | 73 ++++++++++++++++++++++
 .../spark/authz/gen/JsonSpecFileGenerator.scala    |  4 +-
 .../plugin/spark/authz/gen/PaimonCommands.scala    | 63 +++++++++++++++++++
 .../PaimonCatalogRangerSparkExtensionSuite.scala   | 72 +++++++++++++++++++++
 4 files changed, 211 insertions(+), 1 deletion(-)

diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index 18a39b0131..2c7f2d4727 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -2589,4 +2589,77 @@
     "isInput" : false,
     "comment" : "Delta"
   } ]
+}, {
+  "classname" : 
"org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand",
+  "tableDescs" : [ {
+    "fieldName" : "relation",
+    "fieldExtractor" : "DataSourceV2RelationTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : {
+      "fieldName" : null,
+      "fieldExtractor" : null,
+      "actionType" : "UPDATE",
+      "comment" : ""
+    },
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false,
+    "comment" : ""
+  } ],
+  "opType" : "QUERY",
+  "queryDescs" : [ ],
+  "uriDescs" : [ ]
+}, {
+  "classname" : "org.apache.paimon.spark.commands.MergeIntoPaimonTable",
+  "tableDescs" : [ {
+    "fieldName" : "targetTable",
+    "fieldExtractor" : "DataSourceV2RelationTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : {
+      "fieldName" : null,
+      "fieldExtractor" : null,
+      "actionType" : "UPDATE",
+      "comment" : ""
+    },
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false,
+    "comment" : ""
+  }, {
+    "fieldName" : "sourceTable",
+    "fieldExtractor" : "DataSourceV2RelationTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : null,
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : true,
+    "setCurrentDatabaseIfMissing" : false,
+    "comment" : ""
+  } ],
+  "opType" : "QUERY",
+  "queryDescs" : [ ],
+  "uriDescs" : [ ]
+}, {
+  "classname" : "org.apache.paimon.spark.commands.UpdatePaimonTableCommand",
+  "tableDescs" : [ {
+    "fieldName" : "relation",
+    "fieldExtractor" : "DataSourceV2RelationTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : {
+      "fieldName" : null,
+      "fieldExtractor" : null,
+      "actionType" : "UPDATE",
+      "comment" : ""
+    },
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false,
+    "comment" : ""
+  } ],
+  "opType" : "QUERY",
+  "queryDescs" : [ ],
+  "uriDescs" : [ ]
 } ]
\ No newline at end of file
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
index 58d161ce05..4d76a8b18e 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
@@ -45,7 +45,9 @@ class JsonSpecFileGenerator extends AnyFunSuite {
   // scalastyle:on
   test("check spec json files") {
     writeCommandSpecJson("database", Seq(DatabaseCommands))
-    writeCommandSpecJson("table", Seq(TableCommands, IcebergCommands, 
HudiCommands, DeltaCommands))
+    writeCommandSpecJson(
+      "table",
+      Seq(TableCommands, IcebergCommands, HudiCommands, DeltaCommands, 
PaimonCommands))
     writeCommandSpecJson("function", Seq(FunctionCommands))
     writeCommandSpecJson("scan", Seq(Scans))
   }
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/PaimonCommands.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/PaimonCommands.scala
new file mode 100644
index 0000000000..4d3a111591
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/PaimonCommands.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.gen
+
+import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
+import org.apache.kyuubi.plugin.spark.authz.serde._
+
+object PaimonCommands extends CommandSpecs[TableCommandSpec] {
+
+  val UpdatePaimonTable = {
+    val cmd = "org.apache.paimon.spark.commands.UpdatePaimonTableCommand"
+    val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+    val paimonFileStoreTableDesc = TableDesc(
+      "relation",
+      classOf[DataSourceV2RelationTableExtractor],
+      actionTypeDesc = Some(actionTypeDesc))
+    TableCommandSpec(cmd, Seq(paimonFileStoreTableDesc))
+  }
+
+  val DeleteFromPaimonTable = {
+    val cmd = "org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand"
+    val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+    val paimonFileStoreTableDesc = TableDesc(
+      "relation",
+      classOf[DataSourceV2RelationTableExtractor],
+      actionTypeDesc = Some(actionTypeDesc))
+    TableCommandSpec(cmd, Seq(paimonFileStoreTableDesc))
+  }
+
+  val MergeIntoPaimonTable = {
+    val cmd = "org.apache.paimon.spark.commands.MergeIntoPaimonTable"
+    val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+    val targetTableDesc = TableDesc(
+      "targetTable",
+      classOf[DataSourceV2RelationTableExtractor],
+      actionTypeDesc = Some(actionTypeDesc))
+    val sourceTableDesc = TableDesc(
+      "sourceTable",
+      classOf[DataSourceV2RelationTableExtractor],
+      isInput = true)
+    TableCommandSpec(cmd, Seq(targetTableDesc, sourceTableDesc))
+  }
+
+  override def specs: Seq[TableCommandSpec] = Seq(
+    UpdatePaimonTable,
+    DeleteFromPaimonTable,
+    MergeIntoPaimonTable)
+}
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
index 532f6450d7..4248ea4416 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
@@ -32,6 +32,9 @@ import org.apache.kyuubi.util.AssertionUtils._
 class PaimonCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite 
{
   override protected val catalogImpl: String = "hive"
   private def isSupportedVersion = isScalaV212
+  override protected val sqlExtensions: String =
+    if (isSupportedVersion) 
"org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions"
+    else ""
 
   val catalogV2 = "paimon_catalog"
   val namespace1 = "paimon_ns"
@@ -459,6 +462,75 @@ class PaimonCatalogRangerSparkExtensionSuite extends 
RangerSparkExtensionSuite {
     }
   }
 
+  test("UPDATE & DELETE FROM") {
+    if (isSupportedVersion) {
+      withCleanTmpResources(Seq(
+        (s"$catalogV2.$namespace1.$table1", "table"))) {
+        val createTable = createTableSql(namespace1, table1)
+        doAs(admin, sql(createTable))
+        doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES 
(1, 'a'), (2, 'b')"))
+
+        val updateSql =
+          s"""
+             |UPDATE $catalogV2.$namespace1.$table1 SET name='c' where id=1
+             |""".stripMargin
+        val deleteSql =
+          s"""
+             |DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=1
+             |""".stripMargin
+        interceptEndsWith[AccessControlException] {
+          doAs(table1OnlyUserForNs, sql(updateSql))
+        }(s"does not have [update] privilege on [$namespace1/$table1]")
+        interceptEndsWith[AccessControlException] {
+          doAs(someone, sql(updateSql))
+        }(s"does not have [update] privilege on [$namespace1/$table1]")
+        doAs(admin, sql(updateSql))
+
+        interceptEndsWith[AccessControlException] {
+          doAs(table1OnlyUserForNs, sql(deleteSql))
+        }(s"does not have [update] privilege on [$namespace1/$table1]")
+        interceptEndsWith[AccessControlException] {
+          doAs(someone, sql(deleteSql))
+        }(s"does not have [update] privilege on [$namespace1/$table1]")
+        doAs(admin, sql(deleteSql))
+      }
+    }
+  }
+
+  test("MERGE INTO") {
+    if (isSupportedVersion) {
+      val table2 = "table2"
+      withCleanTmpResources(Seq(
+        (s"$catalogV2.$namespace1.$table1", "table"),
+        (s"$catalogV2.$namespace1.$table2", "table"))) {
+        doAs(admin, sql(createTableSql(namespace1, table1)))
+        doAs(admin, sql(createTableSql(namespace1, table2)))
+
+        doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES 
(1, 'a'), (2, 'b')"))
+        doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES 
(2, 'c'), (3, 'd')"))
+
+        val mergeIntoSql =
+          s"""
+             |MERGE INTO $catalogV2.$namespace1.$table2
+             |USING $catalogV2.$namespace1.$table1
+             |ON
+             |$catalogV2.$namespace1.$table2.id = 
$catalogV2.$namespace1.$table1.id
+             |WHEN MATCHED THEN
+             |  UPDATE SET name = $catalogV2.$namespace1.$table1.name
+             |WHEN NOT MATCHED THEN
+             |  INSERT *
+             |""".stripMargin
+        interceptEndsWith[AccessControlException] {
+          doAs(table1OnlyUserForNs, sql(mergeIntoSql))
+        }(s"does not have [update] privilege on [$namespace1/$table2]")
+        interceptEndsWith[AccessControlException] {
+          doAs(someone, sql(mergeIntoSql))
+        }(s"does not have [select] privilege on [$namespace1/$table1]")
+        doAs(admin, sql(mergeIntoSql))
+      }
+    }
+  }
+
   def createTableSql(namespace: String, table: String): String =
     s"""
        |CREATE TABLE IF NOT EXISTS $catalogV2.$namespace.$table

Reply via email to