This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b8d7ac74b0 [core] Allow chain table to use non-deduplicate merge engines. (#7172)
b8d7ac74b0 is described below
commit b8d7ac74b0929a6d3a33bc90bc02a530a9fb0df0
Author: Junrui Lee <[email protected]>
AuthorDate: Mon Feb 2 17:47:18 2026 +0800
[core] Allow chain table to use non-deduplicate merge engines. (#7172)
---
.../org/apache/paimon/schema/SchemaValidation.java | 3 -
.../apache/paimon/schema/SchemaValidationTest.java | 24 ++
.../apache/paimon/spark/SparkChainTableITCase.java | 316 ++++++++++++++++-----
3 files changed, 268 insertions(+), 75 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index fd07bcef17..f4cb16a018 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -660,9 +660,6 @@ public class SchemaValidation {
options.bucket() > 0, "Bucket number must be greater than 0 for chain table.");
Preconditions.checkArgument(
options.sequenceField() != null, "Sequence field is required for chain table.");
- Preconditions.checkArgument(
- options.mergeEngine() == MergeEngine.DEDUPLICATE,
- "Merge engine must be deduplicate for chain table.");
Preconditions.checkArgument(
changelogProducer == ChangelogProducer.NONE
|| changelogProducer == ChangelogProducer.INPUT,
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
index 57613fe67f..9b789e246b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
@@ -176,4 +176,28 @@ class SchemaValidationTest {
options.put("fields.f2.sequence-group", "f3");
assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException();
}
+
+ @Test
+ public void testChainTableAllowsNonDeduplicateMergeEngine() {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.CHAIN_TABLE_ENABLED.key(), "true");
+ options.put(CoreOptions.BUCKET.key(), "1");
+ options.put(CoreOptions.SEQUENCE_FIELD.key(), "f2");
+ options.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), "$f0");
+ options.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyy-MM-dd");
+ options.put(CoreOptions.MERGE_ENGINE.key(), "partial-update");
+
+ List<DataField> fields =
+ Arrays.asList(
+ new DataField(0, "f0", DataTypes.STRING()),
+ new DataField(1, "f1", DataTypes.INT()),
+ new DataField(2, "f2", DataTypes.BIGINT()),
+ new DataField(3, "f3", DataTypes.STRING()));
+ List<String> partitionKeys = singletonList("f0");
+ List<String> primaryKeys = Arrays.asList("f0", "f1");
+ TableSchema schema =
+ new TableSchema(1, fields, 10, partitionKeys, primaryKeys, options, "");
+
+ assertThatNoException().isThrownBy(() -> validateTableSchema(schema));
+ }
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
index b5a28fe500..c808b56491 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
@@ -51,28 +51,55 @@ public class SparkChainTableITCase {
testHiveMetastore.stop();
}
+ private SparkSession.Builder createSparkSessionBuilder(Path warehousePath) {
+ return SparkSession.builder()
+ .config("spark.sql.warehouse.dir", warehousePath.toString())
+ // with hive metastore
+ .config("spark.sql.catalogImplementation", "hive")
+ .config("hive.metastore.uris", "thrift://localhost:" + PORT)
+ .config("spark.sql.catalog.spark_catalog",
SparkCatalog.class.getName())
+ .config("spark.sql.catalog.spark_catalog.metastore", "hive")
+ .config(
+ "spark.sql.catalog.spark_catalog.hive.metastore.uris",
+ "thrift://localhost:" + PORT)
+ .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
+ .config("spark.sql.catalog.spark_catalog.warehouse", warehousePath.toString())
+ .config(
+ "spark.sql.extensions",
+ "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+ .master("local[2]");
+ }
+
+ private void setupChainTableBranches(SparkSession spark, String tableName) {
+ // Create branches
+ spark.sql(String.format("CALL sys.create_branch('my_db1.%s',
'snapshot');", tableName));
+ spark.sql(String.format("CALL sys.create_branch('my_db1.%s',
'delta')", tableName));
+
+ // Set branch properties
+ spark.sql(
+ String.format(
+ "ALTER TABLE my_db1.%s SET tblproperties ("
+ + "'scan.fallback-snapshot-branch' =
'snapshot', "
+ + "'scan.fallback-delta-branch' = 'delta')",
+ tableName));
+ spark.sql(
+ String.format(
+ "ALTER TABLE `my_db1`.`%s$branch_snapshot` SET
tblproperties ("
+ + "'scan.fallback-snapshot-branch' =
'snapshot',"
+ + "'scan.fallback-delta-branch' = 'delta')",
+ tableName));
+ spark.sql(
+ String.format(
+ "ALTER TABLE `my_db1`.`%s$branch_delta` SET
tblproperties ("
+ + "'scan.fallback-snapshot-branch' =
'snapshot',"
+ + "'scan.fallback-delta-branch' = 'delta')",
+ tableName));
+ }
+
@Test
public void testChainTable(@TempDir java.nio.file.Path tempDir) throws IOException {
Path warehousePath = new Path("file:" + tempDir.toString());
- SparkSession.Builder builder =
- SparkSession.builder()
- .config("spark.sql.warehouse.dir",
warehousePath.toString())
- // with hive metastore
- .config("spark.sql.catalogImplementation", "hive")
- .config("hive.metastore.uris", "thrift://localhost:" +
PORT)
- .config("spark.sql.catalog.spark_catalog",
SparkCatalog.class.getName())
- .config("spark.sql.catalog.spark_catalog.metastore",
"hive")
- .config(
- "spark.sql.catalog.spark_catalog.hive.metastore.uris",
- "thrift://localhost:" + PORT)
- .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
- .config(
- "spark.sql.catalog.spark_catalog.warehouse",
- warehousePath.toString())
- .config(
- "spark.sql.extensions",
- "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
- .master("local[2]");
+ SparkSession.Builder builder = createSparkSessionBuilder(warehousePath);
SparkSession spark = builder.getOrCreate();
spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
spark.sql("USE spark_catalog.my_db1");
@@ -97,23 +124,7 @@ public class SparkChainTableITCase {
+ " 'sequence.field' = 't2'\n"
+ " )");
- /** Create branch */
- spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');");
- spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')");
-
- /** Set branch */
- spark.sql(
- "ALTER TABLE my_db1.chain_test SET tblproperties ("
- + "'scan.fallback-snapshot-branch' = 'snapshot', "
- + "'scan.fallback-delta-branch' = 'delta')");
- spark.sql(
- "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET
tblproperties ("
- + "'scan.fallback-snapshot-branch' = 'snapshot',"
- + "'scan.fallback-delta-branch' = 'delta')");
- spark.sql(
- "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET
tblproperties ("
- + "'scan.fallback-snapshot-branch' = 'snapshot',"
- + "'scan.fallback-delta-branch' = 'delta')");
+ setupChainTableBranches(spark, "chain_test");
spark.close();
spark = builder.getOrCreate();
@@ -281,25 +292,7 @@ public class SparkChainTableITCase {
@Test
public void testHourlyChainTable(@TempDir java.nio.file.Path tempDir) throws IOException {
Path warehousePath = new Path("file:" + tempDir.toString());
- SparkSession.Builder builder =
- SparkSession.builder()
- .config("spark.sql.warehouse.dir",
warehousePath.toString())
- // with hive metastore
- .config("spark.sql.catalogImplementation", "hive")
- .config("hive.metastore.uris", "thrift://localhost:" +
PORT)
- .config("spark.sql.catalog.spark_catalog",
SparkCatalog.class.getName())
- .config("spark.sql.catalog.spark_catalog.metastore",
"hive")
- .config(
- "spark.sql.catalog.spark_catalog.hive.metastore.uris",
- "thrift://localhost:" + PORT)
- .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
- .config(
- "spark.sql.catalog.spark_catalog.warehouse",
- warehousePath.toString())
- .config(
- "spark.sql.extensions",
- "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
- .master("local[2]");
+ SparkSession.Builder builder = createSparkSessionBuilder(warehousePath);
SparkSession spark = builder.getOrCreate();
spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
spark.sql("USE spark_catalog.my_db1");
@@ -324,23 +317,7 @@ public class SparkChainTableITCase {
+ " 'sequence.field' = 't2'\n"
+ " )");
- /** Create branch */
- spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');");
- spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')");
-
- /** Set branch */
- spark.sql(
- "ALTER TABLE my_db1.chain_test SET tblproperties ("
- + "'scan.fallback-snapshot-branch' = 'snapshot', "
- + "'scan.fallback-delta-branch' = 'delta')");
- spark.sql(
- "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET
tblproperties ("
- + "'scan.fallback-snapshot-branch' = 'snapshot',"
- + "'scan.fallback-delta-branch' = 'delta')");
- spark.sql(
- "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET
tblproperties ("
- + "'scan.fallback-snapshot-branch' = 'snapshot',"
- + "'scan.fallback-delta-branch' = 'delta')");
+ setupChainTableBranches(spark, "chain_test");
spark.close();
spark = builder.getOrCreate();
@@ -506,4 +483,199 @@ public class SparkChainTableITCase {
spark.close();
}
+
+ /**
+ * Test chain table with partial-update merge engine.
+ *
+ * <p>Data layout across branches and partitions (key is primary key, seq is sequence field):
+ *
+ * <pre>
+ * ┌──────────┬──────────┬─────────────────────────────────────────────────────────────────┐
+ * │ Branch   │ dt       │ Data (key, seq, v1, v2)                                         │
+ * ├──────────┼──────────┼─────────────────────────────────────────────────────────────────┤
+ * │ main     │ 20250810 │ (1,1,'a','A'), (2,1,'b','B')                                    │
+ * ├──────────┼──────────┼─────────────────────────────────────────────────────────────────┤
+ * │ delta    │ 20250809 │ (3,1,'c','C'), (4,1,'d','D')                                    │
+ * │          │ 20250810 │ (1,2,null,'A1'), (2,2,'b1',null), (3,1,'c','C')                 │
+ * │          │ 20250811 │ (1,3,'a1',null), (2,3,null,'B1'), (5,1,'e','E')                 │
+ * │          │ 20250812 │ (1,4,null,'A2'), (5,2,'e1',null)                                │
+ * ├──────────┼──────────┼─────────────────────────────────────────────────────────────────┤
+ * │ snapshot │ 20250810 │ (1,2,'a','A1'), (2,2,'b1','B'), (3,1,'c','C')                   │
+ * │          │ 20250812 │ (1,4,'a1','A2'), (2,3,'b1','B1'), (3,1,'c','C'), (5,2,'e1','E') │
+ * └──────────┴──────────┴─────────────────────────────────────────────────────────────────┘
+ * </pre>
+ *
+ * <p>Expected read results (chain read merges snapshot + delta with partial-update):
+ *
+ * <pre>
+ * ┌──────────┬─────────────────────────────────────────────────────────────────┐
+ * │ dt       │ Result (key, seq, v1, v2)                                       │
+ * ├──────────┼─────────────────────────────────────────────────────────────────┤
+ * │ 20250809 │ (3,1,'c','C'), (4,1,'d','D')  -- delta only, no pre-snapshot    │
+ * │ 20250810 │ (1,1,'a','A'), (2,1,'b','B')  -- main branch data               │
+ * │ 20250811 │ (1,3,'a1','A1'), (2,3,'b1','B1'), (3,1,'c','C'), (5,1,'e','E')  │
+ * │          │   -- snapshot[20250810] + delta[20250811]                       │
+ * │ 20250812 │ (1,4,'a1','A2'), (2,3,'b1','B1'), (3,1,'c','C'), (5,2,'e1','E') │
+ * │          │   -- snapshot branch data                                       │
+ * └──────────┴─────────────────────────────────────────────────────────────────┘
+ * </pre>
+ */
+ @Test
+ public void testChainTableWithPartialUpdate(@TempDir java.nio.file.Path tempDir)
+ throws IOException {
+ Path warehousePath = new Path("file:" + tempDir.toString());
+ SparkSession.Builder builder = createSparkSessionBuilder(warehousePath);
+ SparkSession spark = builder.getOrCreate();
+ spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+ spark.sql("USE spark_catalog.my_db1");
+
+ // Create table with partial-update merge engine
+ spark.sql(
+ "CREATE TABLE IF NOT EXISTS \n"
+ + " `my_db1`.`chain_test_partial` (\n"
+ + " `key` BIGINT COMMENT 'key',\n"
+ + " `seq` BIGINT COMMENT 'seq',\n"
+ + " `v1` STRING COMMENT 'v1',\n"
+ + " `v2` STRING COMMENT 'v2'\n"
+ + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt') ROW
FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n"
+ + "WITH\n"
+ + " SERDEPROPERTIES ('serialization.format' = '1')
STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat'
OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES
(\n"
+ + " 'bucket-key' = 'key',\n"
+ + " 'primary-key' = 'dt,key',\n"
+ + " 'partition.timestamp-pattern' = '$dt',\n"
+ + " 'partition.timestamp-formatter' = 'yyyyMMdd',\n"
+ + " 'chain-table.enabled' = 'true',\n"
+ + " 'bucket' = '2',\n"
+ + " 'merge-engine' = 'partial-update', \n"
+ + " 'sequence.field' = 'seq'\n"
+ + " )");
+
+ setupChainTableBranches(spark, "chain_test_partial");
+ spark.close();
+ spark = builder.getOrCreate();
+
+ // Write main branch
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test_partial`
partition (dt = '20250810') "
+ + "values (1, 1, 'a', 'A'), (2, 1, 'b', 'B');");
+
+ // Write delta branch
+ spark.sql("set spark.paimon.branch=delta;");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test_partial`
partition (dt = '20250809') "
+ + "values (3, 1, 'c', 'C'), (4, 1, 'd', 'D');");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test_partial`
partition (dt = '20250810') "
+ + "values (1, 2, CAST(NULL AS STRING), 'A1'), "
+ + "(2, 2, 'b1', CAST(NULL AS STRING)), "
+ + "(3, 1, 'c', 'C');");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test_partial`
partition (dt = '20250811') "
+ + "values (1, 3, 'a1', CAST(NULL AS STRING)), "
+ + "(2, 3, CAST(NULL AS STRING), 'B1'), "
+ + "(5, 1, 'e', 'E');");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test_partial`
partition (dt = '20250812') "
+ + "values (1, 4, CAST(NULL AS STRING), 'A2'), "
+ + "(5, 2, 'e1', CAST(NULL AS STRING));");
+
+ // Write snapshot branch
+ spark.sql("set spark.paimon.branch=snapshot;");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test_partial`
partition (dt = '20250810') "
+ + "values (1, 2, 'a', 'A1'), (2, 2, 'b1', 'B'), (3, 1,
'c', 'C');");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test_partial`
partition (dt = '20250812') "
+ + "values (1, 4, 'a1', 'A2'), (2, 3, 'b1', 'B1'), (3,
1, 'c', 'C'), (5, 2, 'e1', 'E');");
+
+ spark.close();
+ spark = builder.getOrCreate();
+
+ // Main read - should return original main branch data
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM
`my_db1`.`chain_test_partial` where dt = '20250810'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("[1,1,a,A,20250810]",
"[2,1,b,B,20250810]");
+
+ // Snapshot read - should return snapshot branch data directly
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM
`my_db1`.`chain_test_partial` where dt = '20250812'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(
+ "[1,4,a1,A2,20250812]",
+ "[2,3,b1,B1,20250812]",
+ "[3,1,c,C,20250812]",
+ "[5,2,e1,E,20250812]");
+
+ // Chain read
+ // 1. non pre snapshot - read delta directly (20250809)
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM
`my_db1`.`chain_test_partial` where dt = '20250809'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("[3,1,c,C,20250809]",
"[4,1,d,D,20250809]");
+
+ // 2. has pre snapshot (20250811) - should merge snapshot(20250810) + delta(20250811)
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM
`my_db1`.`chain_test_partial` where dt = '20250811'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(
+ "[1,3,a1,A1,20250811]",
+ "[2,3,b1,B1,20250811]",
+ "[3,1,c,C,20250811]",
+ "[5,1,e,E,20250811]");
+
+ // Multi partition Read
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM
`my_db1`.`chain_test_partial` where dt in ('20250810', '20250811', '20250812')")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(
+ "[1,1,a,A,20250810]",
+ "[2,1,b,B,20250810]",
+ "[1,3,a1,A1,20250811]",
+ "[2,3,b1,B1,20250811]",
+ "[3,1,c,C,20250811]",
+ "[5,1,e,E,20250811]",
+ "[1,4,a1,A2,20250812]",
+ "[2,3,b1,B1,20250812]",
+ "[3,1,c,C,20250812]",
+ "[5,2,e1,E,20250812]");
+
+ // Incremental read - read delta branch only with partial update data
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM
`my_db1`.`chain_test_partial$branch_delta` where dt = '20250811'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(
+ "[1,3,a1,null,20250811]", "[2,3,null,B1,20250811]",
"[5,1,e,E,20250811]");
+
+ spark.close();
+ spark = builder.getOrCreate();
+ // Drop table
+ spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test_partial`;");
+
+ spark.close();
+ }
}