This is an automated email from the ASF dual-hosted git repository.

zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f59458102 [spark] Adding support for Iceberg compatibility options to 
be passed as table properties with DataFrame APIs (#6803)
2f59458102 is described below

commit 2f594581028514dfaf55143be2de51e3da46df22
Author: junmuz <[email protected]>
AuthorDate: Mon Dec 22 04:08:23 2025 +0000

    [spark] Adding support for Iceberg compatibility options to be passed as 
table properties with DataFrame APIs (#6803)
---
 .../org/apache/paimon/iceberg/IcebergOptions.java  | 23 +++++++++++++
 .../shim/PaimonCreateTableAsSelectStrategy.scala   | 13 ++++++--
 .../shim/PaimonCreateTableAsSelectStrategy.scala   | 13 ++++++--
 .../shim/PaimonCreateTableAsSelectStrategy.scala   | 13 ++++++--
 .../shim/PaimonCreateTableAsSelectStrategy.scala   | 13 ++++++--
 .../apache/paimon/spark/sql/PaimonOptionTest.scala | 39 ++++++++++++++++++++++
 6 files changed, 102 insertions(+), 12 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
index f371db825c..be4f6152ef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
@@ -26,7 +26,10 @@ import org.apache.paimon.options.description.InlineElement;
 import org.apache.paimon.options.description.TextElement;
 import org.apache.paimon.utils.Preconditions;
 
+import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.paimon.options.ConfigOptions.key;
@@ -255,4 +258,24 @@ public class IcebergOptions {
             return TextElement.text(description);
         }
     }
+
+    /**
+     * Returns all ConfigOption fields defined in this class. This method uses 
reflection to
+     * dynamically discover all ConfigOption fields, ensuring that new options 
are automatically
+     * included without code changes.
+     */
+    public static List<ConfigOption<?>> getOptions() {
+        final Field[] fields = IcebergOptions.class.getFields();
+        final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
+        for (Field field : fields) {
+            if (ConfigOption.class.isAssignableFrom(field.getType())) {
+                try {
+                    list.add((ConfigOption<?>) 
field.get(IcebergOptions.class));
+                } catch (IllegalAccessException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        return list;
+    }
 }
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index 05370ddc90..cc6258e6eb 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -19,6 +19,7 @@
 package org.apache.spark.sql.execution.shim
 
 import org.apache.paimon.CoreOptions
+import org.apache.paimon.iceberg.IcebergOptions
 import org.apache.paimon.spark.catalog.FormatTableCatalog
 
 import org.apache.spark.sql.{SparkSession, Strategy}
@@ -39,10 +40,16 @@ case class PaimonCreateTableAsSelectStrategy(spark: 
SparkSession) extends Strate
           throw new RuntimeException("Paimon can't extend StagingTableCatalog 
for now.")
         case _ =>
           val coreOptionKeys = 
CoreOptions.getOptions.asScala.map(_.key()).toSeq
-          val (coreOptions, writeOptions) = options.partition {
-            case (key, _) => coreOptionKeys.contains(key)
+
+          // Include Iceberg compatibility options in table properties (fix 
for DataFrame writer options)
+          val icebergOptionKeys = 
IcebergOptions.getOptions.asScala.map(_.key()).toSeq
+
+          val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
+
+          val (tableOptions, writeOptions) = options.partition {
+            case (key, _) => allTableOptionKeys.contains(key)
           }
-          val newProps = CatalogV2Util.withDefaultOwnership(props) ++ 
coreOptions
+          val newProps = CatalogV2Util.withDefaultOwnership(props) ++ 
tableOptions
 
           val isPartitionedFormatTable = {
             catalog match {
diff --git 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index 4d4104d1ed..a09996f153 100644
--- 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -19,6 +19,7 @@
 package org.apache.spark.sql.execution.shim
 
 import org.apache.paimon.CoreOptions
+import org.apache.paimon.iceberg.IcebergOptions
 import org.apache.paimon.spark.SparkCatalog
 import org.apache.paimon.spark.catalog.FormatTableCatalog
 
@@ -51,10 +52,16 @@ case class PaimonCreateTableAsSelectStrategy(spark: 
SparkSession)
           throw new RuntimeException("Paimon can't extend StagingTableCatalog 
for now.")
         case _ =>
           val coreOptionKeys = 
CoreOptions.getOptions.asScala.map(_.key()).toSeq
-          val (coreOptions, writeOptions) = options.partition {
-            case (key, _) => coreOptionKeys.contains(key)
+
+          // Include Iceberg compatibility options in table properties (fix 
for DataFrame writer options)
+          val icebergOptionKeys = 
IcebergOptions.getOptions.asScala.map(_.key()).toSeq
+
+          val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
+
+          val (tableOptions, writeOptions) = options.partition {
+            case (key, _) => allTableOptionKeys.contains(key)
           }
-          val newTableSpec = tableSpec.copy(properties = tableSpec.properties 
++ coreOptions)
+          val newTableSpec = tableSpec.copy(properties = tableSpec.properties 
++ tableOptions)
 
           val isPartitionedFormatTable = {
             catalog match {
diff --git 
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
 
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index d5424493eb..4a82f35188 100644
--- 
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ 
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -19,6 +19,7 @@
 package org.apache.spark.sql.execution.shim
 
 import org.apache.paimon.CoreOptions
+import org.apache.paimon.iceberg.IcebergOptions
 import org.apache.paimon.spark.SparkCatalog
 import org.apache.paimon.spark.catalog.FormatTableCatalog
 
@@ -53,10 +54,16 @@ case class PaimonCreateTableAsSelectStrategy(spark: 
SparkSession)
           throw new RuntimeException("Paimon can't extend StagingTableCatalog 
for now.")
         case _ =>
           val coreOptionKeys = 
CoreOptions.getOptions.asScala.map(_.key()).toSeq
-          val (coreOptions, writeOptions) = options.partition {
-            case (key, _) => coreOptionKeys.contains(key)
+
+          // Include Iceberg compatibility options in table properties (fix 
for DataFrame writer options)
+          val icebergOptionKeys = 
IcebergOptions.getOptions.asScala.map(_.key()).toSeq
+
+          val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
+
+          val (tableOptions, writeOptions) = options.partition {
+            case (key, _) => allTableOptionKeys.contains(key)
           }
-          val newTableSpec = tableSpec.copy(properties = tableSpec.properties 
++ coreOptions)
+          val newTableSpec = tableSpec.copy(properties = tableSpec.properties 
++ tableOptions)
 
           val isPartitionedFormatTable = {
             catalog match {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index fd6627c095..61e25b7c16 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -19,6 +19,7 @@
 package org.apache.spark.sql.execution.shim
 
 import org.apache.paimon.CoreOptions
+import org.apache.paimon.iceberg.IcebergOptions
 import org.apache.paimon.spark.SparkCatalog
 import org.apache.paimon.spark.catalog.FormatTableCatalog
 
@@ -51,10 +52,16 @@ case class PaimonCreateTableAsSelectStrategy(spark: 
SparkSession)
           throw new RuntimeException("Paimon can't extend StagingTableCatalog 
for now.")
         case _ =>
           val coreOptionKeys = 
CoreOptions.getOptions.asScala.map(_.key()).toSeq
-          val (coreOptions, writeOptions) = options.partition {
-            case (key, _) => coreOptionKeys.contains(key)
+
+          // Include Iceberg compatibility options in table properties (fix 
for DataFrame writer options)
+          val icebergOptionKeys = 
IcebergOptions.getOptions.asScala.map(_.key()).toSeq
+
+          val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
+
+          val (tableOptions, writeOptions) = options.partition {
+            case (key, _) => allTableOptionKeys.contains(key)
           }
-          val newTableSpec = tableSpec.copy(properties = tableSpec.properties 
++ coreOptions)
+          val newTableSpec = tableSpec.copy(properties = tableSpec.properties 
++ tableOptions)
 
           val isPartitionedFormatTable = {
             catalog match {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
index a51893941e..14351103f4 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
@@ -55,6 +55,45 @@ class PaimonOptionTest extends PaimonSparkTestBase {
     }
   }
 
+  test("Paimon Option: create table with Iceberg compatibility options via 
DataFrame writer") {
+    Seq((1L, "x1"), (2L, "x2"))
+      .toDF("a", "b")
+      .write
+      .format("paimon")
+      .option("primary-key", "a")
+      .option("bucket", "-1")
+      .option("metadata.iceberg.database", "db_t")
+      .option("metadata.iceberg.table", "t_ib")
+      .option("metadata.iceberg.storage", "hadoop-catalog")
+      .option("metadata.iceberg.storage-location", "table-location")
+      .option("metadata.iceberg.manifest-legacy-version", "true")
+      .option("metadata.iceberg.manifest-compression", "snappy")
+      .option("metadata.iceberg.previous-versions-max", "5")
+      .option("metadata.iceberg.uri", "")
+      .saveAsTable("T_IB")
+
+    val table = loadTable("T_IB")
+
+    // Verify primary key is also stored (existing functionality still works)
+    Assertions.assertEquals(1, table.primaryKeys().size())
+    Assertions.assertEquals("a", table.primaryKeys().get(0))
+
+    // Verify bucket configuration
+    Assertions.assertEquals("-1", table.options().get("bucket"))
+
+    // Verify Iceberg compatibility options are stored permanently
+    Assertions.assertEquals("db_t", 
table.options().get("metadata.iceberg.database"))
+    Assertions.assertEquals("t_ib", 
table.options().get("metadata.iceberg.table"))
+    Assertions.assertEquals("hadoop-catalog", 
table.options().get("metadata.iceberg.storage"))
+    Assertions.assertEquals(
+      "table-location",
+      table.options().get("metadata.iceberg.storage-location"))
+    Assertions.assertEquals("true", 
table.options().get("metadata.iceberg.manifest-legacy-version"))
+    Assertions.assertEquals("snappy", 
table.options().get("metadata.iceberg.manifest-compression"))
+    Assertions.assertEquals("5", 
table.options().get("metadata.iceberg.previous-versions-max"))
+    Assertions.assertEquals("", table.options().get("metadata.iceberg.uri"))
+  }
+
   test("Paimon Option: query table with sql conf") {
     sql("CREATE TABLE T (id INT)")
     sql("INSERT INTO T VALUES 1")

Reply via email to