This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a07ebc315 [spark] Fix the build of read type in binlog table (#4689)
0a07ebc315 is described below

commit 0a07ebc3157da23c91428f80667b2a722c5f6da5
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Dec 12 09:09:34 2024 +0800

    [spark] Fix the build of read type in binlog table (#4689)
---
 .../apache/paimon/table/system/BinlogTable.java    | 22 +++++++++++++++-------
 .../paimon/spark/sql/PaimonSystemTableTest.scala   | 16 ++++++++++++++++
 2 files changed, 31 insertions(+), 7 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
index b17d61d44e..08eea468ea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
@@ -72,13 +72,8 @@ public class BinlogTable extends AuditLogTable {
         List<DataField> fields = new ArrayList<>();
         fields.add(SpecialFields.ROW_KIND);
         for (DataField field : wrapped.rowType().getFields()) {
-            DataField newField =
-                    new DataField(
-                            field.id(),
-                            field.name(),
-                            new ArrayType(field.type().nullable()), // convert 
to nullable
-                            field.description());
-            fields.add(newField);
+            // convert to nullable
+            fields.add(field.newType(new ArrayType(field.type().nullable())));
         }
         return new RowType(fields);
     }
@@ -99,6 +94,19 @@ public class BinlogTable extends AuditLogTable {
             super(dataRead);
         }
 
+        @Override
+        public InnerTableRead withReadType(RowType readType) {
+            List<DataField> fields = new ArrayList<>();
+            for (DataField field : readType.getFields()) {
+                if (field.name().equals(SpecialFields.ROW_KIND.name())) {
+                    fields.add(field);
+                } else {
+                    fields.add(field.newType(((ArrayType) 
field.type()).getElementType()));
+                }
+            }
+            return super.withReadType(readType.copy(fields));
+        }
+
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
             DataSplit dataSplit = (DataSplit) split;
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
index 64baf6232f..7baa57a54d 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
@@ -81,4 +81,20 @@ class PaimonSystemTableTest extends PaimonSparkTestBase {
       spark.sql("select partition,bucket from `T$buckets`"),
       Row("[2024-10-10, 01]", 0) :: Row("[2024-10-10, 01]", 1) :: 
Row("[2024-10-10, 01]", 2) :: Nil)
   }
+
+  test("system table: binlog table") {
+    sql("""
+          |CREATE TABLE T (a INT, b INT)
+          |TBLPROPERTIES ('primary-key'='a', 'changelog-producer' = 'lookup', 
'bucket' = '2')
+          |""".stripMargin)
+
+    sql("INSERT INTO T VALUES (1, 2)")
+    sql("INSERT INTO T VALUES (1, 3)")
+    sql("INSERT INTO T VALUES (2, 2)")
+
+    checkAnswer(
+      sql("SELECT * FROM `T$binlog`"),
+      Seq(Row("+I", Array(1), Array(3)), Row("+I", Array(2), Array(2)))
+    )
+  }
 }

Reply via email to