This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch FLINK-38730-copy
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit dd836f7e8837206d606be0af096a29e7b4788e70
Author: lvyanquan <[email protected]>
AuthorDate: Mon Dec 8 10:35:32 2025 +0800

    Address comment.
---
 .../kafka/json/canal/CanalJsonSerializationSchema.java           | 5 ++++-
 .../apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java | 9 +++++----
 .../src/main/java/io/debezium/connector/db2/Db2Connection.java   | 3 ++-
 3 files changed, 11 insertions(+), 6 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
index 78eefc4ea..c95b310e9 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
@@ -146,7 +146,10 @@ public class CanalJsonSerializationSchema implements 
SerializationSchema<Event>
         reuseGenericRowData.setField(
                 5,
                 new GenericArrayData(
-                        
jsonSerializers.get(dataChangeEvent.tableId()).getSchema().primaryKeys()
+                        jsonSerializers
+                                .get(dataChangeEvent.tableId())
+                                .getSchema()
+                                .primaryKeys()
                                 .stream()
                                 .map(StringData::fromString)
                                 .toArray()));
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
index e5821135d..0b09a560a 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
@@ -109,16 +109,17 @@ public class PaimonWriter<InputT>
     public Collection<MultiTableCommittable> prepareCommit() {
         long startTime = System.currentTimeMillis();
         List<MultiTableCommittable> committables =
-                writes.entrySet()
-                        .parallelStream()
+                writes.entrySet().parallelStream()
                         .flatMap(
                                 entry -> {
                                     try {
                                         // here we set it to 
lastCheckpointId+1 to
                                         // avoid prepareCommit the same 
checkpointId with the first
                                         // round.
-                                        return entry.getValue()
-                                                .prepareCommit(false, 
lastCheckpointId + 1).stream()
+                                        return entry
+                                                .getValue()
+                                                .prepareCommit(false, 
lastCheckpointId + 1)
+                                                .stream()
                                                 .map(
                                                         committable ->
                                                                 
MultiTableCommittable
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/io/debezium/connector/db2/Db2Connection.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/io/debezium/connector/db2/Db2Connection.java
index 616fd7c95..8fe77ede9 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/io/debezium/connector/db2/Db2Connection.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/io/debezium/connector/db2/Db2Connection.java
@@ -399,7 +399,8 @@ public class Db2Connection extends JdbcConnection {
         // final List<Column> columns = 
columnEditors.subList(CHANGE_TABLE_DATA_COLUMN_OFFSET,
         // columnEditors.size() - 1).stream()
         final List<Column> columns =
-                columnEditors.subList(CHANGE_TABLE_DATA_COLUMN_OFFSET, 
columnEditors.size())
+                columnEditors
+                        .subList(CHANGE_TABLE_DATA_COLUMN_OFFSET, 
columnEditors.size())
                         .stream()
                         .map(
                                 c ->

Reply via email to