This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new abf947a7ac [KYUUBI #7065] [#7066] Iceberg Support add partition field 
check
abf947a7ac is described below

commit abf947a7ac175a8e9d45d6035619027129294ce9
Author: davidyuan <[email protected]>
AuthorDate: Tue May 20 14:53:52 2025 +0800

    [KYUUBI #7065] [#7066] Iceberg Support add partition field check
    
    #7066
    ### Why are the changes needed?
    
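    Some Iceberg commands are missing authorization checks. This PR adds the check for `ADD PARTITION FIELD` and for the related partition-field, identifier-field and write distribution/ordering `ALTER TABLE` commands.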
    
    ### How was this patch tested?
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Closes #7065 from davidyuan1223/icerberg_authz.
    
    Closes #7065
    
    be2684671 [davidyuan] update
    231ed3356 [davidyuan] sort spi file
    6d2a5bf20 [davidyuan] sort spi file
    bc21310cc [davidyuan] update
    52ca367f1 [davidyuan] update
    
    Authored-by: davidyuan <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 ....kyuubi.plugin.spark.authz.serde.TableExtractor |   1 +
 .../src/main/resources/table_command_spec.json     |  96 +++++++++++++++++++
 .../plugin/spark/authz/serde/tableExtractors.scala |  16 ++++
 .../plugin/spark/authz/gen/IcebergCommands.scala   |  41 ++++++++
 .../IcebergCatalogRangerSparkExtensionSuite.scala  | 106 +++++++++++++++++++++
 5 files changed, 260 insertions(+)

diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
index 7010766f24..663cd4962e 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+org.apache.kyuubi.plugin.spark.authz.serde.ArrayBufferTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableOptionTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationTableExtractor
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index ba018c6420..be18ffbf4e 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -1809,6 +1809,22 @@
     "isInput" : false,
     "comment" : ""
   } ]
+}, {
+  "classname" : 
"org.apache.spark.sql.catalyst.plans.logical.AddPartitionField",
+  "tableDescs" : [ {
+    "fieldName" : "table",
+    "fieldExtractor" : "ArrayBufferTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : null,
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false,
+    "comment" : "Iceberg"
+  } ],
+  "opType" : "ALTERTABLE_PROPERTIES",
+  "queryDescs" : [ ],
+  "uriDescs" : [ ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.Call",
   "tableDescs" : [ {
@@ -1846,6 +1862,38 @@
   "opType" : "QUERY",
   "queryDescs" : [ ],
   "uriDescs" : [ ]
+}, {
+  "classname" : 
"org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields",
+  "tableDescs" : [ {
+    "fieldName" : "table",
+    "fieldExtractor" : "ArrayBufferTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : null,
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false,
+    "comment" : "Iceberg"
+  } ],
+  "opType" : "ALTERTABLE_PROPERTIES",
+  "queryDescs" : [ ],
+  "uriDescs" : [ ]
+}, {
+  "classname" : 
"org.apache.spark.sql.catalyst.plans.logical.DropPartitionField",
+  "tableDescs" : [ {
+    "fieldName" : "table",
+    "fieldExtractor" : "ArrayBufferTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : null,
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false,
+    "comment" : "Iceberg"
+  } ],
+  "opType" : "ALTERTABLE_PROPERTIES",
+  "queryDescs" : [ ],
+  "uriDescs" : [ ]
 }, {
   "classname" : 
"org.apache.spark.sql.catalyst.plans.logical.MergeIntoIcebergTable",
   "tableDescs" : [ {
@@ -1871,6 +1919,54 @@
     "comment" : ""
   } ],
   "uriDescs" : [ ]
+}, {
+  "classname" : 
"org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField",
+  "tableDescs" : [ {
+    "fieldName" : "table",
+    "fieldExtractor" : "ArrayBufferTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : null,
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false,
+    "comment" : "Iceberg"
+  } ],
+  "opType" : "ALTERTABLE_PROPERTIES",
+  "queryDescs" : [ ],
+  "uriDescs" : [ ]
+}, {
+  "classname" : 
"org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields",
+  "tableDescs" : [ {
+    "fieldName" : "table",
+    "fieldExtractor" : "ArrayBufferTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : null,
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false,
+    "comment" : "Iceberg"
+  } ],
+  "opType" : "ALTERTABLE_PROPERTIES",
+  "queryDescs" : [ ],
+  "uriDescs" : [ ]
+}, {
+  "classname" : 
"org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering",
+  "tableDescs" : [ {
+    "fieldName" : "table",
+    "fieldExtractor" : "ArrayBufferTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : null,
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false,
+    "comment" : "Iceberg"
+  } ],
+  "opType" : "ALTERTABLE_PROPERTIES",
+  "queryDescs" : [ ],
+  "uriDescs" : [ ]
 }, {
   "classname" : 
"org.apache.spark.sql.catalyst.plans.logical.UnresolvedMergeIntoIcebergTable",
   "tableDescs" : [ {
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
index 03506036d0..c036660691 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
@@ -184,6 +184,22 @@ class ExpressionSeqTableExtractor extends TableExtractor {
   }
 }
 
+/**
+ * Extracts the target table from Iceberg's `ALTER TABLE` extension commands, whose `table`
+ * field holds the multipart table name as a sequence of strings:
+ * org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
+ * org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
+ * org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
+ * org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
+ * org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
+ * org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
+ */
+class ArrayBufferTableExtractor extends TableExtractor {
+
+  /**
+   * @param spark not used by this extractor
+   * @param v1 the multipart table name: `table`, `db.table` or `catalog.db.table`
+   * @return the table described by `v1`; the last `Table` argument is always left as `None`
+   * @throws IllegalArgumentException if `v1` is not a sequence of one to three strings
+   */
+  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = v1 match {
+    // Iceberg will transform table to ArrayBuffer[String]. Matching on `scala.collection.Seq`
+    // accepts mutable and immutable sequences alike and avoids an unchecked cast.
+    case scala.collection.Seq(tblName: String) =>
+      Some(Table(None, None, tblName, None))
+    case scala.collection.Seq(dbName: String, tblName: String) =>
+      Some(Table(None, Some(dbName), tblName, None))
+    case scala.collection.Seq(catalogName: String, dbName: String, tblName: String) =>
+      Some(Table(Some(catalogName), Some(dbName), tblName, None))
+    case other =>
+      // Fail loudly instead of surfacing a bare MatchError/ClassCastException, so an
+      // unsupported identifier shape (empty, null, nested namespace) is easy to diagnose.
+      throw new IllegalArgumentException(
+        s"Unsupported table identifier for ${getClass.getSimpleName}: $other")
+  }
+}
+
 /**
  * org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
  */
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala
index 33e94d718c..7594ae8069 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala
@@ -58,11 +58,52 @@ object IcebergCommands extends 
CommandSpecs[TableCommandSpec] {
     TableCommandSpec(cmd, Seq(td), opType = 
OperationType.ALTERTABLE_PROPERTIES)
   }
 
+  // Spec for Iceberg's AddPartitionField command: its `table` field is read through
+  // ArrayBufferTableExtractor and the command is classified as ALTERTABLE_PROPERTIES.
+  val AddPartitionFiled = {
+    val icebergTable = TableDesc("table", classOf[ArrayBufferTableExtractor], comment = "Iceberg")
+    TableCommandSpec(
+      "org.apache.spark.sql.catalyst.plans.logical.AddPartitionField",
+      Seq(icebergTable),
+      opType = OperationType.ALTERTABLE_PROPERTIES)
+  }
+
+  // Same table descriptor and operation type as AddPartitionFiled; only the class name differs.
+  val DropPartitionField =
+    AddPartitionFiled.copy(classname =
+      "org.apache.spark.sql.catalyst.plans.logical.DropPartitionField")
+
+  // Same table descriptor and operation type as AddPartitionFiled; only the class name differs.
+  val ReplacePartitionField =
+    AddPartitionFiled.copy(classname =
+      "org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField")
+
+  // Same table descriptor and operation type as AddPartitionFiled; only the class name differs.
+  val WriteDistributionAndOrdering =
+    AddPartitionFiled.copy(classname =
+      "org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering")
+
+  // Same table descriptor and operation type as AddPartitionFiled; only the class name differs.
+  val SetIdentifierFields =
+    AddPartitionFiled.copy(classname =
+      "org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields")
+
+  // Same table descriptor and operation type as AddPartitionFiled; only the class name differs.
+  val DropIdentifierFields =
+    AddPartitionFiled.copy(classname =
+      "org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields")
+
   override def specs: Seq[TableCommandSpec] = Seq(
     CallProcedure,
     DeleteFromIcebergTable,
     UpdateIcebergTable,
     MergeIntoIcebergTable,
+    AddPartitionFiled,
+    DropPartitionField,
+    ReplacePartitionField,
+    WriteDistributionAndOrdering,
+    SetIdentifierFields,
+    DropIdentifierFields,
     MergeIntoIcebergTable.copy(classname =
       
"org.apache.spark.sql.catalyst.plans.logical.UnresolvedMergeIntoIcebergTable"))
 }
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
index cf798cdfae..9e780d4e3e 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
@@ -378,4 +378,110 @@ class IcebergCatalogRangerSparkExtensionSuite extends 
RangerSparkExtensionSuite
       doAs(admin, sql(callSetCurrentSnapshot))
     }
   }
+
+  test("ALTER TABLE ADD PARTITION FIELD for Iceberg") {
+    val table = s"$catalogV2.$namespace1.partitioned_table"
+    val createTableSql =
+      s"CREATE TABLE $table (id int, name string, city string) USING iceberg PARTITIONED BY (city)"
+    val addPartitionSql = s"ALTER TABLE $table ADD PARTITION FIELD id"
+    val expectedDenial = s"does not have [alter] privilege on [$namespace1/partitioned_table]"
+    withCleanTmpResources(Seq((table, "table"))) {
+      doAs(admin, sql(createTableSql))
+      // `someone` is expected to be rejected; `admin` must be able to run the same statement.
+      interceptEndsWith[AccessControlException] {
+        doAs(someone, sql(addPartitionSql))
+      }(expectedDenial)
+      doAs(admin, sql(addPartitionSql))
+    }
+  }
+
+  test("ALTER TABLE DROP PARTITION FIELD for Iceberg") {
+    val table = s"$catalogV2.$namespace1.partitioned_table"
+    val createTableSql =
+      s"CREATE TABLE $table (id int, name string, city string) USING iceberg PARTITIONED BY (id, city)"
+    val dropPartitionSql = s"ALTER TABLE $table DROP PARTITION FIELD id"
+    val expectedDenial = s"does not have [alter] privilege on [$namespace1/partitioned_table]"
+    withCleanTmpResources(Seq((table, "table"))) {
+      doAs(admin, sql(createTableSql))
+      // `someone` is expected to be rejected; `admin` must be able to run the same statement.
+      interceptEndsWith[AccessControlException] {
+        doAs(someone, sql(dropPartitionSql))
+      }(expectedDenial)
+      doAs(admin, sql(dropPartitionSql))
+    }
+  }
+
+  test("ALTER TABLE REPLACE PARTITION FIELD for Iceberg") {
+    val table = s"$catalogV2.$namespace1.partitioned_table"
+    val createTableSql =
+      s"CREATE TABLE $table (id int, name string, city string) USING iceberg PARTITIONED BY (city)"
+    val replacePartitionSql = s"ALTER TABLE $table REPLACE PARTITION FIELD city WITH id"
+    val expectedDenial = s"does not have [alter] privilege on [$namespace1/partitioned_table]"
+    withCleanTmpResources(Seq((table, "table"))) {
+      doAs(admin, sql(createTableSql))
+      // `someone` is expected to be rejected; `admin` must be able to run the same statement.
+      interceptEndsWith[AccessControlException] {
+        doAs(someone, sql(replacePartitionSql))
+      }(expectedDenial)
+      doAs(admin, sql(replacePartitionSql))
+    }
+  }
+
+  test("ALTER TABLE WRITE ORDER BY for Iceberg") {
+    val table = s"$catalogV2.$namespace1.partitioned_table"
+    val createTableSql = s"CREATE TABLE $table (id int, name string, city string) USING iceberg"
+    val orderedWriteSql = s"ALTER TABLE $table WRITE ORDERED BY id"
+    val expectedDenial = s"does not have [alter] privilege on [$namespace1/partitioned_table]"
+    withCleanTmpResources(Seq((table, "table"))) {
+      doAs(admin, sql(createTableSql))
+      // `someone` is expected to be rejected; `admin` must be able to run the same statement.
+      interceptEndsWith[AccessControlException] {
+        doAs(someone, sql(orderedWriteSql))
+      }(expectedDenial)
+      doAs(admin, sql(orderedWriteSql))
+    }
+  }
+
+  test("ALTER TABLE WRITE DISTRIBUTED BY PARTITION for Iceberg") {
+    val table = s"$catalogV2.$namespace1.partitioned_table"
+    val createTableSql =
+      s"CREATE TABLE $table (id int, name string, city string) USING iceberg PARTITIONED BY (city)"
+    val distributedWriteSql = s"ALTER TABLE $table WRITE DISTRIBUTED BY PARTITION"
+    val expectedDenial = s"does not have [alter] privilege on [$namespace1/partitioned_table]"
+    withCleanTmpResources(Seq((table, "table"))) {
+      doAs(admin, sql(createTableSql))
+      // `someone` is expected to be rejected; `admin` must be able to run the same statement.
+      interceptEndsWith[AccessControlException] {
+        doAs(someone, sql(distributedWriteSql))
+      }(expectedDenial)
+      doAs(admin, sql(distributedWriteSql))
+    }
+  }
+
+  test("ALTER TABLE SET IDENTIFIER FIELD for Iceberg") {
+    val table = s"$catalogV2.$namespace1.partitioned_table"
+    val createTableSql =
+      s"CREATE TABLE $table (id int NOT NULL, name string, city string) USING iceberg"
+    val setIdentifierSql = s"ALTER TABLE $table SET IDENTIFIER FIELDS id"
+    val expectedDenial = s"does not have [alter] privilege on [$namespace1/partitioned_table]"
+    withCleanTmpResources(Seq((table, "table"))) {
+      doAs(admin, sql(createTableSql))
+      // `someone` is expected to be rejected; `admin` must be able to run the same statement.
+      interceptEndsWith[AccessControlException] {
+        doAs(someone, sql(setIdentifierSql))
+      }(expectedDenial)
+      doAs(admin, sql(setIdentifierSql))
+    }
+  }
+
+  test("ALTER TABLE DROP IDENTIFIER FIELD for Iceberg") {
+    val table = s"$catalogV2.$namespace1.partitioned_table"
+    val createTableSql =
+      s"CREATE TABLE $table (id int NOT NULL, name string, city string) USING iceberg"
+    val setIdentifierSql = s"ALTER TABLE $table SET IDENTIFIER FIELDS id"
+    val dropIdentifierSql = s"ALTER TABLE $table DROP IDENTIFIER FIELDS id"
+    val expectedDenial = s"does not have [alter] privilege on [$namespace1/partitioned_table]"
+    withCleanTmpResources(Seq((table, "table"))) {
+      doAs(admin, sql(createTableSql))
+      // The identifier field is set first (as admin) so that there is something to drop.
+      doAs(admin, sql(setIdentifierSql))
+      // `someone` is expected to be rejected; `admin` must be able to run the same statement.
+      interceptEndsWith[AccessControlException] {
+        doAs(someone, sql(dropIdentifierSql))
+      }(expectedDenial)
+      doAs(admin, sql(dropIdentifierSql))
+    }
+  }
 }

Reply via email to