This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b8d7ac74b0 [core] Allow chain table to use non-deduplicate merge engines. (#7172)
b8d7ac74b0 is described below

commit b8d7ac74b0929a6d3a33bc90bc02a530a9fb0df0
Author: Junrui Lee <[email protected]>
AuthorDate: Mon Feb 2 17:47:18 2026 +0800

    [core] Allow chain table to use non-deduplicate merge engines. (#7172)
---
 .../org/apache/paimon/schema/SchemaValidation.java |   3 -
 .../apache/paimon/schema/SchemaValidationTest.java |  24 ++
 .../apache/paimon/spark/SparkChainTableITCase.java | 316 ++++++++++++++++-----
 3 files changed, 268 insertions(+), 75 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index fd07bcef17..f4cb16a018 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -660,9 +660,6 @@ public class SchemaValidation {
                     options.bucket() > 0, "Bucket number must be greater than 0 for chain table.");
             Preconditions.checkArgument(
                     options.sequenceField() != null, "Sequence field is required for chain table.");
-            Preconditions.checkArgument(
-                    options.mergeEngine() == MergeEngine.DEDUPLICATE,
-                    "Merge engine must be deduplicate for chain table.");
             Preconditions.checkArgument(
                     changelogProducer == ChangelogProducer.NONE
                             || changelogProducer == ChangelogProducer.INPUT,
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
index 57613fe67f..9b789e246b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
@@ -176,4 +176,28 @@ class SchemaValidationTest {
         options.put("fields.f2.sequence-group", "f3");
         assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException();
     }
+
+    @Test
+    public void testChainTableAllowsNonDeduplicateMergeEngine() {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.CHAIN_TABLE_ENABLED.key(), "true");
+        options.put(CoreOptions.BUCKET.key(), "1");
+        options.put(CoreOptions.SEQUENCE_FIELD.key(), "f2");
+        options.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), "$f0");
+        options.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyy-MM-dd");
+        options.put(CoreOptions.MERGE_ENGINE.key(), "partial-update");
+
+        List<DataField> fields =
+                Arrays.asList(
+                        new DataField(0, "f0", DataTypes.STRING()),
+                        new DataField(1, "f1", DataTypes.INT()),
+                        new DataField(2, "f2", DataTypes.BIGINT()),
+                        new DataField(3, "f3", DataTypes.STRING()));
+        List<String> partitionKeys = singletonList("f0");
+        List<String> primaryKeys = Arrays.asList("f0", "f1");
+        TableSchema schema =
+                new TableSchema(1, fields, 10, partitionKeys, primaryKeys, options, "");
+
+        assertThatNoException().isThrownBy(() -> validateTableSchema(schema));
+    }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
index b5a28fe500..c808b56491 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
@@ -51,28 +51,55 @@ public class SparkChainTableITCase {
         testHiveMetastore.stop();
     }
 
+    private SparkSession.Builder createSparkSessionBuilder(Path warehousePath) {
+        return SparkSession.builder()
+                .config("spark.sql.warehouse.dir", warehousePath.toString())
+                // with hive metastore
+                .config("spark.sql.catalogImplementation", "hive")
+                .config("hive.metastore.uris", "thrift://localhost:" + PORT)
+                .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName())
+                .config("spark.sql.catalog.spark_catalog.metastore", "hive")
+                .config(
+                        "spark.sql.catalog.spark_catalog.hive.metastore.uris",
+                        "thrift://localhost:" + PORT)
+                .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
+                .config("spark.sql.catalog.spark_catalog.warehouse", warehousePath.toString())
+                .config(
+                        "spark.sql.extensions",
+                        "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+                .master("local[2]");
+    }
+
+    private void setupChainTableBranches(SparkSession spark, String tableName) {
+        // Create branches
+        spark.sql(String.format("CALL sys.create_branch('my_db1.%s', 'snapshot');", tableName));
+        spark.sql(String.format("CALL sys.create_branch('my_db1.%s', 'delta')", tableName));
+
+        // Set branch properties
+        spark.sql(
+                String.format(
+                        "ALTER TABLE my_db1.%s SET tblproperties ("
+                                + "'scan.fallback-snapshot-branch' = 'snapshot', "
+                                + "'scan.fallback-delta-branch' = 'delta')",
+                        tableName));
+        spark.sql(
+                String.format(
+                        "ALTER TABLE `my_db1`.`%s$branch_snapshot` SET tblproperties ("
+                                + "'scan.fallback-snapshot-branch' = 'snapshot',"
+                                + "'scan.fallback-delta-branch' = 'delta')",
+                        tableName));
+        spark.sql(
+                String.format(
+                        "ALTER TABLE `my_db1`.`%s$branch_delta` SET tblproperties ("
+                                + "'scan.fallback-snapshot-branch' = 'snapshot',"
+                                + "'scan.fallback-delta-branch' = 'delta')",
+                        tableName));
+    }
+
     @Test
     public void testChainTable(@TempDir java.nio.file.Path tempDir) throws IOException {
         Path warehousePath = new Path("file:" + tempDir.toString());
-        SparkSession.Builder builder =
-                SparkSession.builder()
-                        .config("spark.sql.warehouse.dir", warehousePath.toString())
-                        // with hive metastore
-                        .config("spark.sql.catalogImplementation", "hive")
-                        .config("hive.metastore.uris", "thrift://localhost:" + PORT)
-                        .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName())
-                        .config("spark.sql.catalog.spark_catalog.metastore", "hive")
-                        .config(
-                                "spark.sql.catalog.spark_catalog.hive.metastore.uris",
-                                "thrift://localhost:" + PORT)
-                        .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
-                        .config(
-                                "spark.sql.catalog.spark_catalog.warehouse",
-                                warehousePath.toString())
-                        .config(
-                                "spark.sql.extensions",
-                                "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
-                        .master("local[2]");
+        SparkSession.Builder builder = createSparkSessionBuilder(warehousePath);
         SparkSession spark = builder.getOrCreate();
         spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
         spark.sql("USE spark_catalog.my_db1");
@@ -97,23 +124,7 @@ public class SparkChainTableITCase {
                         + "    'sequence.field' = 't2'\n"
                         + "  )");
 
-        /** Create branch */
-        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');");
-        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')");
-
-        /** Set branch */
-        spark.sql(
-                "ALTER TABLE my_db1.chain_test SET tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot', "
-                        + "'scan.fallback-delta-branch' = 'delta')");
-        spark.sql(
-                "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
-                        + "'scan.fallback-delta-branch' = 'delta')");
-        spark.sql(
-                "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
-                        + "'scan.fallback-delta-branch' = 'delta')");
+        setupChainTableBranches(spark, "chain_test");
         spark.close();
         spark = builder.getOrCreate();
 
@@ -281,25 +292,7 @@ public class SparkChainTableITCase {
     @Test
     public void testHourlyChainTable(@TempDir java.nio.file.Path tempDir) throws IOException {
         Path warehousePath = new Path("file:" + tempDir.toString());
-        SparkSession.Builder builder =
-                SparkSession.builder()
-                        .config("spark.sql.warehouse.dir", warehousePath.toString())
-                        // with hive metastore
-                        .config("spark.sql.catalogImplementation", "hive")
-                        .config("hive.metastore.uris", "thrift://localhost:" + PORT)
-                        .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName())
-                        .config("spark.sql.catalog.spark_catalog.metastore", "hive")
-                        .config(
-                                "spark.sql.catalog.spark_catalog.hive.metastore.uris",
-                                "thrift://localhost:" + PORT)
-                        .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
-                        .config(
-                                "spark.sql.catalog.spark_catalog.warehouse",
-                                warehousePath.toString())
-                        .config(
-                                "spark.sql.extensions",
-                                "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
-                        .master("local[2]");
+        SparkSession.Builder builder = createSparkSessionBuilder(warehousePath);
         SparkSession spark = builder.getOrCreate();
         spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
         spark.sql("USE spark_catalog.my_db1");
@@ -324,23 +317,7 @@ public class SparkChainTableITCase {
                         + "    'sequence.field' = 't2'\n"
                         + "  )");
 
-        /** Create branch */
-        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');");
-        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')");
-
-        /** Set branch */
-        spark.sql(
-                "ALTER TABLE my_db1.chain_test SET tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot', "
-                        + "'scan.fallback-delta-branch' = 'delta')");
-        spark.sql(
-                "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
-                        + "'scan.fallback-delta-branch' = 'delta')");
-        spark.sql(
-                "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
-                        + "'scan.fallback-delta-branch' = 'delta')");
+        setupChainTableBranches(spark, "chain_test");
         spark.close();
         spark = builder.getOrCreate();
 
@@ -506,4 +483,199 @@ public class SparkChainTableITCase {
 
         spark.close();
     }
+
+    /**
+     * Test chain table with partial-update merge engine.
+     *
+     * <p>Data layout across branches and partitions (key is primary key, seq is sequence field):
+     *
+     * <pre>
+     * ┌──────────┬──────────┬─────────────────────────────────────────────────────────────────┐
+     * │  Branch  │    dt    │                    Data (key, seq, v1, v2)                      │
+     * ├──────────┼──────────┼─────────────────────────────────────────────────────────────────┤
+     * │   main   │ 20250810 │ (1,1,'a','A'), (2,1,'b','B')                                    │
+     * ├──────────┼──────────┼─────────────────────────────────────────────────────────────────┤
+     * │  delta   │ 20250809 │ (3,1,'c','C'), (4,1,'d','D')                                    │
+     * │          │ 20250810 │ (1,2,null,'A1'), (2,2,'b1',null), (3,1,'c','C')                 │
+     * │          │ 20250811 │ (1,3,'a1',null), (2,3,null,'B1'), (5,1,'e','E')                 │
+     * │          │ 20250812 │ (1,4,null,'A2'), (5,2,'e1',null)                                │
+     * ├──────────┼──────────┼─────────────────────────────────────────────────────────────────┤
+     * │ snapshot │ 20250810 │ (1,2,'a','A1'), (2,2,'b1','B'), (3,1,'c','C')                   │
+     * │          │ 20250812 │ (1,4,'a1','A2'), (2,3,'b1','B1'), (3,1,'c','C'), (5,2,'e1','E') │
+     * └──────────┴──────────┴─────────────────────────────────────────────────────────────────┘
+     * </pre>
+     *
+     * <p>Expected read results (chain read merges snapshot + delta with partial-update):
+     *
+     * <pre>
+     * ┌──────────┬───────────────────────────────────────────────────────────────────────────────┐
+     * │    dt    │  Result (key, seq, v1, v2)                                                    │
+     * ├──────────┼───────────────────────────────────────────────────────────────────────────────┤
+     * │ 20250809 │ (3,1,'c','C'), (4,1,'d','D')             -- delta only, no pre-snapshot       │
+     * │ 20250810 │ (1,1,'a','A'), (2,1,'b','B')             -- main branch data                  │
+     * │ 20250811 │ (1,3,'a1','A1'), (2,3,'b1','B1'), (3,1,'c','C'), (5,1,'e','E')                │
+     * │          │                                       -- snapshot[20250810] + delta[20250811] │
+     * │ 20250812 │ (1,4,'a1','A2'), (2,3,'b1','B1'), (3,1,'c','C'), (5,2,'e1','E')               │
+     * │          │                                          -- snapshot branch data              │
+     * └──────────┴───────────────────────────────────────────────────────────────────────────────┘
+     * </pre>
+     */
+    @Test
+    public void testChainTableWithPartialUpdate(@TempDir java.nio.file.Path tempDir)
+            throws IOException {
+        Path warehousePath = new Path("file:" + tempDir.toString());
+        SparkSession.Builder builder = createSparkSessionBuilder(warehousePath);
+        SparkSession spark = builder.getOrCreate();
+        spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+        spark.sql("USE spark_catalog.my_db1");
+
+        // Create table with partial-update merge engine
+        spark.sql(
+                "CREATE TABLE IF NOT EXISTS \n"
+                        + "  `my_db1`.`chain_test_partial` (\n"
+                        + "    `key` BIGINT COMMENT 'key',\n"
+                        + "    `seq` BIGINT COMMENT 'seq',\n"
+                        + "    `v1` STRING COMMENT 'v1',\n"
+                        + "    `v2` STRING COMMENT 'v2'\n"
+                        + "  ) PARTITIONED BY (`dt` STRING COMMENT 'dt') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n"
+                        + "WITH\n"
+                        + "  SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n"
+                        + "    'bucket-key' = 'key',\n"
+                        + "    'primary-key' = 'dt,key',\n"
+                        + "    'partition.timestamp-pattern' = '$dt',\n"
+                        + "    'partition.timestamp-formatter' = 'yyyyMMdd',\n"
+                        + "    'chain-table.enabled' = 'true',\n"
+                        + "    'bucket' = '2',\n"
+                        + "    'merge-engine' = 'partial-update', \n"
+                        + "    'sequence.field' = 'seq'\n"
+                        + "  )");
+
+        setupChainTableBranches(spark, "chain_test_partial");
+        spark.close();
+        spark = builder.getOrCreate();
+
+        // Write main branch
+        spark.sql(
+                "insert overwrite table `my_db1`.`chain_test_partial` partition (dt = '20250810') "
+                        + "values (1, 1, 'a', 'A'), (2, 1, 'b', 'B');");
+
+        // Write delta branch
+        spark.sql("set spark.paimon.branch=delta;");
+        spark.sql(
+                "insert overwrite table `my_db1`.`chain_test_partial` partition (dt = '20250809') "
+                        + "values (3, 1, 'c', 'C'), (4, 1, 'd', 'D');");
+        spark.sql(
+                "insert overwrite table `my_db1`.`chain_test_partial` partition (dt = '20250810') "
+                        + "values (1, 2, CAST(NULL AS STRING), 'A1'), "
+                        + "(2, 2, 'b1', CAST(NULL AS STRING)), "
+                        + "(3, 1, 'c', 'C');");
+        spark.sql(
+                "insert overwrite table `my_db1`.`chain_test_partial` partition (dt = '20250811') "
+                        + "values (1, 3, 'a1', CAST(NULL AS STRING)), "
+                        + "(2, 3, CAST(NULL AS STRING), 'B1'), "
+                        + "(5, 1, 'e', 'E');");
+        spark.sql(
+                "insert overwrite table `my_db1`.`chain_test_partial` partition (dt = '20250812') "
+                        + "values (1, 4, CAST(NULL AS STRING), 'A2'), "
+                        + "(5, 2, 'e1', CAST(NULL AS STRING));");
+
+        // Write snapshot branch
+        spark.sql("set spark.paimon.branch=snapshot;");
+        spark.sql(
+                "insert overwrite table `my_db1`.`chain_test_partial` partition (dt = '20250810') "
+                        + "values (1, 2, 'a', 'A1'), (2, 2, 'b1', 'B'), (3, 1, 'c', 'C');");
+        spark.sql(
+                "insert overwrite table `my_db1`.`chain_test_partial` partition (dt = '20250812') "
+                        + "values (1, 4, 'a1', 'A2'), (2, 3, 'b1', 'B1'), (3, 1, 'c', 'C'), (5, 2, 'e1', 'E');");
+
+        spark.close();
+        spark = builder.getOrCreate();
+
+        // Main read - should return original main branch data
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test_partial` where dt = '20250810'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("[1,1,a,A,20250810]", "[2,1,b,B,20250810]");
+
+        // Snapshot read - should return snapshot branch data directly
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test_partial` where dt = '20250812'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(
+                        "[1,4,a1,A2,20250812]",
+                        "[2,3,b1,B1,20250812]",
+                        "[3,1,c,C,20250812]",
+                        "[5,2,e1,E,20250812]");
+
+        // Chain read
+        // 1. non pre snapshot - read delta directly (20250809)
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test_partial` where dt = '20250809'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("[3,1,c,C,20250809]", "[4,1,d,D,20250809]");
+
+        // 2. has pre snapshot (20250811) - should merge snapshot(20250810) + delta(20250811)
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test_partial` where dt = '20250811'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(
+                        "[1,3,a1,A1,20250811]",
+                        "[2,3,b1,B1,20250811]",
+                        "[3,1,c,C,20250811]",
+                        "[5,1,e,E,20250811]");
+
+        // Multi partition Read
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test_partial` where dt in ('20250810', '20250811', '20250812')")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(
+                        "[1,1,a,A,20250810]",
+                        "[2,1,b,B,20250810]",
+                        "[1,3,a1,A1,20250811]",
+                        "[2,3,b1,B1,20250811]",
+                        "[3,1,c,C,20250811]",
+                        "[5,1,e,E,20250811]",
+                        "[1,4,a1,A2,20250812]",
+                        "[2,3,b1,B1,20250812]",
+                        "[3,1,c,C,20250812]",
+                        "[5,2,e1,E,20250812]");
+
+        // Incremental read - read delta branch only with partial update data
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test_partial$branch_delta` where dt = '20250811'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(
+                        "[1,3,a1,null,20250811]", "[2,3,null,B1,20250811]", "[5,1,e,E,20250811]");
+
+        spark.close();
+        spark = builder.getOrCreate();
+        // Drop table
+        spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test_partial`;");
+
+        spark.close();
+    }
 }

Reply via email to