This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new e0d6d1d1a [FLINK-35442][cdc-connect][kafka] add key.format and partition.strategy options to make sure the same record is sent to the same partition. (#3522)
e0d6d1d1a is described below

commit e0d6d1d1a8ebf5878479c8453ba070875268611b
Author: Kunni <lvyanquan....@alibaba-inc.com>
AuthorDate: Fri Aug 9 17:29:14 2024 +0800

    [FLINK-35442][cdc-connect][kafka] add key.format and partition.strategy options to make sure the same record is sent to the same partition. (#3522)
---
 .../docs/connectors/pipeline-connectors/kafka.md   |  14 ++
 .../docs/connectors/pipeline-connectors/kafka.md   |  14 ++
 .../flink-cdc-pipeline-connector-kafka/pom.xml     |   8 ++
 .../cdc/connectors/kafka/json/TableSchemaInfo.java |  46 ++++++-
 .../json/canal/CanalJsonSerializationSchema.java   |  13 +-
 .../debezium/DebeziumJsonSerializationSchema.java  |  11 +-
 .../serialization/CsvSerializationSchema.java      | 117 +++++++++++++++++
 .../serialization/JsonSerializationSchema.java     | 141 +++++++++++++++++++++
 .../cdc/connectors/kafka/sink/KafkaDataSink.java   |  14 +-
 .../kafka/sink/KafkaDataSinkFactory.java           |  13 +-
 .../kafka/sink/KafkaDataSinkOptions.java           |  16 +++
 .../flink/cdc/connectors/kafka/sink/KeyFormat.java |  40 ++++++
 .../kafka/sink/KeySerializationFactory.java        |  74 +++++++++++
 .../connectors/kafka/sink/PartitionStrategy.java   |  41 ++++++
 .../PipelineKafkaRecordSerializationSchema.java    |  42 ++----
 .../connectors/kafka/json/TableSchemaInfoTest.java |   7 +-
 .../serialization/CsvSerializationSchemaTest.java  | 126 ++++++++++++++++++
 .../serialization/JsonSerializationSchemaTest.java | 135 ++++++++++++++++++++
 .../connectors/kafka/sink/KafkaDataSinkITCase.java | 127 ++++++++++++++++++-
 .../flink/cdc/connectors/kafka/sink/KafkaUtil.java |  11 +-
 20 files changed, 946 insertions(+), 64 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md 
b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
index aa9dc8879..66467b4d5 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
@@ -87,6 +87,20 @@ Pipeline 连接器配置项
       <td>String</td>
       <td>Sink 的名称。 </td>
     </tr>
+    <tr>
+      <td>partition.strategy</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">all-to-zero</td>
+      <td>String</td>
+      <td>定义发送数据到 Kafka 分区的策略, 可以设置的选项有 `all-to-zero`(将所有数据发送到 0 号分区) 以及 
`hash-by-key`(所有数据根据主键的哈希值分发),默认值为 `all-to-zero`。 </td>
+    </tr>
+    <tr>
+      <td>key.format</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">json</td>
+      <td>String</td>
+      <td>用于序列化 Kafka 消息的键部分数据的格式。可以设置的选项有 `csv` 以及 `json`, 默认值为 `json`。 </td>
+    </tr>
     <tr>
       <td>value.format</td>
       <td>optional</td>
diff --git a/docs/content/docs/connectors/pipeline-connectors/kafka.md 
b/docs/content/docs/connectors/pipeline-connectors/kafka.md
index 57f690666..6bb94bc37 100644
--- a/docs/content/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content/docs/connectors/pipeline-connectors/kafka.md
@@ -85,6 +85,20 @@ Pipeline Connector Options
       <td>String</td>
       <td>The name of the sink.</td>
     </tr>
+    <tr>
+      <td>partition.strategy</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">all-to-zero</td>
+      <td>String</td>
+      <td>Defines the strategy for sending records to Kafka partitions. Available options are `all-to-zero` (sending all records to partition 0) and `hash-by-key` (distributing records by the hash of their primary keys). The default is `all-to-zero`.</td>
+    </tr>
+    <tr>
+      <td>key.format</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">json</td>
+      <td>String</td>
+      <td>Defines the format used to serialize the key part of Kafka messages. Available options are `csv` and `json`. The default is `json`.</td>
+    </tr>
     <tr>
       <td>value.format</td>
       <td>optional</td>
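
As a rough illustration of the two new options above, the sketch below shows the kind of key each `key.format` produces for a change on a hypothetical table `db.tbl` whose primary key is the single column `id` (table and column names are made up; the CSV shape mirrors the expectations in CsvSerializationSchemaTest further down, and both formats prepend a `TableId` field before the primary-key columns):

    public class KeyFormatSketch {
        public static void main(String[] args) {
            // Approximate key payloads for a change on table `db.tbl` with primary key id = "1".
            String jsonKey = "{\"TableId\":\"db.tbl\",\"id\":\"1\"}"; // key.format = json (default)
            String csvKey = "\"db.tbl\",1";                           // key.format = csv
            // With partition.strategy = hash-by-key, these key bytes (not the value bytes)
            // decide the target partition, so rows with the same primary key share a partition.
            System.out.println(jsonKey);
            System.out.println(csvKey);
        }
    }
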
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
index 5be032a76..2614f594f 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
@@ -45,6 +45,14 @@ limitations under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-csv</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+
         <!-- Test dependencies -->
         <dependency>
             <groupId>org.apache.flink</groupId>
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java
index a8480d77d..fe764ea85 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java
@@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.data.binary.BinaryStringData;
 
@@ -39,26 +40,59 @@ import static 
org.apache.flink.cdc.common.types.DataTypeChecks.getScale;
 /** maintain the {@link SerializationSchema} of a specific {@link TableId}. */
 public class TableSchemaInfo {
 
+    private final TableId tableId;
+
     private final Schema schema;
 
+    private final List<Integer> primaryKeyColumnIndexes;
+
     private final List<RecordData.FieldGetter> fieldGetters;
 
     private final SerializationSchema<RowData> serializationSchema;
 
     public TableSchemaInfo(
-            Schema schema, SerializationSchema<RowData> serializationSchema, 
ZoneId zoneId) {
+            TableId tableId,
+            Schema schema,
+            SerializationSchema<RowData> serializationSchema,
+            ZoneId zoneId) {
+        this.tableId = tableId;
         this.schema = schema;
         this.serializationSchema = serializationSchema;
         this.fieldGetters = createFieldGetters(schema, zoneId);
+        primaryKeyColumnIndexes = new ArrayList<>();
+        for (int keyIndex = 0; keyIndex < schema.primaryKeys().size(); 
keyIndex++) {
+            for (int columnIndex = 0; columnIndex < schema.getColumnCount(); 
columnIndex++) {
+                if (schema.getColumns()
+                        .get(columnIndex)
+                        .getName()
+                        .equals(schema.primaryKeys().get(keyIndex))) {
+                    primaryKeyColumnIndexes.add(columnIndex);
+                    break;
+                }
+            }
+        }
     }
 
     /** Convert to {@link RowData}, which will be passed to the serializationSchema. */
-    public RowData getRowDataFromRecordData(RecordData recordData) {
-        GenericRowData genericRowData = new 
GenericRowData(recordData.getArity());
-        for (int i = 0; i < recordData.getArity(); i++) {
-            genericRowData.setField(i, 
fieldGetters.get(i).getFieldOrNull(recordData));
+    public RowData getRowDataFromRecordData(RecordData recordData, boolean 
primaryKeyOnly) {
+        if (primaryKeyOnly) {
+            GenericRowData genericRowData = new 
GenericRowData(primaryKeyColumnIndexes.size() + 1);
+            genericRowData.setField(0, 
StringData.fromString(tableId.toString()));
+            for (int i = 0; i < primaryKeyColumnIndexes.size(); i++) {
+                genericRowData.setField(
+                        i + 1,
+                        fieldGetters
+                                .get(primaryKeyColumnIndexes.get(i))
+                                .getFieldOrNull(recordData));
+            }
+            return genericRowData;
+        } else {
+            GenericRowData genericRowData = new 
GenericRowData(recordData.getArity());
+            for (int i = 0; i < recordData.getArity(); i++) {
+                genericRowData.setField(i, 
fieldGetters.get(i).getFieldOrNull(recordData));
+            }
+            return genericRowData;
         }
-        return genericRowData;
     }
 
     private static List<RecordData.FieldGetter> createFieldGetters(Schema 
schema, ZoneId zoneId) {
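
The constructor change above precomputes, for every primary-key name, the index of the column with that name, so the key row can be assembled without repeated lookups. A minimal, self-contained sketch of that mapping (the column and key names below are illustrative, not taken from the commit):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the primary-key index mapping done in the new TableSchemaInfo constructor.
    public class PrimaryKeyIndexSketch {
        public static void main(String[] args) {
            List<String> columns = List.of("id", "name", "ts");   // schema column order
            List<String> primaryKeys = List.of("id", "ts");       // declared primary keys

            List<Integer> primaryKeyColumnIndexes = new ArrayList<>();
            for (String key : primaryKeys) {
                for (int i = 0; i < columns.size(); i++) {
                    if (columns.get(i).equals(key)) {
                        primaryKeyColumnIndexes.add(i);
                        break;
                    }
                }
            }
            // Prints [0, 2]: the key row is then built as (tableId, col[0], col[2]).
            System.out.println(primaryKeyColumnIndexes);
        }
    }
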
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
index 0a145cab7..d0c617975 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
@@ -127,7 +127,8 @@ public class CanalJsonSerializationSchema implements 
SerializationSchema<Event>
             }
             jsonSerializers.put(
                     schemaChangeEvent.tableId(),
-                    new TableSchemaInfo(schema, jsonSerializer, zoneId));
+                    new TableSchemaInfo(
+                            schemaChangeEvent.tableId(), schema, 
jsonSerializer, zoneId));
             return null;
         }
 
@@ -153,7 +154,8 @@ public class CanalJsonSerializationSchema implements 
SerializationSchema<Event>
                                     new RowData[] {
                                         jsonSerializers
                                                 .get(dataChangeEvent.tableId())
-                                                
.getRowDataFromRecordData((dataChangeEvent.after()))
+                                                .getRowDataFromRecordData(
+                                                        
dataChangeEvent.after(), false)
                                     }));
                     reuseGenericRowData.setField(2, OP_INSERT);
                     return jsonSerializers
@@ -168,7 +170,7 @@ public class CanalJsonSerializationSchema implements 
SerializationSchema<Event>
                                         jsonSerializers
                                                 .get(dataChangeEvent.tableId())
                                                 .getRowDataFromRecordData(
-                                                        
(dataChangeEvent.before()))
+                                                        
dataChangeEvent.before(), false)
                                     }));
                     reuseGenericRowData.setField(1, null);
                     reuseGenericRowData.setField(2, OP_DELETE);
@@ -185,7 +187,7 @@ public class CanalJsonSerializationSchema implements 
SerializationSchema<Event>
                                         jsonSerializers
                                                 .get(dataChangeEvent.tableId())
                                                 .getRowDataFromRecordData(
-                                                        
(dataChangeEvent.before()))
+                                                        
dataChangeEvent.before(), false)
                                     }));
                     reuseGenericRowData.setField(
                             1,
@@ -193,7 +195,8 @@ public class CanalJsonSerializationSchema implements 
SerializationSchema<Event>
                                     new RowData[] {
                                         jsonSerializers
                                                 .get(dataChangeEvent.tableId())
-                                                
.getRowDataFromRecordData((dataChangeEvent.after()))
+                                                .getRowDataFromRecordData(
+                                                        
dataChangeEvent.after(), false)
                                     }));
                     reuseGenericRowData.setField(2, OP_UPDATE);
                     return jsonSerializers
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
index ce8afc0db..15cecbc4f 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
@@ -126,7 +126,8 @@ public class DebeziumJsonSerializationSchema implements 
SerializationSchema<Even
             }
             jsonSerializers.put(
                     schemaChangeEvent.tableId(),
-                    new TableSchemaInfo(schema, jsonSerializer, zoneId));
+                    new TableSchemaInfo(
+                            schemaChangeEvent.tableId(), schema, 
jsonSerializer, zoneId));
             return null;
         }
 
@@ -144,7 +145,7 @@ public class DebeziumJsonSerializationSchema implements 
SerializationSchema<Even
                             1,
                             jsonSerializers
                                     .get(dataChangeEvent.tableId())
-                                    
.getRowDataFromRecordData(dataChangeEvent.after()));
+                                    
.getRowDataFromRecordData(dataChangeEvent.after(), false));
                     reuseGenericRowData.setField(2, OP_INSERT);
                     return jsonSerializers
                             .get(dataChangeEvent.tableId())
@@ -155,7 +156,7 @@ public class DebeziumJsonSerializationSchema implements 
SerializationSchema<Even
                             0,
                             jsonSerializers
                                     .get(dataChangeEvent.tableId())
-                                    
.getRowDataFromRecordData(dataChangeEvent.before()));
+                                    
.getRowDataFromRecordData(dataChangeEvent.before(), false));
                     reuseGenericRowData.setField(1, null);
                     reuseGenericRowData.setField(2, OP_DELETE);
                     return jsonSerializers
@@ -168,12 +169,12 @@ public class DebeziumJsonSerializationSchema implements 
SerializationSchema<Even
                             0,
                             jsonSerializers
                                     .get(dataChangeEvent.tableId())
-                                    
.getRowDataFromRecordData(dataChangeEvent.before()));
+                                    
.getRowDataFromRecordData(dataChangeEvent.before(), false));
                     reuseGenericRowData.setField(
                             1,
                             jsonSerializers
                                     .get(dataChangeEvent.tableId())
-                                    
.getRowDataFromRecordData(dataChangeEvent.after()));
+                                    
.getRowDataFromRecordData(dataChangeEvent.after(), false));
                     reuseGenericRowData.setField(2, OP_UPDATE);
                     return jsonSerializers
                             .get(dataChangeEvent.tableId())
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/CsvSerializationSchema.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/CsvSerializationSchema.java
new file mode 100644
index 000000000..5e08d92ee
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/CsvSerializationSchema.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.serialization;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.OperationType;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataField;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
+import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+
+/** A {@link SerializationSchema} to convert {@link Event} into a byte array in CSV format. */
+public class CsvSerializationSchema implements SerializationSchema<Event> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * A map from {@link TableId} to the {@link SerializationSchema} used to serialize its key
+     * data in CSV format.
+     */
+    private final Map<TableId, TableSchemaInfo> csvSerializers;
+
+    private final ZoneId zoneId;
+
+    private InitializationContext context;
+
+    public CsvSerializationSchema(ZoneId zoneId) {
+        this.zoneId = zoneId;
+        csvSerializers = new HashMap<>();
+    }
+
+    @Override
+    public void open(InitializationContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public byte[] serialize(Event event) {
+        if (event instanceof SchemaChangeEvent) {
+            Schema schema;
+            SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
+            if (event instanceof CreateTableEvent) {
+                CreateTableEvent createTableEvent = (CreateTableEvent) event;
+                schema = createTableEvent.getSchema();
+            } else {
+                schema =
+                        SchemaUtils.applySchemaChangeEvent(
+                                
csvSerializers.get(schemaChangeEvent.tableId()).getSchema(),
+                                schemaChangeEvent);
+            }
+            CsvRowDataSerializationSchema csvSerializer = 
buildSerializationForPrimaryKey(schema);
+            try {
+                csvSerializer.open(context);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            csvSerializers.put(
+                    schemaChangeEvent.tableId(),
+                    new TableSchemaInfo(
+                            schemaChangeEvent.tableId(), schema, 
csvSerializer, zoneId));
+            return null;
+        }
+        DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
+        RecordData recordData =
+                dataChangeEvent.op().equals(OperationType.DELETE)
+                        ? dataChangeEvent.before()
+                        : dataChangeEvent.after();
+        TableSchemaInfo tableSchemaInfo = 
csvSerializers.get(dataChangeEvent.tableId());
+        return tableSchemaInfo
+                .getSerializationSchema()
+                
.serialize(tableSchemaInfo.getRowDataFromRecordData(recordData, true));
+    }
+
+    private CsvRowDataSerializationSchema 
buildSerializationForPrimaryKey(Schema schema) {
+        DataField[] fields = new DataField[schema.primaryKeys().size() + 1];
+        fields[0] = DataTypes.FIELD("TableId", DataTypes.STRING());
+        for (int i = 0; i < schema.primaryKeys().size(); i++) {
+            Column column = 
schema.getColumn(schema.primaryKeys().get(i)).get();
+            fields[i + 1] = DataTypes.FIELD(column.getName(), 
column.getType());
+        }
+        // the row should never be null
+        DataType dataType = DataTypes.ROW(fields).notNull();
+        LogicalType rowType = 
DataTypeUtils.toFlinkDataType(dataType).getLogicalType();
+        return new CsvRowDataSerializationSchema.Builder((RowType) 
rowType).build();
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java
new file mode 100644
index 000000000..5425d444e
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.serialization;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.OperationType;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataField;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions;
+import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+
+/** A {@link SerializationSchema} to convert {@link Event} into a byte array in JSON format. */
+public class JsonSerializationSchema implements SerializationSchema<Event> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * A map from {@link TableId} to the {@link SerializationSchema} used to serialize its key
+     * data in JSON format.
+     */
+    private final Map<TableId, TableSchemaInfo> jsonSerializers;
+
+    private final TimestampFormat timestampFormat;
+
+    private final JsonFormatOptions.MapNullKeyMode mapNullKeyMode;
+
+    private final String mapNullKeyLiteral;
+
+    private final boolean encodeDecimalAsPlainNumber;
+
+    private final ZoneId zoneId;
+
+    private InitializationContext context;
+
+    public JsonSerializationSchema(
+            TimestampFormat timestampFormat,
+            JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
+            String mapNullKeyLiteral,
+            ZoneId zoneId,
+            boolean encodeDecimalAsPlainNumber) {
+        this.timestampFormat = timestampFormat;
+        this.mapNullKeyMode = mapNullKeyMode;
+        this.mapNullKeyLiteral = mapNullKeyLiteral;
+        this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
+        this.zoneId = zoneId;
+        jsonSerializers = new HashMap<>();
+    }
+
+    @Override
+    public void open(InitializationContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public byte[] serialize(Event event) {
+        if (event instanceof SchemaChangeEvent) {
+            Schema schema;
+            SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
+            if (event instanceof CreateTableEvent) {
+                CreateTableEvent createTableEvent = (CreateTableEvent) event;
+                schema = createTableEvent.getSchema();
+            } else {
+                schema =
+                        SchemaUtils.applySchemaChangeEvent(
+                                
jsonSerializers.get(schemaChangeEvent.tableId()).getSchema(),
+                                schemaChangeEvent);
+            }
+            JsonRowDataSerializationSchema jsonSerializer = 
buildSerializationForPrimaryKey(schema);
+            try {
+                jsonSerializer.open(context);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            jsonSerializers.put(
+                    schemaChangeEvent.tableId(),
+                    new TableSchemaInfo(
+                            schemaChangeEvent.tableId(), schema, 
jsonSerializer, zoneId));
+            return null;
+        }
+        DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
+        RecordData recordData =
+                dataChangeEvent.op().equals(OperationType.DELETE)
+                        ? dataChangeEvent.before()
+                        : dataChangeEvent.after();
+        TableSchemaInfo tableSchemaInfo = 
jsonSerializers.get(dataChangeEvent.tableId());
+        return tableSchemaInfo
+                .getSerializationSchema()
+                
.serialize(tableSchemaInfo.getRowDataFromRecordData(recordData, true));
+    }
+
+    private JsonRowDataSerializationSchema 
buildSerializationForPrimaryKey(Schema schema) {
+        DataField[] fields = new DataField[schema.primaryKeys().size() + 1];
+        fields[0] = DataTypes.FIELD("TableId", DataTypes.STRING());
+        for (int i = 0; i < schema.primaryKeys().size(); i++) {
+            Column column = 
schema.getColumn(schema.primaryKeys().get(i)).get();
+            fields[i + 1] = DataTypes.FIELD(column.getName(), 
column.getType());
+        }
+        // the row should never be null
+        DataType dataType = DataTypes.ROW(fields).notNull();
+        LogicalType rowType = 
DataTypeUtils.toFlinkDataType(dataType).getLogicalType();
+        return new JsonRowDataSerializationSchema(
+                (RowType) rowType,
+                timestampFormat,
+                mapNullKeyMode,
+                mapNullKeyLiteral,
+                encodeDecimalAsPlainNumber);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java
index 2ab264c5a..2dfc021b3 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java
@@ -29,7 +29,6 @@ import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 
@@ -46,10 +45,12 @@ public class KafkaDataSink implements DataSink {
 
     final DeliveryGuarantee deliveryGuarantee;
 
-    final FlinkKafkaPartitioner<Event> partitioner;
+    private final PartitionStrategy partitionStrategy;
 
     final ZoneId zoneId;
 
+    final SerializationSchema<Event> keySerialization;
+
     final SerializationSchema<Event> valueSerialization;
 
     final String topic;
@@ -61,16 +62,18 @@ public class KafkaDataSink implements DataSink {
     public KafkaDataSink(
             DeliveryGuarantee deliveryGuarantee,
             Properties kafkaProperties,
-            FlinkKafkaPartitioner<Event> partitioner,
+            PartitionStrategy partitionStrategy,
             ZoneId zoneId,
+            SerializationSchema<Event> keySerialization,
             SerializationSchema<Event> valueSerialization,
             String topic,
             boolean addTableToHeaderEnabled,
             String customHeaders) {
         this.deliveryGuarantee = deliveryGuarantee;
         this.kafkaProperties = kafkaProperties;
-        this.partitioner = partitioner;
+        this.partitionStrategy = partitionStrategy;
         this.zoneId = zoneId;
+        this.keySerialization = keySerialization;
         this.valueSerialization = valueSerialization;
         this.topic = topic;
         this.addTableToHeaderEnabled = addTableToHeaderEnabled;
@@ -90,7 +93,8 @@ public class KafkaDataSink implements DataSink {
                         .setKafkaProducerConfig(kafkaProperties)
                         .setRecordSerializer(
                                 new PipelineKafkaRecordSerializationSchema(
-                                        partitioner,
+                                        partitionStrategy,
+                                        keySerialization,
                                         valueSerialization,
                                         topic,
                                         addTableToHeaderEnabled,
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
index 243991d1a..d89812913 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.cdc.connectors.kafka.json.ChangeLogJsonFormatFactory;
 import org.apache.flink.cdc.connectors.kafka.json.JsonSerializationType;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 
 import java.time.ZoneId;
 import java.util.HashSet;
@@ -37,6 +36,8 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 
+import static 
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.KEY_FORMAT;
+import static 
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PARTITION_STRATEGY;
 import static 
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PROPERTIES_PREFIX;
 import static 
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED;
 import static 
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_CUSTOM_HEADER;
@@ -65,6 +66,9 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
                             context.getPipelineConfiguration()
                                     
.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
         }
+        KeyFormat keyFormat = 
context.getFactoryConfiguration().get(KEY_FORMAT);
+        SerializationSchema<Event> keySerialization =
+                
KeySerializationFactory.createSerializationSchema(configuration, keyFormat, 
zoneId);
         JsonSerializationType jsonSerializationType =
                 
context.getFactoryConfiguration().get(KafkaDataSinkOptions.VALUE_FORMAT);
         SerializationSchema<Event> valueSerialization =
@@ -86,11 +90,14 @@ public class KafkaDataSinkFactory implements 
DataSinkFactory {
                         
.get(KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED);
         String customHeaders =
                 
context.getFactoryConfiguration().get(KafkaDataSinkOptions.SINK_CUSTOM_HEADER);
+        PartitionStrategy partitionStrategy =
+                
context.getFactoryConfiguration().get(KafkaDataSinkOptions.PARTITION_STRATEGY);
         return new KafkaDataSink(
                 deliveryGuarantee,
                 kafkaProperties,
-                new FlinkFixedPartitioner<>(),
+                partitionStrategy,
                 zoneId,
+                keySerialization,
                 valueSerialization,
                 topic,
                 addTableToHeaderEnabled,
@@ -110,7 +117,9 @@ public class KafkaDataSinkFactory implements 
DataSinkFactory {
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(KEY_FORMAT);
         options.add(VALUE_FORMAT);
+        options.add(PARTITION_STRATEGY);
         options.add(TOPIC);
         options.add(SINK_ADD_TABLEID_TO_HEADER_ENABLED);
         options.add(SINK_CUSTOM_HEADER);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
index e55e149a9..ca82f5c80 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
@@ -35,6 +35,22 @@ public class KafkaDataSinkOptions {
                     .defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
                     .withDescription("Optional delivery guarantee when 
committing.");
 
+    public static final ConfigOption<PartitionStrategy> PARTITION_STRATEGY =
+            key("partition.strategy")
+                    .enumType(PartitionStrategy.class)
+                    .defaultValue(PartitionStrategy.ALL_TO_ZERO)
+                    .withDescription(
+                            "Defines the strategy for sending records to Kafka partitions, "
+                                    + "available options are `all-to-zero` and `hash-by-key`, the default is `all-to-zero`.");
+
+    public static final ConfigOption<KeyFormat> KEY_FORMAT =
+            key("key.format")
+                    .enumType(KeyFormat.class)
+                    .defaultValue(KeyFormat.JSON)
+                    .withDescription(
+                            "Defines the format used to serialize the key part of Kafka messages, "
+                                    + "available options are `csv` and `json`, the default is `json`.");
+
     public static final ConfigOption<JsonSerializationType> VALUE_FORMAT =
             key("value.format")
                     .enumType(JsonSerializationType.class)
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeyFormat.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeyFormat.java
new file mode 100644
index 000000000..7c065d47f
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeyFormat.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+/** Enum of key formats used to build the {@link SerializationSchema} for the key of a {@link ProducerRecord}. */
+public enum KeyFormat {
+    JSON("json"),
+
+    CSV("csv");
+
+    private final String value;
+
+    KeyFormat(String value) {
+        this.value = value;
+    }
+
+    @Override
+    public String toString() {
+        return value;
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java
new file mode 100644
index 000000000..76132d8e5
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.cdc.common.event.Event;
+import 
org.apache.flink.cdc.connectors.kafka.serialization.CsvSerializationSchema;
+import 
org.apache.flink.cdc.connectors.kafka.serialization.JsonSerializationSchema;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions;
+import org.apache.flink.formats.json.JsonFormatOptionsUtil;
+
+import java.time.ZoneId;
+
+import static 
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static 
org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
+
+/**
+ * Format factory for providing configured instances of {@link SerializationSchema} that convert
+ * {@link Event} to a byte array used as the Kafka record key.
+ */
+public class KeySerializationFactory {
+
+    /**
+     * Creates a configured instance of {@link SerializationSchema} to convert {@link Event} to a
+     * byte array.
+     */
+    public static SerializationSchema<Event> createSerializationSchema(
+            ReadableConfig formatOptions, KeyFormat keyFormat, ZoneId zoneId) {
+        switch (keyFormat) {
+            case JSON:
+                {
+                    TimestampFormat timestampFormat =
+                            
JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
+                    JsonFormatOptions.MapNullKeyMode mapNullKeyMode =
+                            
JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions);
+                    String mapNullKeyLiteral = 
formatOptions.get(JSON_MAP_NULL_KEY_LITERAL);
+
+                    final boolean encodeDecimalAsPlainNumber =
+                            formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+                    return new JsonSerializationSchema(
+                            timestampFormat,
+                            mapNullKeyMode,
+                            mapNullKeyLiteral,
+                            zoneId,
+                            encodeDecimalAsPlainNumber);
+                }
+            case CSV:
+                {
+                    return new CsvSerializationSchema(zoneId);
+                }
+            default:
+                {
+                    throw new IllegalArgumentException("Unsupported key format: " + keyFormat);
+                }
+        }
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PartitionStrategy.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PartitionStrategy.java
new file mode 100644
index 000000000..7638c6635
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PartitionStrategy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.sink;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+/** Partition strategy for assigning a {@link ProducerRecord} to a Kafka partition. */
+public enum PartitionStrategy {
+
+    /** All {@link ProducerRecord} will be sent to partition 0. */
+    ALL_TO_ZERO("all-to-zero"),
+
+    /** Each {@link ProducerRecord} will be sent to a partition determined by the hash of its primary key. */
+    HASH_BY_KEY("hash-by-key");
+
+    private final String value;
+
+    PartitionStrategy(String value) {
+        this.value = value;
+    }
+
+    @Override
+    public String toString() {
+        return value;
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java
index bb27753d9..85e5e3f19 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java
@@ -23,14 +23,11 @@ import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 
-import javax.annotation.Nullable;
-
 import java.util.HashMap;
 import java.util.Map;
 
@@ -46,7 +43,11 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class PipelineKafkaRecordSerializationSchema
         implements KafkaRecordSerializationSchema<Event> {
-    private final FlinkKafkaPartitioner<Event> partitioner;
+
+    private final Integer partition;
+
+    private final SerializationSchema<Event> keySerialization;
+
     private final SerializationSchema<Event> valueSerialization;
 
     private final String unifiedTopic;
@@ -63,12 +64,13 @@ public class PipelineKafkaRecordSerializationSchema
     public static final String TABLE_NAME_HEADER_KEY = "tableName";
 
     PipelineKafkaRecordSerializationSchema(
-            @Nullable FlinkKafkaPartitioner<Event> partitioner,
+            PartitionStrategy partitionStrategy,
+            SerializationSchema<Event> keySerialization,
             SerializationSchema<Event> valueSerialization,
             String unifiedTopic,
             boolean addTableToHeaderEnabled,
             String customHeaderString) {
-        this.partitioner = partitioner;
+        this.keySerialization = keySerialization;
         this.valueSerialization = checkNotNull(valueSerialization);
         this.unifiedTopic = unifiedTopic;
         this.addTableToHeaderEnabled = addTableToHeaderEnabled;
@@ -87,12 +89,14 @@ public class PipelineKafkaRecordSerializationSchema
                 }
             }
         }
+        partition = partitionStrategy.equals(PartitionStrategy.ALL_TO_ZERO) ? 
0 : null;
     }
 
     @Override
     public ProducerRecord<byte[], byte[]> serialize(
             Event event, KafkaSinkContext context, Long timestamp) {
         ChangeEvent changeEvent = (ChangeEvent) event;
+        final byte[] keySerialized = keySerialization.serialize(event);
         final byte[] valueSerialized = valueSerialization.serialize(event);
         if (event instanceof SchemaChangeEvent) {
             // skip sending SchemaChangeEvent.
@@ -121,37 +125,13 @@ public class PipelineKafkaRecordSerializationSchema
             }
         }
         return new ProducerRecord<>(
-                topic,
-                extractPartition(
-                        changeEvent, valueSerialized, 
context.getPartitionsForTopic(topic)),
-                null,
-                null,
-                valueSerialized,
-                recordHeaders);
+                topic, partition, null, keySerialized, valueSerialized, 
recordHeaders);
     }
 
     @Override
     public void open(
             SerializationSchema.InitializationContext context, 
KafkaSinkContext sinkContext)
             throws Exception {
-        if (partitioner != null) {
-            partitioner.open(
-                    sinkContext.getParallelInstanceId(),
-                    sinkContext.getNumberOfParallelInstances());
-        }
         valueSerialization.open(context);
     }
-
-    private Integer extractPartition(
-            ChangeEvent changeEvent, byte[] valueSerialized, int[] partitions) 
{
-        if (partitioner != null) {
-            return partitioner.partition(
-                    changeEvent,
-                    null,
-                    valueSerialized,
-                    changeEvent.tableId().toString(),
-                    partitions);
-        }
-        return null;
-    }
 }
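
With `partition.strategy` set to `hash-by-key`, the serializer above leaves the partition null and supplies the serialized key, so the Kafka producer's own key-based partitioning decides the target partition. A hedged sketch of that default behaviour (murmur2 hash of the key bytes modulo the partition count; the key bytes and the partition count of 4 are assumptions for illustration):

    import org.apache.kafka.common.utils.Utils;

    import java.nio.charset.StandardCharsets;

    public class KeyPartitionSketch {
        public static void main(String[] args) {
            // Illustrative key bytes (CSV key for table db.tbl, primary key 1) and
            // an assumed topic with 4 partitions.
            byte[] keyBytes = "\"db.tbl\",1".getBytes(StandardCharsets.UTF_8);
            int numPartitions = 4;

            // Kafka's default key-based partitioning: positive murmur2 hash of the
            // key, modulo the number of partitions.
            int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            System.out.println("Records with this key go to partition " + partition);
        }
    }

Because the hash depends only on the key bytes, every change event that serializes to the same key lands on the same partition, which is the guarantee the commit message refers to.
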
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java
index 73f8ad64d..2c2feaae7 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
@@ -95,7 +96,9 @@ public class TableSchemaInfoTest {
                                 "null_string", 
org.apache.flink.cdc.common.types.DataTypes.STRING())
                         .primaryKey("col1")
                         .build();
-        TableSchemaInfo tableSchemaInfo = new TableSchemaInfo(schema, null, 
ZoneId.of("UTC+8"));
+        TableSchemaInfo tableSchemaInfo =
+                new TableSchemaInfo(
+                        TableId.parse("testDatabase.testTable"), schema, null, 
ZoneId.of("UTC+8"));
         Object[] testData =
                 new Object[] {
                     BinaryStringData.fromString("pk"),
@@ -159,6 +162,6 @@ public class TableSchemaInfoTest {
                         org.apache.flink.table.data.TimestampData.fromInstant(
                                 Instant.parse("2023-01-01T08:00:00.000Z")),
                         null),
-                tableSchemaInfo.getRowDataFromRecordData(recordData));
+                tableSchemaInfo.getRowDataFromRecordData(recordData, false));
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/serialization/CsvSerializationSchemaTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/serialization/CsvSerializationSchemaTest.java
new file mode 100644
index 000000000..92b302f7e
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/serialization/CsvSerializationSchemaTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.serialization;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.connectors.kafka.json.MockInitializationContext;
+import org.apache.flink.cdc.connectors.kafka.sink.KeyFormat;
+import org.apache.flink.cdc.connectors.kafka.sink.KeySerializationFactory;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.ZoneId;
+
+/** Tests for {@link CsvSerializationSchema}. */
+public class CsvSerializationSchemaTest {
+
+    public static final TableId TABLE_1 =
+            TableId.tableId("default_namespace", "default_schema", "table1");
+
+    @Test
+    public void testSerialize() throws Exception {
+        ObjectMapper mapper =
+                JacksonMapperFactory.createObjectMapper()
+                        
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+        SerializationSchema<Event> serializationSchema =
+                KeySerializationFactory.createSerializationSchema(
+                        new Configuration(), KeyFormat.CSV, 
ZoneId.systemDefault());
+        serializationSchema.open(new MockInitializationContext());
+
+        // create table
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("col1", DataTypes.STRING())
+                        .physicalColumn("col2", DataTypes.STRING())
+                        .primaryKey("col1")
+                        .build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
+        Assertions.assertNull(serializationSchema.serialize(createTableEvent));
+
+        // insert
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
+        DataChangeEvent insertEvent1 =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("1")
+                                }));
+        String expected = "\"default_namespace.default_schema.table1\",1";
+        String actual = new String(serializationSchema.serialize(insertEvent1));
+        Assertions.assertEquals(expected, actual);
+
+        DataChangeEvent insertEvent2 =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("2"),
+                                    BinaryStringData.fromString("2")
+                                }));
+        expected = "\"default_namespace.default_schema.table1\",2";
+        actual = new String(serializationSchema.serialize(insertEvent2));
+        Assertions.assertEquals(expected, actual);
+
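+        // delete and update keys are built from the primary key ("col1") only, so they equal the matching insert keys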
+        DataChangeEvent deleteEvent =
+                DataChangeEvent.deleteEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("2"),
+                                    BinaryStringData.fromString("2")
+                                }));
+        expected = "\"default_namespace.default_schema.table1\",2";
+        actual = new String(serializationSchema.serialize(deleteEvent));
+        Assertions.assertEquals(expected, actual);
+
+        DataChangeEvent updateEvent =
+                DataChangeEvent.updateEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("1")
+                                }),
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("x")
+                                }));
+        expected = "\"default_namespace.default_schema.table1\",1";
+        actual = new String(serializationSchema.serialize(updateEvent));
+        Assertions.assertEquals(expected, actual);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchemaTest.java
new file mode 100644
index 000000000..d21f1fb5f
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchemaTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.serialization;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.connectors.kafka.json.MockInitializationContext;
+import org.apache.flink.cdc.connectors.kafka.sink.KeyFormat;
+import org.apache.flink.cdc.connectors.kafka.sink.KeySerializationFactory;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.ZoneId;
+
+/** Tests for {@link JsonSerializationSchema}. */
+public class JsonSerializationSchemaTest {
+
+    public static final TableId TABLE_1 =
+            TableId.tableId("default_namespace", "default_schema", "table1");
+
+    @Test
+    public void testSerialize() throws Exception {
+        ObjectMapper mapper =
+                JacksonMapperFactory.createObjectMapper()
+                        .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+        SerializationSchema<Event> serializationSchema =
+                KeySerializationFactory.createSerializationSchema(
+                        new Configuration(), KeyFormat.JSON, ZoneId.systemDefault());
+        serializationSchema.open(new MockInitializationContext());
+
+        // create table
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("col1", DataTypes.STRING())
+                        .physicalColumn("col2", DataTypes.STRING())
+                        .primaryKey("col1")
+                        .build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
+        Assertions.assertNull(serializationSchema.serialize(createTableEvent));
+
+        // insert
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
+        DataChangeEvent insertEvent1 =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("1")
+                                }));
+        JsonNode expected =
+                mapper.readTree(
+                        "{\"TableId\":\"default_namespace.default_schema.table1\",\"col1\":\"1\"}");
+        JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
+        Assertions.assertEquals(expected, actual);
+
+        DataChangeEvent insertEvent2 =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("2"),
+                                    BinaryStringData.fromString("2")
+                                }));
+        expected =
+                mapper.readTree(
+                        "{\"TableId\":\"default_namespace.default_schema.table1\",\"col1\":\"2\"}");
+        actual = mapper.readTree(serializationSchema.serialize(insertEvent2));
+        Assertions.assertEquals(expected, actual);
+
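+        // delete and update keys contain only the table id and the primary key column ("col1")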
+        DataChangeEvent deleteEvent =
+                DataChangeEvent.deleteEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("2"),
+                                    BinaryStringData.fromString("2")
+                                }));
+        expected =
+                mapper.readTree(
+                        "{\"TableId\":\"default_namespace.default_schema.table1\",\"col1\":\"2\"}");
+        actual = mapper.readTree(serializationSchema.serialize(deleteEvent));
+        Assertions.assertEquals(expected, actual);
+
+        DataChangeEvent updateEvent =
+                DataChangeEvent.updateEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("1")
+                                }),
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("x")
+                                }));
+        expected =
+                mapper.readTree(
+                        "{\"TableId\":\"default_namespace.default_schema.table1\",\"col1\":\"1\"}");
+        actual = mapper.readTree(serializationSchema.serialize(updateEvent));
+        Assertions.assertEquals(expected, actual);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
index 1ef10603b..7936c0ef3 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.connectors.kafka.sink;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
@@ -67,9 +68,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
@@ -259,7 +262,7 @@ class KafkaDataSinkITCase extends TestLogger {
         env.execute();
 
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
-                drainAllRecordsFromTopic(topic, false);
+                drainAllRecordsFromTopic(topic, false, 0);
         final long recordsCount = 5;
         assertThat(recordsCount).isEqualTo(collectedRecords.size());
         ObjectMapper mapper =
@@ -321,7 +324,7 @@ class KafkaDataSinkITCase extends TestLogger {
         env.execute();
 
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
-                drainAllRecordsFromTopic(topic, false);
+                drainAllRecordsFromTopic(topic, false, 0);
         final long recordsCount = 5;
         assertThat(recordsCount).isEqualTo(collectedRecords.size());
         for (ConsumerRecord<byte[], byte[]> consumerRecord : collectedRecords) {
@@ -363,6 +366,104 @@ class KafkaDataSinkITCase extends TestLogger {
         checkProducerLeak();
     }
 
+    @Test
+    void testHashByKeyPartitionStrategyUsingJson() throws Exception {
+        final StreamExecutionEnvironment env = new LocalStreamEnvironment();
+        env.enableCheckpointing(1000L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        final DataStream<Event> source =
+                env.fromCollection(createSourceEvents(), new EventTypeInfo());
+        Map<String, String> config = new HashMap<>();
+        Properties properties = getKafkaClientConfiguration();
+        properties.forEach(
+                (key, value) ->
+                        config.put(
+                                KafkaDataSinkOptions.PROPERTIES_PREFIX + key.toString(),
+                                value.toString()));
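+        // key.format=json with partition.strategy=hash-by-key (default is all-to-zero): records sharing a key hash to the same partition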
+        config.put(
+                KafkaDataSinkOptions.PARTITION_STRATEGY.key(),
+                PartitionStrategy.HASH_BY_KEY.toString());
+        config.put(KafkaDataSinkOptions.KEY_FORMAT.key(), KeyFormat.JSON.toString());
+        config.put(
+                KafkaDataSinkOptions.VALUE_FORMAT.key(),
+                JsonSerializationType.CANAL_JSON.toString());
+        source.sinkTo(
+                ((FlinkSinkProvider)
+                                (new KafkaDataSinkFactory()
+                                        .createDataSink(
+                                                new FactoryHelper.DefaultContext(
+                                                        Configuration.fromMap(config),
+                                                        Configuration.fromMap(new HashMap<>()),
+                                                        this.getClass().getClassLoader()))
+                                        .getEventSinkProvider()))
+                        .getSink());
+        env.execute();
+
+        final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
+                drainAllRecordsFromTopic(topic, false);
+        final long recordsCount = 5;
+        assertThat(recordsCount).isEqualTo(collectedRecords.size());
+        for (ConsumerRecord<byte[], byte[]> consumerRecord : collectedRecords) {
+            assertThat(
+                            consumerRecord
+                                    .headers()
+                                    .headers(
+                                            PipelineKafkaRecordSerializationSchema
+                                                    .TABLE_NAME_HEADER_KEY)
+                                    .iterator())
+                    .isExhausted();
+        }
+        ObjectMapper mapper =
+                JacksonMapperFactory.createObjectMapper()
+                        .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+        List<Tuple2<JsonNode, JsonNode>> expected =
+                Arrays.asList(
+                        Tuple2.of(
+                                mapper.readTree(
+                                        String.format(
+                                                "{\"TableId\":\"%s\",\"col1\":\"1\"}",
+                                                table1.toString())),
+                                mapper.readTree(
+                                        String.format(
+                                                "{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+                                                table1.getTableName()))),
+                        Tuple2.of(
+                                mapper.readTree(
+                                        String.format(
+                                                "{\"TableId\":\"%s\",\"col1\":\"2\"}",
+                                                table1.toString())),
+                                mapper.readTree(
+                                        String.format(
+                                                "{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+                                                table1.getTableName()))),
+                        Tuple2.of(
+                                mapper.readTree(
+                                        String.format(
+                                                "{\"TableId\":\"%s\",\"col1\":\"3\"}",
+                                                table1.toString())),
+                                mapper.readTree(
+                                        String.format(
+                                                "{\"old\":null,\"data\":[{\"col1\":\"3\",\"col2\":\"3\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+                                                table1.getTableName()))),
+                        Tuple2.of(
+                                mapper.readTree(
+                                        String.format(
+                                                "{\"TableId\":\"%s\",\"col1\":\"1\"}",
+                                                table1.toString())),
+                                mapper.readTree(
+                                        String.format(
+                                                "{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+                                                table1.getTableName()))),
+                        Tuple2.of(
+                                mapper.readTree(
+                                        String.format(
+                                                "{\"TableId\":\"%s\",\"col1\":\"2\"}",
+                                                table1.toString())),
+                                mapper.readTree(
+                                        String.format(
+                                                "{\"old\":[{\"col1\":\"2\",\"newCol3\":\"\"}],\"data\":[{\"col1\":\"2\",\"newCol3\":\"x\"}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+                                                table1.getTableName()))));
+        assertThat(deserializeKeyValues(collectedRecords)).containsAll(expected);
+        checkProducerLeak();
+    }
+
     @Test
     void testTopicAndHeaderOption() throws Exception {
         final StreamExecutionEnvironment env = new LocalStreamEnvironment();
@@ -392,7 +493,7 @@ class KafkaDataSinkITCase extends TestLogger {
         env.execute();
 
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
-                drainAllRecordsFromTopic("test_topic", false);
+                drainAllRecordsFromTopic("test_topic", false, 0);
         final long recordsCount = 5;
         assertThat(recordsCount).isEqualTo(collectedRecords.size());
         for (ConsumerRecord<byte[], byte[]> consumerRecord : collectedRecords) {
@@ -460,9 +561,13 @@ class KafkaDataSinkITCase extends TestLogger {
     }
 
     private List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
-            String topic, boolean committed) {
+            String topic, boolean committed, int... partitionArr) {
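+        // an empty partitionArr drains records from every partition of the topic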
         Properties properties = getKafkaClientConfiguration();
-        return KafkaUtil.drainAllRecordsFromTopic(topic, properties, committed);
+        Set<Integer> partitions = new HashSet<>();
+        for (int partition : partitionArr) {
+            partitions.add(partition);
+        }
+        return KafkaUtil.drainAllRecordsFromTopic(topic, properties, committed, partitions);
     }
 
     private void checkProducerLeak() throws InterruptedException {
@@ -486,6 +591,18 @@ class KafkaDataSinkITCase extends TestLogger {
                         + leaks.stream().map(this::format).collect(Collectors.joining("\n\n")));
     }
 
+    private static List<Tuple2<JsonNode, JsonNode>> deserializeKeyValues(
+            List<ConsumerRecord<byte[], byte[]>> records) throws IOException {
+        ObjectMapper mapper =
+                JacksonMapperFactory.createObjectMapper()
+                        .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+        List<Tuple2<JsonNode, JsonNode>> result = new ArrayList<>();
+        for (ConsumerRecord<byte[], byte[]> record : records) {
+            result.add(Tuple2.of(mapper.readTree(record.key()), mapper.readTree(record.value())));
+        }
+        return result;
+    }
+
     private static List<JsonNode> deserializeValues(List<ConsumerRecord<byte[], byte[]>> records)
             throws IOException {
         ObjectMapper mapper =
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaUtil.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaUtil.java
index d1f36f657..775b5b8a0 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaUtil.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaUtil.java
@@ -115,13 +115,14 @@ public class KafkaUtil {
      * @throws KafkaException
      */
     public static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
-            String topic, Properties properties, boolean committed) throws KafkaException {
+            String topic, Properties properties, boolean committed, Set<Integer> partitions)
+            throws KafkaException {
         final Properties consumerConfig = new Properties();
         consumerConfig.putAll(properties);
         consumerConfig.put(
                 ConsumerConfig.ISOLATION_LEVEL_CONFIG,
                 committed ? "read_committed" : "read_uncommitted");
-        return drainAllRecordsFromTopic(topic, consumerConfig);
+        return drainAllRecordsFromTopic(topic, consumerConfig, partitions);
     }
 
     /**
@@ -137,13 +138,17 @@ public class KafkaUtil {
      * @throws KafkaException
      */
     public static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
-            String topic, Properties properties) throws KafkaException {
+            String topic, Properties properties, Set<Integer> partitions) throws KafkaException {
         final Properties consumerConfig = new Properties();
         consumerConfig.putAll(properties);
         consumerConfig.put("key.deserializer", ByteArrayDeserializer.class.getName());
         consumerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());
         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
             Set<TopicPartition> topicPartitions = getAllPartitions(consumer, topic);
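+            // when a partition filter is supplied, drain only the requested partitions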
+            if (!partitions.isEmpty()) {
+                topicPartitions.removeIf(
+                        topicPartition -> !partitions.contains(topicPartition.partition()));
+            }
             Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
             consumer.assign(topicPartitions);
             consumer.seekToBeginning(topicPartitions);
