This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0f6384b993 [core] Fix computed column and projection of BinlogTable (#5566)
0f6384b993 is described below
commit 0f6384b99369685f1c8f17d27b0e9f7d91d578d7
Author: yuzelin <[email protected]>
AuthorDate: Wed May 7 19:04:06 2025 +0800
[core] Fix computed column and projection of BinlogTable (#5566)
---
.../apache/paimon/table/system/BinlogTable.java | 10 ++++++++--
.../apache/paimon/flink/SystemCatalogTable.java | 22 ++++++++++++---------
.../apache/paimon/flink/BatchFileStoreITCase.java | 23 ++++++++++++++++++++++
.../paimon/spark/sql/PaimonSystemTableTest.scala | 10 ++++++++++
4 files changed, 54 insertions(+), 11 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
index eafd37f1d7..a023028bd1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
@@ -90,6 +90,8 @@ public class BinlogTable extends AuditLogTable {
private class BinlogRead extends AuditLogRead {
+ private RowType wrappedReadType = wrapped.rowType();
+
private BinlogRead(InnerTableRead dataRead) {
super(dataRead);
}
@@ -97,20 +99,24 @@ public class BinlogTable extends AuditLogTable {
@Override
public InnerTableRead withReadType(RowType readType) {
List<DataField> fields = new ArrayList<>();
+ List<DataField> wrappedReadFields = new ArrayList<>();
for (DataField field : readType.getFields()) {
if (field.name().equals(SpecialFields.ROW_KIND.name())) {
fields.add(field);
} else {
- fields.add(field.newType(((ArrayType)
field.type()).getElementType()));
+ DataField origin = field.newType(((ArrayType)
field.type()).getElementType());
+ fields.add(origin);
+ wrappedReadFields.add(origin);
}
}
+ this.wrappedReadType =
this.wrappedReadType.copy(wrappedReadFields);
return super.withReadType(readType.copy(fields));
}
@Override
public RecordReader<InternalRow> createReader(Split split) throws
IOException {
DataSplit dataSplit = (DataSplit) split;
- InternalRow.FieldGetter[] fieldGetters =
wrapped.rowType().fieldGetters();
+ InternalRow.FieldGetter[] fieldGetters =
wrappedReadType.fieldGetters();
if (dataSplit.isStreaming()) {
return new PackChangelogReader(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
index 7fe26e5607..5878b25ce7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.AuditLogTable;
+import org.apache.paimon.table.system.BinlogTable;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
@@ -67,15 +68,18 @@ public class SystemCatalogTable implements CatalogTable {
deserializeWatermarkSpec(newOptions, builder);
}
- // add non-physical columns
- List<String> physicalColumns = table.rowType().getFieldNames();
- int columnCount =
- physicalColumns.size() +
nonPhysicalColumnsCount(newOptions, physicalColumns);
- for (int i = 0; i < columnCount; i++) {
- String optionalName = newOptions.get(compoundKey(SCHEMA, i,
NAME));
- if (optionalName != null &&
!physicalColumns.contains(optionalName)) {
- // build non-physical column from options
- deserializeNonPhysicalColumn(newOptions, i, builder);
+ if (!(table instanceof BinlogTable)) {
+ // add non-physical columns
+ List<String> physicalColumns = table.rowType().getFieldNames();
+ int columnCount =
+ physicalColumns.size()
+ + nonPhysicalColumnsCount(newOptions,
physicalColumns);
+ for (int i = 0; i < columnCount; i++) {
+ String optionalName = newOptions.get(compoundKey(SCHEMA,
i, NAME));
+ if (optionalName != null &&
!physicalColumns.contains(optionalName)) {
+ // build non-physical column from options
+ deserializeNonPhysicalColumn(newOptions, i, builder);
+ }
}
}
return builder.build();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 144a2d5a8f..2b846b68bc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -792,4 +792,27 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
assertThat(sql("SELECT * FROM `test_table$audit_log`"))
.containsExactly(Row.of("+I", 1, 1, 2));
}
+
+ @Test
+ public void testBinlogTableWithComputedColumn() {
+ sql("CREATE TABLE test_table (a int, b int, c AS a + b);");
+ String ddl = sql("SHOW CREATE TABLE
`test_table$binlog`").get(0).getFieldAs(0);
+ assertThat(ddl).doesNotContain("`c` AS `a` + `b`");
+
+ sql("INSERT INTO test_table VALUES (1, 1)");
+ assertThat(sql("SELECT * FROM `test_table$binlog`"))
+ .containsExactly(Row.of("+I", new Integer[] {1}, new Integer[]
{1}));
+ }
+
+ @Test
+ public void testBinlogTableWithProjection() {
+ sql("CREATE TABLE test_table (a int, b string);");
+ sql("INSERT INTO test_table VALUES (1, 'A')");
+ assertThat(sql("SELECT * FROM `test_table$binlog`"))
+ .containsExactly(Row.of("+I", new Integer[] {1}, new String[]
{"A"}));
+ assertThat(sql("SELECT b FROM `test_table$binlog`"))
+ .containsExactly(Row.of((Object) new String[] {"A"}));
+ assertThat(sql("SELECT rowkind, b FROM `test_table$binlog`"))
+ .containsExactly(Row.of("+I", new String[] {"A"}));
+ }
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
index 0c645191f6..de9a4b040b 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
@@ -114,5 +114,15 @@ class PaimonSystemTableTest extends PaimonSparkTestBase {
sql("SELECT * FROM `T$binlog`"),
Seq(Row("+I", Array(1), Array(3)), Row("+I", Array(2), Array(2)))
)
+
+ checkAnswer(
+ sql("SELECT b FROM `T$binlog`"),
+ Seq(Row(Array(3)), Row(Array(2)))
+ )
+
+ checkAnswer(
+ sql("SELECT rowkind, b FROM `T$binlog`"),
+ Seq(Row("+I", Array(3)), Row("+I", Array(2)))
+ )
}
}