This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new abf947a7ac [KYUUBI #7065] [#7066] Iceberg Support add partition field
check
abf947a7ac is described below
commit abf947a7ac175a8e9d45d6035619027129294ce9
Author: davidyuan <[email protected]>
AuthorDate: Tue May 20 14:53:52 2025 +0800
[KYUUBI #7065] [#7066] Iceberg Support add partition field check
#7066
### Why are the changes needed?
Iceberg authorization was missing some checks; this PR adds the partition field check.
### How was this patch tested?
### Was this patch authored or co-authored using generative AI tooling?
Closes #7065 from davidyuan1223/icerberg_authz.
Closes #7065
be2684671 [davidyuan] update
231ed3356 [davidyuan] sort spi file
6d2a5bf20 [davidyuan] sort spi file
bc21310cc [davidyuan] update
52ca367f1 [davidyuan] update
Authored-by: davidyuan <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
....kyuubi.plugin.spark.authz.serde.TableExtractor | 1 +
.../src/main/resources/table_command_spec.json | 96 +++++++++++++++++++
.../plugin/spark/authz/serde/tableExtractors.scala | 16 ++++
.../plugin/spark/authz/gen/IcebergCommands.scala | 41 ++++++++
.../IcebergCatalogRangerSparkExtensionSuite.scala | 106 +++++++++++++++++++++
5 files changed, 260 insertions(+)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
index 7010766f24..663cd4962e 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
+++
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
@@ -15,6 +15,7 @@
# limitations under the License.
#
+org.apache.kyuubi.plugin.spark.authz.serde.ArrayBufferTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableOptionTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationTableExtractor
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index ba018c6420..be18ffbf4e 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -1809,6 +1809,22 @@
"isInput" : false,
"comment" : ""
} ]
+}, {
+ "classname" :
"org.apache.spark.sql.catalyst.plans.logical.AddPartitionField",
+ "tableDescs" : [ {
+ "fieldName" : "table",
+ "fieldExtractor" : "ArrayBufferTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : null,
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false,
+ "comment" : "Iceberg"
+ } ],
+ "opType" : "ALTERTABLE_PROPERTIES",
+ "queryDescs" : [ ],
+ "uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.catalyst.plans.logical.Call",
"tableDescs" : [ {
@@ -1846,6 +1862,38 @@
"opType" : "QUERY",
"queryDescs" : [ ],
"uriDescs" : [ ]
+}, {
+ "classname" :
"org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields",
+ "tableDescs" : [ {
+ "fieldName" : "table",
+ "fieldExtractor" : "ArrayBufferTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : null,
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false,
+ "comment" : "Iceberg"
+ } ],
+ "opType" : "ALTERTABLE_PROPERTIES",
+ "queryDescs" : [ ],
+ "uriDescs" : [ ]
+}, {
+ "classname" :
"org.apache.spark.sql.catalyst.plans.logical.DropPartitionField",
+ "tableDescs" : [ {
+ "fieldName" : "table",
+ "fieldExtractor" : "ArrayBufferTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : null,
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false,
+ "comment" : "Iceberg"
+ } ],
+ "opType" : "ALTERTABLE_PROPERTIES",
+ "queryDescs" : [ ],
+ "uriDescs" : [ ]
}, {
"classname" :
"org.apache.spark.sql.catalyst.plans.logical.MergeIntoIcebergTable",
"tableDescs" : [ {
@@ -1871,6 +1919,54 @@
"comment" : ""
} ],
"uriDescs" : [ ]
+}, {
+ "classname" :
"org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField",
+ "tableDescs" : [ {
+ "fieldName" : "table",
+ "fieldExtractor" : "ArrayBufferTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : null,
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false,
+ "comment" : "Iceberg"
+ } ],
+ "opType" : "ALTERTABLE_PROPERTIES",
+ "queryDescs" : [ ],
+ "uriDescs" : [ ]
+}, {
+ "classname" :
"org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields",
+ "tableDescs" : [ {
+ "fieldName" : "table",
+ "fieldExtractor" : "ArrayBufferTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : null,
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false,
+ "comment" : "Iceberg"
+ } ],
+ "opType" : "ALTERTABLE_PROPERTIES",
+ "queryDescs" : [ ],
+ "uriDescs" : [ ]
+}, {
+ "classname" :
"org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering",
+ "tableDescs" : [ {
+ "fieldName" : "table",
+ "fieldExtractor" : "ArrayBufferTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : null,
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false,
+ "comment" : "Iceberg"
+ } ],
+ "opType" : "ALTERTABLE_PROPERTIES",
+ "queryDescs" : [ ],
+ "uriDescs" : [ ]
}, {
"classname" :
"org.apache.spark.sql.catalyst.plans.logical.UnresolvedMergeIntoIcebergTable",
"tableDescs" : [ {
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
index 03506036d0..c036660691 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
@@ -184,6 +184,22 @@ class ExpressionSeqTableExtractor extends TableExtractor {
}
}
/**
 * Extracts a table identifier from Iceberg commands whose `table` field is a
 * sequence of name parts (e.g. AddPartitionField, DropPartitionField,
 * ReplacePartitionField, SetIdentifierFields, DropIdentifierFields,
 * SetWriteDistributionAndOrdering).
 */
class ArrayBufferTableExtractor extends TableExtractor {
  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
    // Iceberg represents the table identifier as an ArrayBuffer[String] of
    // name parts: [table], [db, table] or [catalog, db, table].
    // Return None for any unexpected shape instead of throwing a MatchError.
    v1.asInstanceOf[Seq[String]] match {
      case Seq(tblName) => Some(Table(None, None, tblName, None))
      case Seq(dbName, tblName) => Some(Table(None, Some(dbName), tblName, None))
      case Seq(catalogName, dbName, tblName) =>
        Some(Table(Some(catalogName), Some(dbName), tblName, None))
      case _ => None // defensive: fail closed on 0 or >3 name parts
    }
  }
}
+
/**
* org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
*/
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala
index 33e94d718c..7594ae8069 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala
@@ -58,11 +58,52 @@ object IcebergCommands extends
CommandSpecs[TableCommandSpec] {
TableCommandSpec(cmd, Seq(td), opType =
OperationType.ALTERTABLE_PROPERTIES)
}
+ val AddPartitionFiled = {
+ val cmd = "org.apache.spark.sql.catalyst.plans.logical.AddPartitionField"
+ val tableDesc =
+ TableDesc(
+ "table",
+ classOf[ArrayBufferTableExtractor],
+ comment = "Iceberg")
+ TableCommandSpec(cmd, Seq(tableDesc), opType =
OperationType.ALTERTABLE_PROPERTIES)
+ }
+
+ val DropPartitionField = {
+ val cmd = "org.apache.spark.sql.catalyst.plans.logical.DropPartitionField"
+ AddPartitionFiled.copy(cmd)
+ }
+
+ val ReplacePartitionField = {
+ val cmd =
"org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField"
+ AddPartitionFiled.copy(cmd)
+ }
+
+ val WriteDistributionAndOrdering = {
+ val cmd =
"org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering"
+ AddPartitionFiled.copy(cmd)
+ }
+
+ val SetIdentifierFields = {
+ val cmd = "org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields"
+ AddPartitionFiled.copy(cmd)
+ }
+
+ val DropIdentifierFields = {
+ val cmd =
"org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields"
+ AddPartitionFiled.copy(cmd)
+ }
+
override def specs: Seq[TableCommandSpec] = Seq(
CallProcedure,
DeleteFromIcebergTable,
UpdateIcebergTable,
MergeIntoIcebergTable,
+ AddPartitionFiled,
+ DropPartitionField,
+ ReplacePartitionField,
+ WriteDistributionAndOrdering,
+ SetIdentifierFields,
+ DropIdentifierFields,
MergeIntoIcebergTable.copy(classname =
"org.apache.spark.sql.catalyst.plans.logical.UnresolvedMergeIntoIcebergTable"))
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
index cf798cdfae..9e780d4e3e 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
@@ -378,4 +378,110 @@ class IcebergCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite
doAs(admin, sql(callSetCurrentSnapshot))
}
}
+
+ test("ALTER TABLE ADD PARTITION FIELD for Iceberg") {
+ val table = s"$catalogV2.$namespace1.partitioned_table"
+ withCleanTmpResources(Seq((table, "table"))) {
+ doAs(
+ admin,
+ sql(
+ s"CREATE TABLE $table (id int, name string, city string) USING
iceberg PARTITIONED BY (city)"))
+ val addPartitionSql = s"ALTER TABLE $table ADD PARTITION FIELD id"
+ interceptEndsWith[AccessControlException] {
+ doAs(someone, sql(addPartitionSql))
+ }(s"does not have [alter] privilege on [$namespace1/partitioned_table]")
+ doAs(admin, sql(addPartitionSql))
+ }
+ }
+
+ test("ALTER TABLE DROP PARTITION FIELD for Iceberg") {
+ val table = s"$catalogV2.$namespace1.partitioned_table"
+ withCleanTmpResources(Seq((table, "table"))) {
+ doAs(
+ admin,
+ sql(
+ s"CREATE TABLE $table (id int, name string, city string) USING
iceberg PARTITIONED BY (id, city)"))
+ val addPartitionSql = s"ALTER TABLE $table DROP PARTITION FIELD id"
+ interceptEndsWith[AccessControlException] {
+ doAs(someone, sql(addPartitionSql))
+ }(s"does not have [alter] privilege on [$namespace1/partitioned_table]")
+ doAs(admin, sql(addPartitionSql))
+ }
+ }
+
+ test("ALTER TABLE REPLACE PARTITION FIELD for Iceberg") {
+ val table = s"$catalogV2.$namespace1.partitioned_table"
+ withCleanTmpResources(Seq((table, "table"))) {
+ doAs(
+ admin,
+ sql(
+ s"CREATE TABLE $table (id int, name string, city string) USING
iceberg PARTITIONED BY (city)"))
+ val addPartitionSql = s"ALTER TABLE $table REPLACE PARTITION FIELD city
WITH id"
+ interceptEndsWith[AccessControlException] {
+ doAs(someone, sql(addPartitionSql))
+ }(s"does not have [alter] privilege on [$namespace1/partitioned_table]")
+ doAs(admin, sql(addPartitionSql))
+ }
+ }
+
+ test("ALTER TABLE WRITE ORDER BY for Iceberg") {
+ val table = s"$catalogV2.$namespace1.partitioned_table"
+ withCleanTmpResources(Seq((table, "table"))) {
+ doAs(
+ admin,
+ sql(
+ s"CREATE TABLE $table (id int, name string, city string) USING
iceberg"))
+ val writeOrderBySql = s"ALTER TABLE $table WRITE ORDERED BY id"
+ interceptEndsWith[AccessControlException] {
+ doAs(someone, sql(writeOrderBySql))
+ }(s"does not have [alter] privilege on [$namespace1/partitioned_table]")
+ doAs(admin, sql(writeOrderBySql))
+ }
+ }
+
+ test("ALTER TABLE WRITE DISTRIBUTED BY PARTITION for Iceberg") {
+ val table = s"$catalogV2.$namespace1.partitioned_table"
+ withCleanTmpResources(Seq((table, "table"))) {
+ doAs(
+ admin,
+ sql(
+ s"CREATE TABLE $table (id int, name string, city string) USING
iceberg PARTITIONED BY (city)"))
+ val writeDistributedSql = s"ALTER TABLE $table WRITE DISTRIBUTED BY
PARTITION"
+ interceptEndsWith[AccessControlException] {
+ doAs(someone, sql(writeDistributedSql))
+ }(s"does not have [alter] privilege on [$namespace1/partitioned_table]")
+ doAs(admin, sql(writeDistributedSql))
+ }
+ }
+
+ test("ALTER TABLE SET IDENTIFIER FIELD for Iceberg") {
+ val table = s"$catalogV2.$namespace1.partitioned_table"
+ withCleanTmpResources(Seq((table, "table"))) {
+ doAs(
+ admin,
+ sql(
+ s"CREATE TABLE $table (id int NOT NULL, name string, city string)
USING iceberg"))
+ val setIdentifierSql = s"ALTER TABLE $table SET IDENTIFIER FIELDS id"
+ interceptEndsWith[AccessControlException] {
+ doAs(someone, sql(setIdentifierSql))
+ }(s"does not have [alter] privilege on [$namespace1/partitioned_table]")
+ doAs(admin, sql(setIdentifierSql))
+ }
+ }
+
+ test("ALTER TABLE DROP IDENTIFIER FIELD for Iceberg") {
+ val table = s"$catalogV2.$namespace1.partitioned_table"
+ withCleanTmpResources(Seq((table, "table"))) {
+ doAs(
+ admin,
+ sql(
+ s"CREATE TABLE $table (id int NOT NULL, name string, city string)
USING iceberg"))
+ doAs(admin, sql(s"ALTER TABLE $table SET IDENTIFIER FIELDS id"))
+ val dropIdentifierSql = s"ALTER TABLE $table DROP IDENTIFIER FIELDS id"
+ interceptEndsWith[AccessControlException] {
+ doAs(someone, sql(dropIdentifierSql))
+ }(s"does not have [alter] privilege on [$namespace1/partitioned_table]")
+ doAs(admin, sql(dropIdentifierSql))
+ }
+ }
}