This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e4b28ae2 [core] Introduce binlog system table to pack the UB and UA 
(#4520)
9e4b28ae2 is described below

commit 9e4b28ae27a786fa9833571556560ce0b9c07f01
Author: WenjunMin <[email protected]>
AuthorDate: Thu Nov 14 18:39:54 2024 +0800

    [core] Introduce binlog system table to pack the UB and UA (#4520)
---
 docs/content/maintenance/system-tables.md          |  19 +++
 .../apache/paimon/reader/PackChangelogReader.java  | 131 ++++++++++++++++++
 .../apache/paimon/table/system/AuditLogTable.java  |  12 +-
 .../apache/paimon/table/system/BinlogTable.java    | 151 +++++++++++++++++++++
 .../paimon/table/system/SystemTableLoader.java     |   2 +
 paimon-flink/paimon-flink-common/pom.xml           |   6 +
 .../org/apache/paimon/flink/SystemTableITCase.java |  66 +++++++++
 7 files changed, 381 insertions(+), 6 deletions(-)

diff --git a/docs/content/maintenance/system-tables.md 
b/docs/content/maintenance/system-tables.md
index 462f8c27f..0246d6faf 100644
--- a/docs/content/maintenance/system-tables.md
+++ b/docs/content/maintenance/system-tables.md
@@ -406,4 +406,23 @@ SELECT * FROM T$statistics;
 1 rows in set
 */
 ```
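+
+### Binlog Table
+
+You can query the binlog through the binlog table in either streaming or batch mode. In this
+system table, the update-before and update-after records are packed into one row: each column
+is returned as an array holding a single value for inserts and deletes, and `[before, after]`
+for updates.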
+
+```
+/*
++------------------+----------------------+-----------------------+
+|     rowkind      |       column_0       |       column_1        |
++------------------+----------------------+-----------------------+
+|        +I        |       [col_0]        |       [col_1]         |
++------------------+----------------------+-----------------------+
+|        +U        | [col_0_ub, col_0_ua] | [col_1_ub, col_1_ua]  |
++------------------+----------------------+-----------------------+
+|        -D        |       [col_0]        |       [col_1]         |
++------------------+----------------------+-----------------------+
+*/
+```
+
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/reader/PackChangelogReader.java 
b/paimon-common/src/main/java/org/apache/paimon/reader/PackChangelogReader.java
new file mode 100644
index 000000000..a60780ff5
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/reader/PackChangelogReader.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.function.BiFunction;
+
+/**
+ * A {@link RecordReader} that packs every UPDATE_BEFORE record together with the record that
+ * immediately follows it, so the update-before and update-after messages become a single row.
+ *
+ * <p>The whole underlying reader is exposed as one {@link RecordIterator}: the first call to
+ * {@link #readBatch()} returns it and every subsequent call returns {@code null}.
+ */
+public class PackChangelogReader implements RecordReader<InternalRow> {
+
+    private final RecordReader<InternalRow> reader;
+    private final BiFunction<InternalRow, InternalRow, InternalRow> function;
+    private final InternalRowSerializer serializer;
+
+    /** Whether the single packing iterator has already been handed out. */
+    private boolean iteratorReturned = false;
+
+    /**
+     * @param reader underlying changelog reader; closed by {@link #close()}
+     * @param function combines a record and its follower ({@code null} unless the first record is
+     *     UPDATE_BEFORE) into one output row
+     * @param rowType type of the records produced by {@code reader}; used to copy UPDATE_BEFORE
+     *     records before the following record is fetched
+     */
+    public PackChangelogReader(
+            RecordReader<InternalRow> reader,
+            BiFunction<InternalRow, InternalRow, InternalRow> function,
+            RowType rowType) {
+        this.reader = reader;
+        this.function = function;
+        this.serializer = new InternalRowSerializer(rowType);
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<InternalRow> readBatch() throws IOException {
+        if (iteratorReturned) {
+            return null;
+        }
+        iteratorReturned = true;
+        return new PackingIterator(reader, function, serializer);
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    /** Drains the underlying reader batch by batch and packs UPDATE_BEFORE with its follower. */
+    private static class PackingIterator implements RecordIterator<InternalRow> {
+
+        private final RecordReader<InternalRow> source;
+        private final BiFunction<InternalRow, InternalRow, InternalRow> packer;
+        private final InternalRowSerializer copier;
+
+        /** Batch currently being drained, or {@code null} when a new one must be fetched. */
+        private RecordIterator<InternalRow> activeBatch;
+
+        /** Set once the underlying reader reports that no more batches exist. */
+        private boolean exhausted;
+
+        PackingIterator(
+                RecordReader<InternalRow> source,
+                BiFunction<InternalRow, InternalRow, InternalRow> packer,
+                InternalRowSerializer copier) {
+            this.source = source;
+            this.packer = packer;
+            this.copier = copier;
+            this.exhausted = false;
+        }
+
+        @Nullable
+        @Override
+        public InternalRow next() throws IOException {
+            InternalRow first = pollRow();
+            if (first == null) {
+                return null;
+            }
+            if (first.getRowKind() != RowKind.UPDATE_BEFORE) {
+                return packer.apply(first, null);
+            }
+            // Fetching the follower may release the batch that backs `first`, so copy it first.
+            InternalRow before = copier.copy(first);
+            InternalRow after = pollRow();
+            return packer.apply(before, after);
+        }
+
+        /** Returns the next record across batch boundaries, or {@code null} at end of input. */
+        @Nullable
+        private InternalRow pollRow() throws IOException {
+            while (!exhausted) {
+                if (activeBatch == null) {
+                    activeBatch = source.readBatch();
+                    if (activeBatch == null) {
+                        exhausted = true;
+                        break;
+                    }
+                }
+                InternalRow candidate = activeBatch.next();
+                if (candidate != null) {
+                    return candidate;
+                }
+                // Current batch is drained: release it and move on to the next one.
+                releaseBatch();
+            }
+            return null;
+        }
+
+        @Override
+        public void releaseBatch() {
+            if (activeBatch == null) {
+                return;
+            }
+            activeBatch.releaseBatch();
+            activeBatch = null;
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index e0acd9fb3..7438f9393 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -526,13 +526,13 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
         }
     }
 
-    private class AuditLogRead implements InnerTableRead {
+    class AuditLogRead implements InnerTableRead {
 
-        private final InnerTableRead dataRead;
+        protected final InnerTableRead dataRead;
 
-        private int[] readProjection;
+        protected int[] readProjection;
 
-        private AuditLogRead(InnerTableRead dataRead) {
+        protected AuditLogRead(InnerTableRead dataRead) {
             this.dataRead = dataRead.forceKeepDelete();
             this.readProjection = defaultProjection();
         }
@@ -600,9 +600,9 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
     }
 
     /** A {@link ProjectedRow} which returns row kind when mapping index is 
negative. */
-    private static class AuditLogRow extends ProjectedRow {
+    static class AuditLogRow extends ProjectedRow {
 
-        private AuditLogRow(int[] indexMapping, InternalRow row) {
+        AuditLogRow(int[] indexMapping, InternalRow row) {
             super(indexMapping);
             replaceRow(row);
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
new file mode 100644
index 000000000..96f9f6ed6
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.PackChangelogReader;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
+
+/**
+ * A {@link Table} for reading binlog of table. The binlog format is as below.
+ *
+ * <p>INSERT: [+I, [co1, null], [col2, null]]
+ *
+ * <p>UPDATE: [+U, [co1_ub, col1_ua], [col2_ub, col2_ua]]
+ *
+ * <p>DELETE: [-D, [co1, null], [col2, null]]
+ */
+public class BinlogTable extends AuditLogTable {
+
+    public static final String BINLOG = "binlog";
+
+    private final FileStoreTable wrapped;
+
+    public BinlogTable(FileStoreTable wrapped) {
+        super(wrapped);
+        this.wrapped = wrapped;
+    }
+
+    @Override
+    public String name() {
+        return wrapped.name() + SYSTEM_TABLE_SPLITTER + BINLOG;
+    }
+
+    @Override
+    public RowType rowType() {
+        List<DataField> fields = new ArrayList<>();
+        fields.add(SpecialFields.ROW_KIND);
+        for (DataField field : wrapped.rowType().getFields()) {
+            DataField newField =
+                    new DataField(
+                            field.id(),
+                            field.name(),
+                            new ArrayType(field.type().nullable()), // convert 
to nullable
+                            field.description());
+            fields.add(newField);
+        }
+        return new RowType(fields);
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        return new BinlogRead(wrapped.newRead());
+    }
+
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) {
+        return new BinlogTable(wrapped.copy(dynamicOptions));
+    }
+
+    private class BinlogRead extends AuditLogRead {
+
+        private BinlogRead(InnerTableRead dataRead) {
+            super(dataRead);
+        }
+
+        @Override
+        public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
+            DataSplit dataSplit = (DataSplit) split;
+            if (dataSplit.isStreaming()) {
+                return new PackChangelogReader(
+                        dataRead.createReader(split),
+                        (row1, row2) ->
+                                new AuditLogRow(
+                                        readProjection,
+                                        convertToArray(
+                                                row1, row2, 
wrapped.rowType().fieldGetters())),
+                        wrapped.rowType());
+            } else {
+                return dataRead.createReader(split)
+                        .transform(
+                                (row) ->
+                                        new AuditLogRow(
+                                                readProjection,
+                                                convertToArray(
+                                                        row,
+                                                        null,
+                                                        
wrapped.rowType().fieldGetters())));
+            }
+        }
+
+        private InternalRow convertToArray(
+                InternalRow row1,
+                @Nullable InternalRow row2,
+                InternalRow.FieldGetter[] fieldGetters) {
+            GenericRow row = new GenericRow(row1.getFieldCount());
+            for (int i = 0; i < row1.getFieldCount(); i++) {
+                Object o1 = fieldGetters[i].getFieldOrNull(row1);
+                Object o2;
+                if (row2 != null) {
+                    o2 = fieldGetters[i].getFieldOrNull(row2);
+                    row.setField(i, new GenericArray(new Object[] {o1, o2}));
+                } else {
+                    row.setField(i, new GenericArray(new Object[] {o1}));
+                }
+            }
+            // If no row2 provided, then follow the row1 kind.
+            if (row2 == null) {
+                row.setRowKind(row1.getRowKind());
+            } else {
+                row.setRowKind(row2.getRowKind());
+            }
+            return row;
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index a84f41ec1..3d5b21131 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -41,6 +41,7 @@ import static 
org.apache.paimon.options.CatalogOptions.LINEAGE_META;
 import static 
org.apache.paimon.table.system.AggregationFieldsTable.AGGREGATION_FIELDS;
 import static 
org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
 import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG;
+import static org.apache.paimon.table.system.BinlogTable.BINLOG;
 import static org.apache.paimon.table.system.BranchesTable.BRANCHES;
 import static org.apache.paimon.table.system.BucketsTable.BUCKETS;
 import static 
org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
@@ -77,6 +78,7 @@ public class SystemTableLoader {
                     .put(READ_OPTIMIZED, ReadOptimizedTable::new)
                     .put(AGGREGATION_FIELDS, AggregationFieldsTable::new)
                     .put(STATISTICS, StatisticTable::new)
+                    .put(BINLOG, BinlogTable::new)
                     .build();
 
     public static final List<String> SYSTEM_TABLES = new 
ArrayList<>(SYSTEM_TABLE_LOADERS.keySet());
diff --git a/paimon-flink/paimon-flink-common/pom.xml 
b/paimon-flink/paimon-flink-common/pom.xml
index 4452af266..91222983b 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -162,6 +162,12 @@ under the License.
             <artifactId>iceberg-data</artifactId>
             <version>${iceberg.version}</version>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>parquet-avro</artifactId>
+                    <groupId>org.apache.parquet</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java
new file mode 100644
index 000000000..771f4acc5
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.utils.BlockingIterator;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for system table. */
+public class SystemTableITCase extends CatalogTableITCase {
+
+    /** Streaming read of {@code $binlog} packs UPDATE_BEFORE/UPDATE_AFTER into one +U row. */
+    @Test
+    public void testBinlogTableStreamRead() throws Exception {
+        sql(
+                "CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED) "
+                        + "with ('changelog-producer' = 'lookup', 'bucket' = '2')");
+        BlockingIterator<Row, Row> iterator =
+                streamSqlBlockIter(
+                        "SELECT * FROM T$binlog /*+ OPTIONS('scan.mode' = 'latest') */");
+        try {
+            sql("INSERT INTO T VALUES (1, 2)");
+            sql("INSERT INTO T VALUES (1, 3)");
+            sql("INSERT INTO T VALUES (2, 2)");
+            List<Row> rows = iterator.collect(3);
+            assertThat(rows)
+                    .containsExactly(
+                            Row.of("+I", new Integer[] {1}, new Integer[] {2}),
+                            Row.of("+U", new Integer[] {1, 1}, new Integer[] {2, 3}),
+                            Row.of("+I", new Integer[] {2}, new Integer[] {2}));
+        } finally {
+            // Always stop the streaming job, even when an assertion fails.
+            iterator.close();
+        }
+    }
+
+    /** Batch read of {@code $binlog} returns single-element arrays for the merged rows. */
+    @Test
+    public void testBinlogTableBatchRead() throws Exception {
+        sql(
+                "CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED) "
+                        + "with ('changelog-producer' = 'lookup', 'bucket' = '2')");
+        sql("INSERT INTO T VALUES (1, 2)");
+        sql("INSERT INTO T VALUES (1, 3)");
+        sql("INSERT INTO T VALUES (2, 2)");
+        List<Row> rows =
+                sql("SELECT * FROM T$binlog /*+ OPTIONS('scan.mode' = 'latest') */");
+        // The table has two buckets, so the order of rows across splits is not guaranteed.
+        assertThat(rows)
+                .containsExactlyInAnyOrder(
+                        Row.of("+I", new Integer[] {1}, new Integer[] {3}),
+                        Row.of("+I", new Integer[] {2}, new Integer[] {2}));
+    }
+}

Reply via email to