This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 44ae5029f [spark] Support table options via SQL conf for Spark Engine and unify user experience with Flink engine (#4393)
44ae5029f is described below

commit 44ae5029f0bb1253082d2d0903a982fb68174be9
Author: xiangyu0xf <[email protected]>
AuthorDate: Tue Nov 5 15:46:02 2024 +0800

    [spark] Support table options via SQL conf for Spark Engine and unify user experience with Flink engine (#4393)
---
 docs/content/flink/quick-start.md                  |  13 ++-
 docs/content/spark/auxiliary.md                    |  22 +++-
 .../org/apache/paimon/options/OptionsUtils.java    |  30 +++++
 .../paimon/flink/AbstractFlinkTableFactory.java    |  35 +++---
 .../apache/paimon/flink/FlinkConnectorOptions.java |   2 +-
 .../flink/AbstractFlinkTableFactoryTest.java       |  25 +++-
 .../java/org/apache/paimon/spark/SparkCatalog.java |   4 +-
 .../org/apache/paimon/spark/SparkSource.scala      |  16 ++-
 .../org/apache/paimon/spark/util/OptionUtils.scala |  53 ++++++---
 .../apache/paimon/spark/sql/PaimonOptionTest.scala | 126 +++++++++++++++++++++
 10 files changed, 279 insertions(+), 47 deletions(-)

diff --git a/docs/content/flink/quick-start.md b/docs/content/flink/quick-start.md
index 62559065e..e50acfe48 100644
--- a/docs/content/flink/quick-start.md
+++ b/docs/content/flink/quick-start.md
@@ -269,11 +269,16 @@ SELECT * FROM ....;
 ## Setting dynamic options
 
 When interacting with the Paimon table, table options can be tuned without changing the options in the catalog. Paimon will extract job-level dynamic options and take effect in the current session.
-The dynamic option's key format is `paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. The catalogName/dbName/tableName can be `*`, which means matching all the specific parts.
+The dynamic table option's key format is `paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. The catalogName/dbName/tableName can be `*`, which means matching all the specific parts.
+The dynamic global option's key format is `${config_key}`. Global options will take effect for all tables. Table options will override global options if there are conflicts.
 
 For example:
 
 ```sql
+-- set scan.timestamp-millis=1697018249001 for all tables
+SET 'scan.timestamp-millis' = '1697018249001';
+SELECT * FROM T;
+
 -- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T
 SET 'paimon.mycatalog.default.T.scan.timestamp-millis' = '1697018249000';
 SELECT * FROM T;
@@ -281,4 +286,10 @@ SELECT * FROM T;
 -- set scan.timestamp-millis=1697018249000 for the table default.T in any catalog
 SET 'paimon.*.default.T.scan.timestamp-millis' = '1697018249000';
 SELECT * FROM T;
+
+-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T1
+-- set scan.timestamp-millis=1697018249001 for other tables
+SET 'paimon.mycatalog.default.T1.scan.timestamp-millis' = '1697018249000';
+SET 'scan.timestamp-millis' = '1697018249001';
+SELECT * FROM T1 JOIN T2 ON xxxx;
 ```
diff --git a/docs/content/spark/auxiliary.md b/docs/content/spark/auxiliary.md
index b822d1b89..6330ca27c 100644
--- a/docs/content/spark/auxiliary.md
+++ b/docs/content/spark/auxiliary.md
@@ -29,17 +29,35 @@ under the License.
 ## Set / Reset
 The SET command sets a property, returns the value of an existing property or returns all SQLConf properties with value and meaning.
 The RESET command resets runtime configurations specific to the current session which were set via the SET command to their default values.
-To set paimon configs specifically, you need add the `spark.paimon.` prefix.
+To set dynamic options globally, you need to add the `spark.paimon.` prefix. You can also set dynamic table options in this format:
+`spark.paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. The catalogName/dbName/tableName can be `*`, which means matching all
+the specific parts. Dynamic table options will override global options if there are conflicts.
 
 ```sql
 -- set spark conf
 SET spark.sql.sources.partitionOverwriteMode=dynamic;
- 
+
 -- set paimon conf
 SET spark.paimon.file.block-size=512M;
 
 -- reset conf
 RESET spark.paimon.file.block-size;
+
+-- set catalog
+USE paimon;
+
+-- set scan.snapshot-id=1 for the table default.T in any catalog
+SET spark.paimon.*.default.T.scan.snapshot-id=1;
+SELECT * FROM default.T;
+
+-- set scan.snapshot-id=1 for the table T in any database and catalog
+SET spark.paimon.*.*.T.scan.snapshot-id=1;
+SELECT * FROM default.T;
+
+-- set scan.snapshot-id=2 for the table default.T1 in any catalog and scan.snapshot-id=1 for other tables
+SET spark.paimon.scan.snapshot-id=1;
+SET spark.paimon.*.default.T1.scan.snapshot-id=2;
+SELECT * FROM default.T1 JOIN default.T2 ON xxxx;
 ```
 
 ## Describe table
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java b/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java
index 47eb45007..a625454f3 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.function.Predicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.options.StructuredOptionsSplitter.escapeWithSingleQuote;
@@ -302,6 +304,34 @@ public class OptionsUtils {
         return properties;
     }
 
+    public static Map<String, String> convertToDynamicTableProperties(
+            Map<String, String> confData,
+            String globalOptionKeyPrefix,
+            Pattern tableOptionKeyPattern,
+            int keyGroup) {
+        Map<String, String> globalOptions = new HashMap<>();
+        Map<String, String> tableOptions = new HashMap<>();
+
+        confData.keySet().stream()
+                .filter(k -> k.startsWith(globalOptionKeyPrefix))
+                .forEach(
+                        k -> {
+                            Matcher matcher = tableOptionKeyPattern.matcher(k);
+                            if (matcher.find()) {
+                                tableOptions.put(
+                                        matcher.group(keyGroup), convertToString(confData.get(k)));
+                            } else {
+                                globalOptions.put(
+                                        k.substring(globalOptionKeyPrefix.length()),
+                                        convertToString(confData.get(k)));
+                            }
+                        });
+
+        // table options should override global options for the same key
+        globalOptions.putAll(tableOptions);
+        return globalOptions;
+    }
+
     static boolean containsPrefixMap(Map<String, String> confData, String key) {
         return confData.keySet().stream().anyMatch(candidate -> filterPrefixMapKey(key, candidate));
     }
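For readers skimming the diff, the following is a minimal sketch (not part of the commit) of how the new `convertToDynamicTableProperties` helper behaves. The class name and the option keys are invented for illustration; the pattern is built with the same template that the Spark `OptionUtils` change below uses, with the `spark.paimon.` prefix.

```java
// Hypothetical illustration: keys that match the table pattern become table
// options (group 5 is the real config key), other keys under the global prefix
// become global options, and table options win on conflicts.
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.paimon.options.OptionsUtils;

public class DynamicOptionsSketch {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.paimon.scan.snapshot-id", "1");             // global option
        conf.put("spark.paimon.*.default.T.scan.snapshot-id", "2"); // table option for default.T

        // Same shape as the template used further down: prefix, catalog,
        // database, table, then the config key as group 5.
        String template =
                String.format(
                        "(%s)(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
                        "spark.paimon.", "mycatalog", "default", "T");
        Pattern tablePattern = Pattern.compile(template);

        Map<String, String> resolved =
                OptionsUtils.convertToDynamicTableProperties(conf, "spark.paimon.", tablePattern, 5);

        // Prints {scan.snapshot-id=2}: the table-level value overrides the global one.
        System.out.println(resolved);
    }
}
```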
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 27eef48b2..9f90a2cd0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -35,6 +35,7 @@ import org.apache.paimon.lineage.LineageMetaFactory;
 import org.apache.paimon.lineage.TableLineageEntity;
 import org.apache.paimon.lineage.TableLineageEntityImpl;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.options.OptionsUtils;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
@@ -71,7 +72,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.BiConsumer;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
@@ -239,7 +239,7 @@ public abstract class AbstractFlinkTableFactory
         CatalogTable origin = context.getCatalogTable().getOrigin();
         Table table;
 
-        Map<String, String> dynamicOptions = getDynamicTableConfigOptions(context);
+        Map<String, String> dynamicOptions = getDynamicConfigOptions(context);
         dynamicOptions.forEach(
                 (key, newValue) -> {
                     String oldValue = origin.getOptions().get(key);
@@ -249,6 +249,7 @@ public abstract class AbstractFlinkTableFactory
                 });
         Map<String, String> newOptions = new HashMap<>();
         newOptions.putAll(origin.getOptions());
+        // dynamic options should override origin options
         newOptions.putAll(dynamicOptions);
 
         FileStoreTable fileStoreTable;
@@ -324,16 +325,19 @@ public abstract class AbstractFlinkTableFactory
     /**
      * The dynamic option's format is:
      *
-     * <p>{@link
-     * FlinkConnectorOptions#TABLE_DYNAMIC_OPTION_PREFIX}.${catalog}.${database}.${tableName}.key =
-     * value. These job level configs will be extracted and injected into the target table option.
+     * <p>Global Options: key = value.
+     *
+     * <p>Table Options: {@link
+     * FlinkConnectorOptions#TABLE_DYNAMIC_OPTION_PREFIX}${catalog}.${database}.${tableName}.key =
+     * value.
+     *
+     * <p>These job level options will be extracted and injected into the target table option. Table
+     * options will override global options if there are conflicts.
      *
      * @param context The table factory context.
      * @return The dynamic options of this target table.
      */
-    static Map<String, String> getDynamicTableConfigOptions(DynamicTableFactory.Context context) {
-
-        Map<String, String> optionsFromTableConfig = new HashMap<>();
+    static Map<String, String> getDynamicConfigOptions(DynamicTableFactory.Context context) {
 
         ReadableConfig config = context.getConfiguration();
 
@@ -349,23 +353,14 @@ public abstract class AbstractFlinkTableFactory
 
         String template =
                 String.format(
-                        "(%s)\\.(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
+                        "(%s)(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
                         FlinkConnectorOptions.TABLE_DYNAMIC_OPTION_PREFIX,
                         context.getObjectIdentifier().getCatalogName(),
                         context.getObjectIdentifier().getDatabaseName(),
                         context.getObjectIdentifier().getObjectName());
         Pattern pattern = Pattern.compile(template);
-
-        conf.keySet()
-                .forEach(
-                        (key) -> {
-                            if (key.startsWith(FlinkConnectorOptions.TABLE_DYNAMIC_OPTION_PREFIX)) {
-                                Matcher matcher = pattern.matcher(key);
-                                if (matcher.find()) {
-                                    optionsFromTableConfig.put(matcher.group(5), conf.get(key));
-                                }
-                            }
-                        });
+        Map<String, String> optionsFromTableConfig =
+                OptionsUtils.convertToDynamicTableProperties(conf, "", pattern, 5);
 
         if (!optionsFromTableConfig.isEmpty()) {
             LOG.info(
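As a rough illustration of the template built above, assume a table identified as `mycatalog.default.T` and the new `TABLE_DYNAMIC_OPTION_PREFIX` value `paimon.`. The keys below are invented for the example; since the factory passes an empty global prefix, any key that does not match the pattern is handled as a global option.

```java
// Hypothetical sketch of the Flink-side key matching; group(5) is the real
// config key that gets injected into the target table's options.
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FlinkDynamicOptionPatternSketch {
    public static void main(String[] args) {
        String template =
                String.format(
                        "(%s)(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
                        "paimon.", "mycatalog", "default", "T");
        Pattern pattern = Pattern.compile(template);

        String[] keys = {
            "paimon.mycatalog.default.T.scan.timestamp-millis", // table option
            "paimon.*.default.T.scan.timestamp-millis",         // wildcard catalog, still a table option
            "paimon.other.default.T.scan.timestamp-millis",     // different catalog: falls through to global handling
            "scan.timestamp-millis"                             // plain key: global option
        };
        for (String key : keys) {
            Matcher matcher = pattern.matcher(key);
            System.out.println(
                    key + " -> " + (matcher.find() ? "table option " + matcher.group(5) : "global option"));
        }
    }
}
```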
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 55e21a354..e7bc6d23d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -44,7 +44,7 @@ public class FlinkConnectorOptions {
 
     public static final String NONE = "none";
 
-    public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon";
+    public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon.";
 
     public static final int MIN_CLUSTERING_SAMPLE_FACTOR = 20;
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
index aeb46da8d..38d48fa21 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
@@ -63,8 +63,7 @@ public class AbstractFlinkTableFactoryTest {
     @Test
     public void testGetDynamicOptions() {
         Configuration configuration = new Configuration();
-        configuration.setString("paimon.catalog1.db.T.k1", "v1");
-        configuration.setString("paimon.*.db.*.k2", "v2");
+        configuration.setString("k1", "v2");
         ObjectIdentifier identifier = ObjectIdentifier.of("catalog1", "db", "T");
         DynamicTableFactory.Context context =
                 new FactoryUtil.DefaultDynamicTableContext(
@@ -74,9 +73,25 @@ public class AbstractFlinkTableFactoryTest {
                         configuration,
                         AbstractFlinkTableFactoryTest.class.getClassLoader(),
                         false);
-        Map<String, String> options =
-                AbstractFlinkTableFactory.getDynamicTableConfigOptions(context);
-        assertThat(options).isEqualTo(ImmutableMap.of("k1", "v1", "k2", "v2"));
+        Map<String, String> options = AbstractFlinkTableFactory.getDynamicConfigOptions(context);
+        assertThat(options).isEqualTo(ImmutableMap.of("k1", "v2"));
+
+        configuration = new Configuration();
+        configuration.setString("k1", "v2");
+        configuration.setString("k3", "v3");
+        configuration.setString("paimon.catalog1.db.T.k1", "v1");
+        configuration.setString("paimon.*.db.*.k2", "v2");
+        identifier = ObjectIdentifier.of("catalog1", "db", "T");
+        context =
+                new FactoryUtil.DefaultDynamicTableContext(
+                        identifier,
+                        null,
+                        new HashMap<>(),
+                        configuration,
+                        AbstractFlinkTableFactoryTest.class.getClassLoader(),
+                        false);
+        options = AbstractFlinkTableFactory.getDynamicConfigOptions(context);
+        assertThat(options).isEqualTo(ImmutableMap.of("k1", "v1", "k2", "v2", "k3", "v3"));
     }
 
     private void innerTest(RowType r1, RowType r2, boolean expectEquals) {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index bf19d1ec7..ae5ab8b6e 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -503,7 +503,9 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
             if (paimonTable instanceof FormatTable) {
                 return convertToFileTable(ident, (FormatTable) paimonTable);
             } else {
-                return new SparkTable(copyWithSQLConf(paimonTable, extraOptions));
+                return new SparkTable(
+                        copyWithSQLConf(
+                                paimonTable, catalogName, toIdentifier(ident), extraOptions));
             }
         } catch (Catalog.TableNotExistException e) {
             throw new NoSuchTableException(ident);
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index 67ab1312f..0170a29f6 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -18,11 +18,13 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.catalog.CatalogContext
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.catalog.{CatalogContext, CatalogUtils, Identifier}
 import org.apache.paimon.options.Options
+import org.apache.paimon.spark.SparkSource.NAME
 import org.apache.paimon.spark.commands.WriteIntoPaimonTable
 import org.apache.paimon.spark.sources.PaimonSink
-import org.apache.paimon.spark.util.OptionUtils.mergeSQLConf
+import org.apache.paimon.spark.util.OptionUtils.{extractCatalogName, mergeSQLConfWithIdentifier}
 import org.apache.paimon.table.{DataTable, FileStoreTable, FileStoreTableFactory}
 import org.apache.paimon.table.system.AuditLogTable
 
@@ -80,9 +82,15 @@ class SparkSource
   }
 
   private def loadTable(options: JMap[String, String]): DataTable = {
+    val path = CoreOptions.path(options)
     val catalogContext = CatalogContext.create(
-      Options.fromMap(mergeSQLConf(options)),
-      SparkSession.active.sessionState.newHadoopConf())
+      Options.fromMap(
+        mergeSQLConfWithIdentifier(
+          options,
+          extractCatalogName().getOrElse(NAME),
+          Identifier.create(CatalogUtils.database(path), CatalogUtils.table(path)))),
+      SparkSession.active.sessionState.newHadoopConf()
+    )
     val table = FileStoreTableFactory.create(catalogContext)
     if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
       new AuditLogTable(table)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index 5b762ffb4..b60dd1fb2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -18,34 +18,61 @@
 
 package org.apache.paimon.spark.util
 
+import org.apache.paimon.catalog.Identifier
 import org.apache.paimon.table.Table
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 
-import java.util.{HashMap => JHashMap, Map => JMap}
+import java.util.{Map => JMap}
+import java.util.regex.Pattern
 
 import scala.collection.JavaConverters._
 
 object OptionUtils extends SQLConfHelper {
 
   private val PAIMON_OPTION_PREFIX = "spark.paimon."
+  private val SPARK_CATALOG_PREFIX = "spark.sql.catalog."
 
-  def mergeSQLConf(extraOptions: JMap[String, String]): JMap[String, String] = {
-    val mergedOptions = new JHashMap[String, String](
-      conf.getAllConfs
-        .filterKeys(_.startsWith(PAIMON_OPTION_PREFIX))
-        .map {
-          case (key, value) =>
-            key.stripPrefix(PAIMON_OPTION_PREFIX) -> value
-        }
-        .toMap
-        .asJava)
+  def extractCatalogName(): Option[String] = {
+    val sparkCatalogTemplate = String.format("%s([^.]*)$", SPARK_CATALOG_PREFIX)
+    val sparkCatalogPattern = Pattern.compile(sparkCatalogTemplate)
+    conf.getAllConfs.filterKeys(_.startsWith(SPARK_CATALOG_PREFIX)).foreach {
+      case (key, _) =>
+        val matcher = sparkCatalogPattern.matcher(key)
+        if (matcher.find())
+          return Option(matcher.group(1))
+    }
+    Option.empty
+  }
+
+  def mergeSQLConfWithIdentifier(
+      extraOptions: JMap[String, String],
+      catalogName: String,
+      ident: Identifier): JMap[String, String] = {
+    val tableOptionsTemplate = String.format(
+      "(%s)(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
+      PAIMON_OPTION_PREFIX,
+      catalogName,
+      ident.getDatabaseName,
+      ident.getObjectName)
+    val tableOptionsPattern = Pattern.compile(tableOptionsTemplate)
+    val mergedOptions = org.apache.paimon.options.OptionsUtils
+      .convertToDynamicTableProperties(
+        conf.getAllConfs.asJava,
+        PAIMON_OPTION_PREFIX,
+        tableOptionsPattern,
+        5)
     mergedOptions.putAll(extraOptions)
     mergedOptions
   }
 
-  def copyWithSQLConf[T <: Table](table: T, extraOptions: JMap[String, String]): T = {
-    val mergedOptions = mergeSQLConf(extraOptions)
+  def copyWithSQLConf[T <: Table](
+      table: T,
+      catalogName: String,
+      ident: Identifier,
+      extraOptions: JMap[String, String]): T = {
+    val mergedOptions: JMap[String, String] =
+      mergeSQLConfWithIdentifier(extraOptions, catalogName, ident)
     if (mergedOptions.isEmpty) {
       table
     } else {
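A hypothetical sketch of the catalog-name extraction added here: only `spark.sql.catalog.<name>` keys with no further dots yield a catalog name, and `SparkSource` above falls back to the source name when none is found. The concrete conf keys below are examples, not taken from the commit.

```java
// Hypothetical sketch mirroring OptionUtils.extractCatalogName's regex:
// "spark.sql.catalog." followed by a dot-free segment at the end of the key.
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CatalogNameSketch {
    public static void main(String[] args) {
        Pattern pattern = Pattern.compile(String.format("%s([^.]*)$", "spark.sql.catalog."));

        String[] keys = {
            "spark.sql.catalog.paimon",           // matches: catalog name "paimon"
            "spark.sql.catalog.paimon.warehouse"  // no match: nested catalog property
        };
        for (String key : keys) {
            Matcher matcher = pattern.matcher(key);
            System.out.println(key + " -> " + (matcher.find() ? matcher.group(1) : "skipped"));
        }
    }
}
```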
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
index d35ac1d70..f74d6959b 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
@@ -76,4 +76,130 @@ class PaimonOptionTest extends PaimonSparkTestBase {
       }
     }
   }
+
+  test("Paimon Table Options: query one table with sql conf and table 
options") {
+    sql("CREATE TABLE T (id INT)")
+    sql("INSERT INTO T VALUES 1")
+    sql("INSERT INTO T VALUES 2")
+    checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1) :: Row(2) :: Nil)
+    val table = loadTable("T")
+
+    // query with global options
+    withSQLConf("spark.paimon.scan.snapshot-id" -> "1") {
+      checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1))
+      checkAnswer(spark.read.format("paimon").load(table.location().toString), Row(1))
+    }
+
+    // query with table options
+    withSQLConf("spark.paimon.*.*.T.scan.snapshot-id" -> "1") {
+      checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1))
+      checkAnswer(spark.read.format("paimon").load(table.location().toString), Row(1))
+    }
+
+    // query with both global and table options
+    withSQLConf(
+      "spark.paimon.scan.snapshot-id" -> "1",
+      "spark.paimon.*.*.T.scan.snapshot-id" -> "2") {
+      checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1) :: Row(2) :: Nil)
+      checkAnswer(
+        spark.read.format("paimon").load(table.location().toString),
+        Row(1) :: Row(2) :: Nil)
+    }
+  }
+
+  test("Paimon Table Options: query multiple tables with sql conf and table 
options") {
+    sql("CREATE TABLE T1 (id INT)")
+    sql("INSERT INTO T1 VALUES 1")
+    sql("INSERT INTO T1 VALUES 2")
+
+    sql("CREATE TABLE T2 (id INT)")
+    sql("INSERT INTO T2 VALUES 1")
+    sql("INSERT INTO T2 VALUES 2")
+    checkAnswer(
+      sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"),
+      Row(1, 1) :: Row(2, 2) :: Nil)
+    val table1 = loadTable("T1")
+    val table2 = loadTable("T2")
+
+    // query with global options
+    withSQLConf("spark.paimon.scan.snapshot-id" -> "1") {
+      checkAnswer(sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), Row(1, 1))
+      checkAnswer(
+        spark.read
+          .format("paimon")
+          .load(table1.location().toString)
+          .join(spark.read.format("paimon").load(table2.location().toString), "id"),
+        Row(1)
+      )
+    }
+
+    // query with table options
+    withSQLConf("spark.paimon.*.*.*.scan.snapshot-id" -> "1") {
+      checkAnswer(sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), Row(1, 1))
+      checkAnswer(
+        spark.read
+          .format("paimon")
+          .load(table1.location().toString)
+          .join(spark.read.format("paimon").load(table2.location().toString), "id"),
+        Row(1)
+      )
+    }
+
+    // query with both global and table options
+    withSQLConf(
+      "spark.paimon.scan.snapshot-id" -> "1",
+      "spark.paimon.*.*.*.scan.snapshot-id" -> "2") {
+      checkAnswer(
+        sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"),
+        Row(1, 1) :: Row(2, 2) :: Nil)
+      checkAnswer(
+        spark.read
+          .format("paimon")
+          .load(table1.location().toString)
+          .join(spark.read.format("paimon").load(table2.location().toString), "id"),
+        Row(1) :: Row(2) :: Nil
+      )
+    }
+
+    withSQLConf(
+      "spark.paimon.*.*.T1.scan.snapshot-id" -> "1",
+      "spark.paimon.*.*.T2.scan.snapshot-id" -> "1") {
+      checkAnswer(sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), Row(1, 1))
+      checkAnswer(
+        spark.read
+          .format("paimon")
+          .load(table1.location().toString)
+          .join(spark.read.format("paimon").load(table2.location().toString), "id"),
+        Row(1)
+      )
+    }
+
+    withSQLConf(
+      "spark.paimon.*.*.T1.scan.snapshot-id" -> "1",
+      "spark.paimon.*.*.T2.scan.snapshot-id" -> "2") {
+      checkAnswer(sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), Row(1, 1))
+      checkAnswer(
+        spark.read
+          .format("paimon")
+          .load(table1.location().toString)
+          .join(spark.read.format("paimon").load(table2.location().toString), "id"),
+        Row(1)
+      )
+    }
+
+    withSQLConf(
+      "spark.paimon.*.*.T1.scan.snapshot-id" -> "2",
+      "spark.paimon.*.*.T2.scan.snapshot-id" -> "2") {
+      checkAnswer(
+        sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"),
+        Row(1, 1) :: Row(2, 2) :: Nil)
+      checkAnswer(
+        spark.read
+          .format("paimon")
+          .load(table1.location().toString)
+          .join(spark.read.format("paimon").load(table2.location().toString), "id"),
+        Row(1) :: Row(2) :: Nil
+      )
+    }
+  }
 }
