This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new f9272456b8 [KYUUBI #6973][AUTHZ] Support Paimon DELETE FROM / UPDATE /
MERGE INTO commands
f9272456b8 is described below
commit f9272456b8320b9180ad12579ce2b5f591be0824
Author: davidyuan <[email protected]>
AuthorDate: Wed Mar 12 13:29:17 2025 +0800
[KYUUBI #6973][AUTHZ] Support Paimon DELETE FROM / UPDATE / MERGE INTO
commands
### Why are the changes needed?
Support Ranger checks for the Paimon Update, Delete, and MergeInto table commands
### How was this patch tested?
Tested the Paimon Update, Delete, and MergeInto table commands with Ranger checks
#6973
### Was this patch authored or co-authored using generative AI tooling?
No
This patch had conflicts when merged, resolved by
Committer: Cheng Pan <[email protected]>
Closes #6974 from davidyuan1223/update_table.
Closes #6973
3bd607300 [davidyuan] update
1d68494ce [davidyuan] Test MergeInto
a27ea633f [davidyuan] Test MergeInto
56638f47c [davidyuan] Merge branch 'master' into update_table
1c3464df5 [davidyuan] Test Table Update & Delete
Authored-by: davidyuan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../src/main/resources/table_command_spec.json | 73 ++++++++++++++++++++++
.../spark/authz/gen/JsonSpecFileGenerator.scala | 4 +-
.../plugin/spark/authz/gen/PaimonCommands.scala | 63 +++++++++++++++++++
.../PaimonCatalogRangerSparkExtensionSuite.scala | 72 +++++++++++++++++++++
4 files changed, 211 insertions(+), 1 deletion(-)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index 18a39b0131..2c7f2d4727 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -2589,4 +2589,77 @@
"isInput" : false,
"comment" : "Delta"
} ]
+}, {
+ "classname" :
"org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand",
+ "tableDescs" : [ {
+ "fieldName" : "relation",
+ "fieldExtractor" : "DataSourceV2RelationTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : {
+ "fieldName" : null,
+ "fieldExtractor" : null,
+ "actionType" : "UPDATE",
+ "comment" : ""
+ },
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false,
+ "comment" : ""
+ } ],
+ "opType" : "QUERY",
+ "queryDescs" : [ ],
+ "uriDescs" : [ ]
+}, {
+ "classname" : "org.apache.paimon.spark.commands.MergeIntoPaimonTable",
+ "tableDescs" : [ {
+ "fieldName" : "targetTable",
+ "fieldExtractor" : "DataSourceV2RelationTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : {
+ "fieldName" : null,
+ "fieldExtractor" : null,
+ "actionType" : "UPDATE",
+ "comment" : ""
+ },
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false,
+ "comment" : ""
+ }, {
+ "fieldName" : "sourceTable",
+ "fieldExtractor" : "DataSourceV2RelationTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : null,
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : true,
+ "setCurrentDatabaseIfMissing" : false,
+ "comment" : ""
+ } ],
+ "opType" : "QUERY",
+ "queryDescs" : [ ],
+ "uriDescs" : [ ]
+}, {
+ "classname" : "org.apache.paimon.spark.commands.UpdatePaimonTableCommand",
+ "tableDescs" : [ {
+ "fieldName" : "relation",
+ "fieldExtractor" : "DataSourceV2RelationTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : {
+ "fieldName" : null,
+ "fieldExtractor" : null,
+ "actionType" : "UPDATE",
+ "comment" : ""
+ },
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false,
+ "comment" : ""
+ } ],
+ "opType" : "QUERY",
+ "queryDescs" : [ ],
+ "uriDescs" : [ ]
} ]
\ No newline at end of file
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
index 58d161ce05..4d76a8b18e 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
@@ -45,7 +45,9 @@ class JsonSpecFileGenerator extends AnyFunSuite {
// scalastyle:on
test("check spec json files") {
writeCommandSpecJson("database", Seq(DatabaseCommands))
- writeCommandSpecJson("table", Seq(TableCommands, IcebergCommands,
HudiCommands, DeltaCommands))
+ writeCommandSpecJson(
+ "table",
+ Seq(TableCommands, IcebergCommands, HudiCommands, DeltaCommands,
PaimonCommands))
writeCommandSpecJson("function", Seq(FunctionCommands))
writeCommandSpecJson("scan", Seq(Scans))
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/PaimonCommands.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/PaimonCommands.scala
new file mode 100644
index 0000000000..4d3a111591
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/PaimonCommands.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.gen
+
+import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
+import org.apache.kyuubi.plugin.spark.authz.serde._
+
+object PaimonCommands extends CommandSpecs[TableCommandSpec] {
+
+ val UpdatePaimonTable = {
+ val cmd = "org.apache.paimon.spark.commands.UpdatePaimonTableCommand"
+ val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+ val paimonFileStoreTableDesc = TableDesc(
+ "relation",
+ classOf[DataSourceV2RelationTableExtractor],
+ actionTypeDesc = Some(actionTypeDesc))
+ TableCommandSpec(cmd, Seq(paimonFileStoreTableDesc))
+ }
+
+ val DeleteFromPaimonTable = {
+ val cmd = "org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand"
+ val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+ val paimonFileStoreTableDesc = TableDesc(
+ "relation",
+ classOf[DataSourceV2RelationTableExtractor],
+ actionTypeDesc = Some(actionTypeDesc))
+ TableCommandSpec(cmd, Seq(paimonFileStoreTableDesc))
+ }
+
+ val MergeIntoPaimonTable = {
+ val cmd = "org.apache.paimon.spark.commands.MergeIntoPaimonTable"
+ val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+ val targetTableDesc = TableDesc(
+ "targetTable",
+ classOf[DataSourceV2RelationTableExtractor],
+ actionTypeDesc = Some(actionTypeDesc))
+ val sourceTableDesc = TableDesc(
+ "sourceTable",
+ classOf[DataSourceV2RelationTableExtractor],
+ isInput = true)
+ TableCommandSpec(cmd, Seq(targetTableDesc, sourceTableDesc))
+ }
+
+ override def specs: Seq[TableCommandSpec] = Seq(
+ UpdatePaimonTable,
+ DeleteFromPaimonTable,
+ MergeIntoPaimonTable)
+}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
index 532f6450d7..4248ea4416 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
@@ -32,6 +32,9 @@ import org.apache.kyuubi.util.AssertionUtils._
class PaimonCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
{
override protected val catalogImpl: String = "hive"
private def isSupportedVersion = isScalaV212
+ override protected val sqlExtensions: String =
+ if (isSupportedVersion)
"org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions"
+ else ""
val catalogV2 = "paimon_catalog"
val namespace1 = "paimon_ns"
@@ -459,6 +462,75 @@ class PaimonCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
}
}
+ test("UPDATE & DELETE FROM") {
+ if (isSupportedVersion) {
+ withCleanTmpResources(Seq(
+ (s"$catalogV2.$namespace1.$table1", "table"))) {
+ val createTable = createTableSql(namespace1, table1)
+ doAs(admin, sql(createTable))
+ doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES
(1, 'a'), (2, 'b')"))
+
+ val updateSql =
+ s"""
+ |UPDATE $catalogV2.$namespace1.$table1 SET name='c' where id=1
+ |""".stripMargin
+ val deleteSql =
+ s"""
+ |DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=1
+ |""".stripMargin
+ interceptEndsWith[AccessControlException] {
+ doAs(table1OnlyUserForNs, sql(updateSql))
+ }(s"does not have [update] privilege on [$namespace1/$table1]")
+ interceptEndsWith[AccessControlException] {
+ doAs(someone, sql(updateSql))
+ }(s"does not have [update] privilege on [$namespace1/$table1]")
+ doAs(admin, sql(updateSql))
+
+ interceptEndsWith[AccessControlException] {
+ doAs(table1OnlyUserForNs, sql(deleteSql))
+ }(s"does not have [update] privilege on [$namespace1/$table1]")
+ interceptEndsWith[AccessControlException] {
+ doAs(someone, sql(deleteSql))
+ }(s"does not have [update] privilege on [$namespace1/$table1]")
+ doAs(admin, sql(deleteSql))
+ }
+ }
+ }
+
+ test("MERGE INTO") {
+ if (isSupportedVersion) {
+ val table2 = "table2"
+ withCleanTmpResources(Seq(
+ (s"$catalogV2.$namespace1.$table1", "table"),
+ (s"$catalogV2.$namespace1.$table2", "table"))) {
+ doAs(admin, sql(createTableSql(namespace1, table1)))
+ doAs(admin, sql(createTableSql(namespace1, table2)))
+
+ doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES
(1, 'a'), (2, 'b')"))
+ doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES
(2, 'c'), (3, 'd')"))
+
+ val mergeIntoSql =
+ s"""
+ |MERGE INTO $catalogV2.$namespace1.$table2
+ |USING $catalogV2.$namespace1.$table1
+ |ON
+ |$catalogV2.$namespace1.$table2.id =
$catalogV2.$namespace1.$table1.id
+ |WHEN MATCHED THEN
+ | UPDATE SET name = $catalogV2.$namespace1.$table1.name
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
+ interceptEndsWith[AccessControlException] {
+ doAs(table1OnlyUserForNs, sql(mergeIntoSql))
+ }(s"does not have [update] privilege on [$namespace1/$table2]")
+ interceptEndsWith[AccessControlException] {
+ doAs(someone, sql(mergeIntoSql))
+ }(s"does not have [select] privilege on [$namespace1/$table1]")
+ doAs(admin, sql(mergeIntoSql))
+ }
+ }
+ }
+
def createTableSql(namespace: String, table: String): String =
s"""
|CREATE TABLE IF NOT EXISTS $catalogV2.$namespace.$table