This is an automated email from the ASF dual-hosted git repository. kunni pushed a commit to branch FLINK-38730-copy in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit dd836f7e8837206d606be0af096a29e7b4788e70 Author: lvyanquan <[email protected]> AuthorDate: Mon Dec 8 10:35:32 2025 +0800 Address comment. --- .../kafka/json/canal/CanalJsonSerializationSchema.java | 5 ++++- .../apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java | 9 +++++---- .../src/main/java/io/debezium/connector/db2/Db2Connection.java | 3 ++- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java index 78eefc4ea..c95b310e9 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java @@ -146,7 +146,10 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event> reuseGenericRowData.setField( 5, new GenericArrayData( - jsonSerializers.get(dataChangeEvent.tableId()).getSchema().primaryKeys() + jsonSerializers + .get(dataChangeEvent.tableId()) + .getSchema() + .primaryKeys() .stream() .map(StringData::fromString) .toArray())); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java index e5821135d..0b09a560a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java @@ -109,16 +109,17 @@ public class PaimonWriter<InputT> public Collection<MultiTableCommittable> prepareCommit() { long startTime = System.currentTimeMillis(); List<MultiTableCommittable> committables = - writes.entrySet() - .parallelStream() + writes.entrySet().parallelStream() .flatMap( entry -> { try { // here we set it to lastCheckpointId+1 to // avoid prepareCommit the same checkpointId with the first // round. - return entry.getValue() - .prepareCommit(false, lastCheckpointId + 1).stream() + return entry + .getValue() + .prepareCommit(false, lastCheckpointId + 1) + .stream() .map( committable -> MultiTableCommittable diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/io/debezium/connector/db2/Db2Connection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/io/debezium/connector/db2/Db2Connection.java index 616fd7c95..8fe77ede9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/io/debezium/connector/db2/Db2Connection.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/io/debezium/connector/db2/Db2Connection.java @@ -399,7 +399,8 @@ public class Db2Connection extends JdbcConnection { // final List<Column> columns = columnEditors.subList(CHANGE_TABLE_DATA_COLUMN_OFFSET, // columnEditors.size() - 1).stream() final List<Column> columns = - columnEditors.subList(CHANGE_TABLE_DATA_COLUMN_OFFSET, columnEditors.size()) + columnEditors + .subList(CHANGE_TABLE_DATA_COLUMN_OFFSET, columnEditors.size()) .stream() .map( c ->
