This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d6154f12 [FLINK-35791][kafka] Add database and table info of Canal / Debezium json format for Kafka sink (#3461)
9d6154f12 is described below

commit 9d6154f12307da24551a6f24b5be72313d01837c
Author: Kunni <lvyanquan....@alibaba-inc.com>
AuthorDate: Thu Aug 8 16:06:43 2024 +0800

    [FLINK-35791][kafka] Add database and table info of Canal / Debezium json format for Kafka sink (#3461)
---
 .../docs/connectors/pipeline-connectors/kafka.md   | 41 +++++++++++++++
 .../docs/connectors/pipeline-connectors/kafka.md   | 41 +++++++++++++++
 .../json/canal/CanalJsonSerializationSchema.java   | 25 +++++++--
 .../debezium/DebeziumJsonSerializationSchema.java  | 21 ++++++--
 .../canal/CanalJsonSerializationSchemaTest.java    |  8 +--
 .../DebeziumJsonSerializationSchemaTest.java       |  8 +--
 .../connectors/kafka/sink/KafkaDataSinkITCase.java | 60 ++++++++++++++++------
 7 files changed, 173 insertions(+), 31 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
index 94f859106..aa9dc8879 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
@@ -140,6 +140,47 @@ Pipeline Connector Options
 * If the `topic` option is configured, all messages will be sent to that single topic.
 * If the Kafka topic being written to does not exist, it will be created automatically.
 
+### Output Format
+The output format differs depending on the built-in `value.format` option:
+#### debezium-json
+As described in the [Debezium docs](https://debezium.io/documentation/reference/1.9/connectors/mysql.html), the debezium-json format contains the `before`, `after`, `op` and `source` elements, but the `ts_ms` field is not included in `source`.
+An output example is:
+```json
+{
+  "before": null,
+  "after": {
+    "col1": "1",
+    "col2": "1"
+  },
+  "op": "c",
+  "source": {
+    "db": "default_namespace",
+    "table": "table1"
+  }
+}
+```
+
+#### canal-json
+As described in [Canal | Apache Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/#available-metadata), the canal-json format contains the `old`, `data`, `type`, `database`, `table` and `pkNames` elements, but `ts` is not included.
+An output example is:
+```json
+{
+    "old": null,
+    "data": [
+        {
+            "col1": "1",
+            "col2": "1"
+        }
+    ],
+    "type": "INSERT",
+    "database": "default_schema",
+    "table": "table1",
+    "pkNames": [
+        "col1"
+    ]
+}
+```
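+
+The format is selected with the sink's `value.format` option. The following is a minimal, illustrative sketch of the sink block only (the bootstrap server address and the topic name are placeholders, and the rest of the pipeline definition, such as the source, is omitted):
+
+```yaml
+sink:
+  type: kafka
+  # Placeholder address; point this at your own Kafka cluster.
+  properties.bootstrap.servers: localhost:9092
+  # Optional: if set, every record is sent to this single topic instead of one topic per table.
+  topic: kafka_sink_topic
+  # Use debezium-json or canal-json to get the outputs shown above.
+  value.format: debezium-json
+```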
+
 Data Type Mapping
 ----------------
 <div class="wy-table-responsive">
diff --git a/docs/content/docs/connectors/pipeline-connectors/kafka.md b/docs/content/docs/connectors/pipeline-connectors/kafka.md
index 96f599509..57f690666 100644
--- a/docs/content/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content/docs/connectors/pipeline-connectors/kafka.md
@@ -138,6 +138,47 @@ Usage Notes
 
 * If the Kafka topic being written to does not exist, it will be created automatically.
 
+### Output Format
+The output format differs depending on the built-in `value.format` option:
+#### debezium-json
+Refer to the [Debezium docs](https://debezium.io/documentation/reference/1.9/connectors/mysql.html): the debezium-json format contains the `before`, `after`, `op` and `source` elements, but `ts_ms` is not included in `source`.
+An output example is:
+```json
+{
+  "before": null,
+  "after": {
+    "col1": "1",
+    "col2": "1"
+  },
+  "op": "c",
+  "source": {
+    "db": "default_namespace",
+    "table": "table1"
+  }
+}
+```
+
+#### canal-json
+Refer to [Canal | Apache Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/#available-metadata): the canal-json format contains the `old`, `data`, `type`, `database`, `table` and `pkNames` elements, but `ts` is not included.
+An output example is:
+```json
+{
+    "old": null,
+    "data": [
+        {
+            "col1": "1",
+            "col2": "1"
+        }
+    ],
+    "type": "INSERT",
+    "database": "default_schema",
+    "table": "table1",
+    "pkNames": [
+        "col1"
+    ]
+}
+```
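+
+The format is selected with the sink's `value.format` option. The following is a minimal, illustrative sketch of the sink block only (the bootstrap server address and the topic name are placeholders, and the rest of the pipeline definition, such as the source, is omitted):
+
+```yaml
+sink:
+  type: kafka
+  # Placeholder address; point this at your own Kafka cluster.
+  properties.bootstrap.servers: localhost:9092
+  # Optional: if set, every record is sent to this single topic instead of one topic per table.
+  topic: kafka_sink_topic
+  # Use debezium-json or canal-json to get the outputs shown above.
+  value.format: debezium-json
+```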
+
 Data Type Mapping
 ----------------
 <div class="wy-table-responsive">
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
index 78548e31b..0a145cab7 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
@@ -94,7 +94,7 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event>
     @Override
     public void open(InitializationContext context) {
         this.context = context;
-        reuseGenericRowData = new GenericRowData(3);
+        reuseGenericRowData = new GenericRowData(6);
     }
 
     @Override
@@ -132,6 +132,17 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event>
         }
 
         DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
+        reuseGenericRowData.setField(
+                3, StringData.fromString(dataChangeEvent.tableId().getSchemaName()));
+        reuseGenericRowData.setField(
+                4, StringData.fromString(dataChangeEvent.tableId().getTableName()));
+        reuseGenericRowData.setField(
+                5,
+                new GenericArrayData(
+                        jsonSerializers.get(dataChangeEvent.tableId()).getSchema().primaryKeys()
+                                .stream()
+                                .map(StringData::fromString)
+                                .toArray()));
         try {
             switch (dataChangeEvent.op()) {
                 case INSERT:
@@ -200,14 +211,20 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event>
         }
     }
 
+    /**
+     * Refer to <a
+     * href="https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/#available-metadata">Canal
+     * | Apache Flink</a> for more details.
+     */
     private static RowType createJsonRowType(DataType databaseSchema) {
-        // Canal JSON contains other information, e.g. "database", "ts"
-        // but we don't need them
         return (RowType)
                 DataTypes.ROW(
                                 DataTypes.FIELD("old", DataTypes.ARRAY(databaseSchema)),
                                 DataTypes.FIELD("data", DataTypes.ARRAY(databaseSchema)),
-                                DataTypes.FIELD("type", DataTypes.STRING()))
+                                DataTypes.FIELD("type", DataTypes.STRING()),
+                                DataTypes.FIELD("database", DataTypes.STRING()),
+                                DataTypes.FIELD("table", DataTypes.STRING()),
+                                DataTypes.FIELD("pkNames", DataTypes.ARRAY(DataTypes.STRING())))
                         .getLogicalType();
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
index 2f305ce42..ce8afc0db 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
@@ -92,7 +92,7 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Even
 
     @Override
     public void open(InitializationContext context) {
-        reuseGenericRowData = new GenericRowData(3);
+        reuseGenericRowData = new GenericRowData(4);
         this.context = context;
     }
 
@@ -131,6 +131,11 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Even
         }
 
         DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
+        reuseGenericRowData.setField(
+                3,
+                GenericRowData.of(
+                        StringData.fromString(dataChangeEvent.tableId().getSchemaName()),
+                        StringData.fromString(dataChangeEvent.tableId().getTableName())));
         try {
             switch (dataChangeEvent.op()) {
                 case INSERT:
@@ -185,14 +190,22 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Even
         }
     }
 
+    /**
+     * Refer to <a
+     * href="https://debezium.io/documentation/reference/1.9/connectors/mysql.html">Debezium
+     * docs</a> for more details.
+     */
     private static RowType createJsonRowType(DataType databaseSchema) {
-        // Debezium JSON contains some other information, e.g. "source", "ts_ms"
-        // but we don't need them.
         return (RowType)
                 DataTypes.ROW(
                                 DataTypes.FIELD("before", databaseSchema),
                                 DataTypes.FIELD("after", databaseSchema),
-                                DataTypes.FIELD("op", DataTypes.STRING()))
+                                DataTypes.FIELD("op", DataTypes.STRING()),
+                                DataTypes.FIELD(
+                                        "source",
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("db", DataTypes.STRING()),
+                                                DataTypes.FIELD("table", DataTypes.STRING()))))
                         .getLogicalType();
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
index c6335e6d6..362354c6e 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
@@ -83,7 +83,7 @@ public class CanalJsonSerializationSchemaTest {
                                 }));
         JsonNode expected =
                 mapper.readTree(
-                        "{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\"}");
+                        "{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
         JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
         Assertions.assertEquals(expected, actual);
 
@@ -97,7 +97,7 @@ public class CanalJsonSerializationSchemaTest {
                                 }));
         expected =
                 mapper.readTree(
-                        "{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\"}");
+                        "{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
         actual = mapper.readTree(serializationSchema.serialize(insertEvent2));
         Assertions.assertEquals(expected, actual);
 
@@ -111,7 +111,7 @@ public class CanalJsonSerializationSchemaTest {
                                 }));
         expected =
                 mapper.readTree(
-                        "{\"old\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"data\":null,\"type\":\"DELETE\"}");
+                        "{\"old\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
         actual = mapper.readTree(serializationSchema.serialize(deleteEvent));
         Assertions.assertEquals(expected, actual);
 
@@ -130,7 +130,7 @@ public class CanalJsonSerializationSchemaTest {
                                 }));
         expected =
                 mapper.readTree(
-                        "{\"old\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"data\":[{\"col1\":\"1\",\"col2\":\"x\"}],\"type\":\"UPDATE\"}");
+                        "{\"old\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"data\":[{\"col1\":\"1\",\"col2\":\"x\"}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
         actual = mapper.readTree(serializationSchema.serialize(updateEvent));
         Assertions.assertEquals(expected, actual);
     }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
index 0be02b9b3..f2b36e4ef 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
@@ -81,7 +81,7 @@ public class DebeziumJsonSerializationSchemaTest {
                                 }));
         JsonNode expected =
                 mapper.readTree(
-                        "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\"}");
+                        "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
         JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
         Assertions.assertEquals(expected, actual);
         DataChangeEvent insertEvent2 =
@@ -94,7 +94,7 @@ public class DebeziumJsonSerializationSchemaTest {
                                 }));
         expected =
                 mapper.readTree(
-                        "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\"}");
+                        "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
         actual = mapper.readTree(serializationSchema.serialize(insertEvent2));
         Assertions.assertEquals(expected, actual);
         DataChangeEvent deleteEvent =
@@ -107,7 +107,7 @@ public class DebeziumJsonSerializationSchemaTest {
                                 }));
         expected =
                 mapper.readTree(
-                        "{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\"}");
+                        "{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
         actual = mapper.readTree(serializationSchema.serialize(deleteEvent));
         Assertions.assertEquals(expected, actual);
         DataChangeEvent updateEvent =
@@ -125,7 +125,7 @@ public class DebeziumJsonSerializationSchemaTest {
                                 }));
         expected =
                 mapper.readTree(
-                        "{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\"}");
+                        "{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
         actual = mapper.readTree(serializationSchema.serialize(updateEvent));
         Assertions.assertEquals(expected, actual);
     }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
index 2e2656596..1ef10603b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
@@ -268,15 +268,25 @@ class KafkaDataSinkITCase extends TestLogger {
         List<JsonNode> expected =
                 Arrays.asList(
                         mapper.readTree(
-                                "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\"}"),
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\"}"),
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\"}"),
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\"}"),
+                                String.format(
+                                        "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\"}"));
+                                String.format(
+                                        "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())));
         assertThat(deserializeValues(collectedRecords)).containsAll(expected);
         checkProducerLeak();
     }
@@ -330,15 +340,25 @@ class KafkaDataSinkITCase extends TestLogger {
         List<JsonNode> expected =
                 Arrays.asList(
                         mapper.readTree(
-                                "{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\"}"),
+                                String.format(
+                                        "{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\"}"),
+                                String.format(
+                                        "{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"old\":null,\"data\":[{\"col1\":\"3\",\"col2\":\"3\"}],\"type\":\"INSERT\"}"),
+                                String.format(
+                                        "{\"old\":null,\"data\":[{\"col1\":\"3\",\"col2\":\"3\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\"}"),
+                                String.format(
+                                        "{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"old\":[{\"col1\":\"2\",\"newCol3\":\"\"}],\"data\":[{\"col1\":\"2\",\"newCol3\":\"x\"}],\"type\":\"UPDATE\"}"));
+                                String.format(
+                                        "{\"old\":[{\"col1\":\"2\",\"newCol3\":\"\"}],\"data\":[{\"col1\":\"2\",\"newCol3\":\"x\"}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+                                        table1.getTableName())));
         assertThat(deserializeValues(collectedRecords)).containsAll(expected);
         checkProducerLeak();
     }
@@ -416,15 +436,25 @@ class KafkaDataSinkITCase extends TestLogger {
         List<JsonNode> expected =
                 Arrays.asList(
                         mapper.readTree(
-                                "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\"}"),
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\"}"),
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\"}"),
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\"}"),
+                                String.format(
+                                        "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\"}"));
+                                String.format(
+                                        "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())));
         assertThat(deserializeValues(collectedRecords)).containsAll(expected);
         checkProducerLeak();
     }
