This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 823120192729f98011c0e37cd8622e13d88af3cc
Author: yuzelin <[email protected]>
AuthorDate: Wed May 7 19:04:06 2025 +0800

    [core] Fix computed column and projection of BinlogTable (#5566)
---
 .../apache/paimon/table/system/BinlogTable.java    | 10 +++++--
 .../apache/paimon/flink/SystemCatalogTable.java    | 23 ++++++++++++++-
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 34 ++++++++++++++++++++++
 .../paimon/spark/sql/PaimonSystemTableTest.scala   | 10 +++++++
 4 files changed, 74 insertions(+), 3 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
index eafd37f1d7..a023028bd1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
@@ -90,6 +90,8 @@ public class BinlogTable extends AuditLogTable {
 
     private class BinlogRead extends AuditLogRead {
 
+        private RowType wrappedReadType = wrapped.rowType();
+
         private BinlogRead(InnerTableRead dataRead) {
             super(dataRead);
         }
@@ -97,20 +99,24 @@ public class BinlogTable extends AuditLogTable {
         @Override
         public InnerTableRead withReadType(RowType readType) {
             List<DataField> fields = new ArrayList<>();
+            List<DataField> wrappedReadFields = new ArrayList<>();
             for (DataField field : readType.getFields()) {
                 if (field.name().equals(SpecialFields.ROW_KIND.name())) {
                     fields.add(field);
                 } else {
-                    fields.add(field.newType(((ArrayType) field.type()).getElementType()));
+                    DataField origin = field.newType(((ArrayType) field.type()).getElementType());
+                    fields.add(origin);
+                    wrappedReadFields.add(origin);
                 }
             }
+            this.wrappedReadType = this.wrappedReadType.copy(wrappedReadFields);
             return super.withReadType(readType.copy(fields));
         }
 
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws IOException {
             DataSplit dataSplit = (DataSplit) split;
-            InternalRow.FieldGetter[] fieldGetters = wrapped.rowType().fieldGetters();
+            InternalRow.FieldGetter[] fieldGetters = wrappedReadType.fieldGetters();
 
             if (dataSplit.isStreaming()) {
                 return new PackChangelogReader(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
index f88a808713..5878b25ce7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
 
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.system.AuditLogTable;
+import org.apache.paimon.table.system.BinlogTable;
 
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -34,7 +35,10 @@ import java.util.Optional;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
 import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA;
 import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
+import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn;
 import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;
+import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.NAME;
 import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK;
 
 /** A {@link CatalogTable} to represent system table. */
@@ -57,11 +61,28 @@ public class SystemCatalogTable implements CatalogTable {
                 TypeConversions.fromLogicalToDataType(toLogicalType(table.rowType())));
         if (table instanceof AuditLogTable) {
             Map<String, String> newOptions = new HashMap<>(table.options());
+
+            // add watermark
             if (newOptions.keySet().stream()
                     .anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) {
                 deserializeWatermarkSpec(newOptions, builder);
-                return builder.build();
             }
+
+            if (!(table instanceof BinlogTable)) {
+                // add non-physical columns
+                List<String> physicalColumns = table.rowType().getFieldNames();
+                int columnCount =
+                        physicalColumns.size()
+                                + nonPhysicalColumnsCount(newOptions, physicalColumns);
+                for (int i = 0; i < columnCount; i++) {
+                    String optionalName = newOptions.get(compoundKey(SCHEMA, i, NAME));
+                    if (optionalName != null && !physicalColumns.contains(optionalName)) {
+                        // build non-physical column from options
+                        deserializeNonPhysicalColumn(newOptions, i, builder);
+                    }
+                }
+            }
+            return builder.build();
         }
         return builder.build();
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index f1d8a44ca5..d7e00d9583 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -775,4 +775,38 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
                 .containsExactlyInAnyOrder(
                         Row.of("+I", 2, "B"), Row.of("-D", 2, "B"), Row.of("+I", 3, "C"));
     }
+
+    @Test
+    public void testAuditLogTableWithComputedColumn() throws Exception {
+        sql("CREATE TABLE test_table (a int, b int, c AS a + b);");
+        String ddl = sql("SHOW CREATE TABLE `test_table$audit_log`").get(0).getFieldAs(0);
+        assertThat(ddl).contains("`c` AS `a` + `b`");
+
+        sql("INSERT INTO test_table VALUES (1, 1)");
+        assertThat(sql("SELECT * FROM `test_table$audit_log`"))
+                .containsExactly(Row.of("+I", 1, 1, 2));
+    }
+
+    @Test
+    public void testBinlogTableWithComputedColumn() {
+        sql("CREATE TABLE test_table (a int, b int, c AS a + b);");
+        String ddl = sql("SHOW CREATE TABLE `test_table$binlog`").get(0).getFieldAs(0);
+        assertThat(ddl).doesNotContain("`c` AS `a` + `b`");
+
+        sql("INSERT INTO test_table VALUES (1, 1)");
+        assertThat(sql("SELECT * FROM `test_table$binlog`"))
+                .containsExactly(Row.of("+I", new Integer[] {1}, new Integer[] {1}));
+    }
+
+    @Test
+    public void testBinlogTableWithProjection() {
+        sql("CREATE TABLE test_table (a int, b string);");
+        sql("INSERT INTO test_table VALUES (1, 'A')");
+        assertThat(sql("SELECT * FROM `test_table$binlog`"))
+                .containsExactly(Row.of("+I", new Integer[] {1}, new String[] {"A"}));
+        assertThat(sql("SELECT b FROM `test_table$binlog`"))
+                .containsExactly(Row.of((Object) new String[] {"A"}));
+        assertThat(sql("SELECT rowkind, b FROM `test_table$binlog`"))
+                .containsExactly(Row.of("+I", new String[] {"A"}));
+    }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
index 0c645191f6..de9a4b040b 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
@@ -114,5 +114,15 @@ class PaimonSystemTableTest extends PaimonSparkTestBase {
       sql("SELECT * FROM `T$binlog`"),
       Seq(Row("+I", Array(1), Array(3)), Row("+I", Array(2), Array(2)))
     )
+
+    checkAnswer(
+      sql("SELECT b FROM `T$binlog`"),
+      Seq(Row(Array(3)), Row(Array(2)))
+    )
+
+    checkAnswer(
+      sql("SELECT rowkind, b FROM `T$binlog`"),
+      Seq(Row("+I", Array(3)), Row("+I", Array(2)))
+    )
   }
 }

Reply via email to