This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b65dc1d16b [core] Fix default value not work for primary key table
with bucket -1 (#6050)
b65dc1d16b is described below
commit b65dc1d16b9da8401a17b45f53b183ee2678ac7d
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Aug 11 15:45:36 2025 +0800
[core] Fix default value not work for primary key table with bucket -1
(#6050)
---
.../java/org/apache/paimon/table/sink/TableWriteImpl.java | 15 ++++-----------
.../java/org/apache/paimon/flink/BranchSqlITCase.java | 9 +++++++++
2 files changed, 13 insertions(+), 11 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index aba740fe0f..9d3f4e8b51 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -171,26 +171,19 @@ public class TableWriteImpl<T> implements
InnerTableWrite, Restorable<List<State
@Nullable
public SinkRecord writeAndReturn(InternalRow row) throws Exception {
- checkNullability(row);
- row = wrapDefaultValue(row);
- RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
- if (ignoreDelete && rowKind.isRetract()) {
- return null;
- }
- SinkRecord record = toSinkRecord(row);
- write.write(record.partition(), record.bucket(),
recordExtractor.extract(record, rowKind));
- return record;
+ return writeAndReturn(row, -1);
}
@Nullable
public SinkRecord writeAndReturn(InternalRow row, int bucket) throws
Exception {
checkNullability(row);
+ row = wrapDefaultValue(row);
RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
if (ignoreDelete && rowKind.isRetract()) {
return null;
}
- SinkRecord record = toSinkRecord(row, bucket);
- write.write(record.partition(), bucket,
recordExtractor.extract(record, rowKind));
+ SinkRecord record = bucket == -1 ? toSinkRecord(row) :
toSinkRecord(row, bucket);
+ write.write(record.partition(), record.bucket(),
recordExtractor.extract(record, rowKind));
return record;
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 2160cb1a2d..dd7750b2f0 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -52,6 +52,15 @@ public class BranchSqlITCase extends CatalogITCaseBase {
.containsExactlyInAnyOrder("+I[1, 5]", "+I[2, 5]");
}
+ @Test
+ public void testDefaultValueForPkTableDynamicBucket() throws Exception {
+ sql("CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b INT)");
+ sql("CALL sys.alter_column_default_value('default.T', 'b', '5')");
+ sql("INSERT INTO T (a) VALUES (1), (2)");
+ assertThat(collectResult("SELECT * FROM T"))
+ .containsExactlyInAnyOrder("+I[1, 5]", "+I[2, 5]");
+ }
+
@Test
public void testAlterBranchTable() throws Exception {
sql(