This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 823120192729f98011c0e37cd8622e13d88af3cc
Author: yuzelin <[email protected]>
AuthorDate: Wed May 7 19:04:06 2025 +0800

    [core] Fix computed column and projection of BinlogTable (#5566)
---
 .../apache/paimon/table/system/BinlogTable.java    | 10 +++++--
 .../apache/paimon/flink/SystemCatalogTable.java    | 23 ++++++++++++++-
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 34 ++++++++++++++++++++++
 .../paimon/spark/sql/PaimonSystemTableTest.scala   | 10 +++++++
 4 files changed, 74 insertions(+), 3 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
index eafd37f1d7..a023028bd1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
@@ -90,6 +90,8 @@ public class BinlogTable extends AuditLogTable {
 
     private class BinlogRead extends AuditLogRead {
 
+        private RowType wrappedReadType = wrapped.rowType();
+
         private BinlogRead(InnerTableRead dataRead) {
             super(dataRead);
         }
@@ -97,20 +99,24 @@ public class BinlogTable extends AuditLogTable {
         @Override
        public InnerTableRead withReadType(RowType readType) {
             List<DataField> fields = new ArrayList<>();
+            List<DataField> wrappedReadFields = new ArrayList<>();
             for (DataField field : readType.getFields()) {
                 if (field.name().equals(SpecialFields.ROW_KIND.name())) {
                     fields.add(field);
                 } else {
-                    fields.add(field.newType(((ArrayType) field.type()).getElementType()));
+                    DataField origin = field.newType(((ArrayType) field.type()).getElementType());
+                    fields.add(origin);
+                    wrappedReadFields.add(origin);
                 }
             }
+            this.wrappedReadType = this.wrappedReadType.copy(wrappedReadFields);
             return super.withReadType(readType.copy(fields));
         }
 
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws IOException {
             DataSplit dataSplit = (DataSplit) split;
-            InternalRow.FieldGetter[] fieldGetters = wrapped.rowType().fieldGetters();
+            InternalRow.FieldGetter[] fieldGetters = wrappedReadType.fieldGetters();
 
             if (dataSplit.isStreaming()) {
                 return new PackChangelogReader(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
index f88a808713..5878b25ce7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
 
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.system.AuditLogTable;
+import org.apache.paimon.table.system.BinlogTable;
 
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -34,7 +35,10 @@ import java.util.Optional;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
 import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA;
 import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
+import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn;
 import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;
+import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.NAME;
 import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK;
 
 /** A {@link CatalogTable} to represent system table. */
@@ -57,11 +61,28 @@ public class SystemCatalogTable implements CatalogTable {
                         TypeConversions.fromLogicalToDataType(toLogicalType(table.rowType())));
         if (table instanceof AuditLogTable) {
             Map<String, String> newOptions = new HashMap<>(table.options());
+
+            // add watermark
             if (newOptions.keySet().stream()
                     .anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) {
                 deserializeWatermarkSpec(newOptions, builder);
-                return builder.build();
             }
+
+            if (!(table instanceof BinlogTable)) {
+                // add non-physical columns
+                List<String> physicalColumns = table.rowType().getFieldNames();
+                int columnCount =
+                        physicalColumns.size()
+                                + nonPhysicalColumnsCount(newOptions, physicalColumns);
+                for (int i = 0; i < columnCount; i++) {
+                    String optionalName = newOptions.get(compoundKey(SCHEMA, i, NAME));
+                    if (optionalName != null && !physicalColumns.contains(optionalName)) {
+                        // build non-physical column from options
+                        deserializeNonPhysicalColumn(newOptions, i, builder);
+                    }
+                }
+            }
+            return builder.build();
         }
         return builder.build();
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index f1d8a44ca5..d7e00d9583 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -775,4 +775,38 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
                 .containsExactlyInAnyOrder(
                         Row.of("+I", 2, "B"), Row.of("-D", 2, "B"), Row.of("+I", 3, "C"));
     }
+
+    @Test
+    public void testAuditLogTableWithComputedColumn() throws Exception {
+        sql("CREATE TABLE test_table (a int, b int, c AS a + b);");
+        String ddl = sql("SHOW CREATE TABLE `test_table$audit_log`").get(0).getFieldAs(0);
+        assertThat(ddl).contains("`c` AS `a` + `b`");
+
+        sql("INSERT INTO test_table VALUES (1, 1)");
+        assertThat(sql("SELECT * FROM `test_table$audit_log`"))
+                .containsExactly(Row.of("+I", 1, 1, 2));
+    }
+
+    @Test
+    public void testBinlogTableWithComputedColumn() {
+        sql("CREATE TABLE test_table (a int, b int, c AS a + b);");
+        String ddl = sql("SHOW CREATE TABLE `test_table$binlog`").get(0).getFieldAs(0);
+        assertThat(ddl).doesNotContain("`c` AS `a` + `b`");
+
+        sql("INSERT INTO test_table VALUES (1, 1)");
+        assertThat(sql("SELECT * FROM `test_table$binlog`"))
+                .containsExactly(Row.of("+I", new Integer[] {1}, new Integer[] {1}));
+    }
+
+    @Test
+    public void testBinlogTableWithProjection() {
+        sql("CREATE TABLE test_table (a int, b string);");
+        sql("INSERT INTO test_table VALUES (1, 'A')");
+        assertThat(sql("SELECT * FROM `test_table$binlog`"))
+                .containsExactly(Row.of("+I", new Integer[] {1}, new String[] {"A"}));
+        assertThat(sql("SELECT b FROM `test_table$binlog`"))
+                .containsExactly(Row.of((Object) new String[] {"A"}));
+        assertThat(sql("SELECT rowkind, b FROM `test_table$binlog`"))
+                .containsExactly(Row.of("+I", new String[] {"A"}));
+    }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
index 0c645191f6..de9a4b040b 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
@@ -114,5 +114,15 @@ class PaimonSystemTableTest extends PaimonSparkTestBase {
       sql("SELECT * FROM `T$binlog`"),
       Seq(Row("+I", Array(1), Array(3)), Row("+I", Array(2), Array(2)))
     )
+
+    checkAnswer(
+      sql("SELECT b FROM `T$binlog`"),
+      Seq(Row(Array(3)), Row(Array(2)))
+    )
+
+    checkAnswer(
+      sql("SELECT rowkind, b FROM `T$binlog`"),
+      Seq(Row("+I", Array(3)), Row("+I", Array(2)))
+    )
   }
 }
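
For readers who want to try the fix, the new test coverage above can be reproduced from a standalone Flink job by projecting individual columns of a `$binlog` system table. The following is a minimal sketch, assuming a Paimon catalog is already registered and set as the current catalog; the class name, table name, and setup details here are illustrative, not part of this patch.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BinlogProjectionExample {
    public static void main(String[] args) throws Exception {
        // Batch Table API session; registering and switching to a Paimon catalog
        // is assumed to have been done via catalog DDL beforehand.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Illustrative table, matching the shape used in testBinlogTableWithProjection.
        tEnv.executeSql("CREATE TABLE test_table (a INT, b STRING)");
        tEnv.executeSql("INSERT INTO test_table VALUES (1, 'A')").await();

        // With the fix, a projection over the binlog system table builds its field
        // getters from the projected read type instead of the full row type, and a
        // computed column on the base table is no longer exposed on $binlog.
        tEnv.executeSql("SELECT b FROM `test_table$binlog`").print();
        tEnv.executeSql("SELECT rowkind, b FROM `test_table$binlog`").print();
    }
}

Each SELECT should print the projected data columns as array-wrapped values, mirroring the assertions in the Flink and Spark tests in this commit.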
