This is an automated email from the ASF dual-hosted git repository. renqs pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new 9d6154f12 [FLINK-35791][kafka] Add database and table info of Canal / Debezium json format for Kafka sink (#3461) 9d6154f12 is described below commit 9d6154f12307da24551a6f24b5be72313d01837c Author: Kunni <lvyanquan....@alibaba-inc.com> AuthorDate: Thu Aug 8 16:06:43 2024 +0800 [FLINK-35791][kafka] Add database and table info of Canal / Debezium json format for Kafka sink (#3461) --- .../docs/connectors/pipeline-connectors/kafka.md | 41 +++++++++++++++ .../docs/connectors/pipeline-connectors/kafka.md | 41 +++++++++++++++ .../json/canal/CanalJsonSerializationSchema.java | 25 +++++++-- .../debezium/DebeziumJsonSerializationSchema.java | 21 ++++++-- .../canal/CanalJsonSerializationSchemaTest.java | 8 +-- .../DebeziumJsonSerializationSchemaTest.java | 8 +-- .../connectors/kafka/sink/KafkaDataSinkITCase.java | 60 ++++++++++++++++------ 7 files changed, 173 insertions(+), 31 deletions(-) diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md index 94f859106..aa9dc8879 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md @@ -140,6 +140,47 @@ Pipeline 连接器配置项 * 如果配置了 `topic` 参数,所有的消息都会发送到这一个主题。 * 写入 Kafka 的 topic 如果不存在,则会默认创建。 +### 输出格式 +对于不同的内置 `value.format` 选项,输出的格式也是不同的: +#### debezium-json +参考 [Debezium docs](https://debezium.io/documentation/reference/1.9/connectors/mysql.html), debezium-json 格式会包含 `before`,`after`,`op`,`source` 几个元素, 但是 `ts_ms` 字段并不会包含在 `source` 元素中。 +一个输出的示例是: +```json +{ + "before": null, + "after": { + "col1": "1", + "col2": "1" + }, + "op": "c", + "source": { + "db": "default_namespace", + "table": "table1" + } +} +``` + +#### canal-json +参考 [Canal | Apache Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/#available-metadata), canal-json 格式会包含 `old`,`data`,`type`,`database`,`table`,`pkNames` 几个元素, 但是 `ts` 并不会包含在其中。 +一个输出的示例是: +```json +{ + "old": null, + "data": [ + { + "col1": "1", + "col2": "1" + } + ], + "type": "INSERT", + "database": "default_schema", + "table": "table1", + "pkNames": [ + "col1" + ] +} +``` + 数据类型映射 ---------------- <div class="wy-table-responsive"> diff --git a/docs/content/docs/connectors/pipeline-connectors/kafka.md b/docs/content/docs/connectors/pipeline-connectors/kafka.md index 96f599509..57f690666 100644 --- a/docs/content/docs/connectors/pipeline-connectors/kafka.md +++ b/docs/content/docs/connectors/pipeline-connectors/kafka.md @@ -138,6 +138,47 @@ Usage Notes * If the written topic of Kafka is not existed, we will create one automatically. +### Output Format +For different built-in `value.format` options, the output format is different: +#### debezium-json +Refer to [Debezium docs](https://debezium.io/documentation/reference/1.9/connectors/mysql.html), debezium-json format will contains `before`,`after`,`op`,`source` elements, but `ts_ms` is not included in `source`. +An output example is: +```json +{ + "before": null, + "after": { + "col1": "1", + "col2": "1" + }, + "op": "c", + "source": { + "db": "default_namespace", + "table": "table1" + } +} +``` + +#### canal-json +Refer to [Canal | Apache Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/#available-metadata), canal-json format will contains `old`,`data`,`type`,`database`,`table`,`pkNames` elements, but `ts` is not included. +An output example is: +```json +{ + "old": null, + "data": [ + { + "col1": "1", + "col2": "1" + } + ], + "type": "INSERT", + "database": "default_schema", + "table": "table1", + "pkNames": [ + "col1" + ] +} +``` + Data Type Mapping ---------------- <div class="wy-table-responsive"> diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java index 78548e31b..0a145cab7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java @@ -94,7 +94,7 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event> @Override public void open(InitializationContext context) { this.context = context; - reuseGenericRowData = new GenericRowData(3); + reuseGenericRowData = new GenericRowData(6); } @Override @@ -132,6 +132,17 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event> } DataChangeEvent dataChangeEvent = (DataChangeEvent) event; + reuseGenericRowData.setField( + 3, StringData.fromString(dataChangeEvent.tableId().getSchemaName())); + reuseGenericRowData.setField( + 4, StringData.fromString(dataChangeEvent.tableId().getTableName())); + reuseGenericRowData.setField( + 5, + new GenericArrayData( + jsonSerializers.get(dataChangeEvent.tableId()).getSchema().primaryKeys() + .stream() + .map(StringData::fromString) + .toArray())); try { switch (dataChangeEvent.op()) { case INSERT: @@ -200,14 +211,20 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event> } } + /** + * Refer to <a + * href="https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/#available-metadata">Canal + * | Apache Flink</a> for more details. + */ private static RowType createJsonRowType(DataType databaseSchema) { - // Canal JSON contains other information, e.g. "database", "ts" - // but we don't need them return (RowType) DataTypes.ROW( DataTypes.FIELD("old", DataTypes.ARRAY(databaseSchema)), DataTypes.FIELD("data", DataTypes.ARRAY(databaseSchema)), - DataTypes.FIELD("type", DataTypes.STRING())) + DataTypes.FIELD("type", DataTypes.STRING()), + DataTypes.FIELD("database", DataTypes.STRING()), + DataTypes.FIELD("table", DataTypes.STRING()), + DataTypes.FIELD("pkNames", DataTypes.ARRAY(DataTypes.STRING()))) .getLogicalType(); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java index 2f305ce42..ce8afc0db 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java @@ -92,7 +92,7 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Even @Override public void open(InitializationContext context) { - reuseGenericRowData = new GenericRowData(3); + reuseGenericRowData = new GenericRowData(4); this.context = context; } @@ -131,6 +131,11 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Even } DataChangeEvent dataChangeEvent = (DataChangeEvent) event; + reuseGenericRowData.setField( + 3, + GenericRowData.of( + StringData.fromString(dataChangeEvent.tableId().getSchemaName()), + StringData.fromString(dataChangeEvent.tableId().getTableName()))); try { switch (dataChangeEvent.op()) { case INSERT: @@ -185,14 +190,22 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Even } } + /** + * Refer to <a + * href="https://debezium.io/documentation/reference/1.9/connectors/mysql.html">Debezium + * docs</a> for more details. + */ private static RowType createJsonRowType(DataType databaseSchema) { - // Debezium JSON contains some other information, e.g. "source", "ts_ms" - // but we don't need them. return (RowType) DataTypes.ROW( DataTypes.FIELD("before", databaseSchema), DataTypes.FIELD("after", databaseSchema), - DataTypes.FIELD("op", DataTypes.STRING())) + DataTypes.FIELD("op", DataTypes.STRING()), + DataTypes.FIELD( + "source", + DataTypes.ROW( + DataTypes.FIELD("db", DataTypes.STRING()), + DataTypes.FIELD("table", DataTypes.STRING())))) .getLogicalType(); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java index c6335e6d6..362354c6e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java @@ -83,7 +83,7 @@ public class CanalJsonSerializationSchemaTest { })); JsonNode expected = mapper.readTree( - "{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\"}"); + "{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}"); JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1)); Assertions.assertEquals(expected, actual); @@ -97,7 +97,7 @@ public class CanalJsonSerializationSchemaTest { })); expected = mapper.readTree( - "{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\"}"); + "{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}"); actual = mapper.readTree(serializationSchema.serialize(insertEvent2)); Assertions.assertEquals(expected, actual); @@ -111,7 +111,7 @@ public class CanalJsonSerializationSchemaTest { })); expected = mapper.readTree( - "{\"old\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"data\":null,\"type\":\"DELETE\"}"); + "{\"old\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}"); actual = mapper.readTree(serializationSchema.serialize(deleteEvent)); Assertions.assertEquals(expected, actual); @@ -130,7 +130,7 @@ public class CanalJsonSerializationSchemaTest { })); expected = mapper.readTree( - "{\"old\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"data\":[{\"col1\":\"1\",\"col2\":\"x\"}],\"type\":\"UPDATE\"}"); + "{\"old\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"data\":[{\"col1\":\"1\",\"col2\":\"x\"}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}"); actual = mapper.readTree(serializationSchema.serialize(updateEvent)); Assertions.assertEquals(expected, actual); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java index 0be02b9b3..f2b36e4ef 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java @@ -81,7 +81,7 @@ public class DebeziumJsonSerializationSchemaTest { })); JsonNode expected = mapper.readTree( - "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\"}"); + "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}"); JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1)); Assertions.assertEquals(expected, actual); DataChangeEvent insertEvent2 = @@ -94,7 +94,7 @@ public class DebeziumJsonSerializationSchemaTest { })); expected = mapper.readTree( - "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\"}"); + "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}"); actual = mapper.readTree(serializationSchema.serialize(insertEvent2)); Assertions.assertEquals(expected, actual); DataChangeEvent deleteEvent = @@ -107,7 +107,7 @@ public class DebeziumJsonSerializationSchemaTest { })); expected = mapper.readTree( - "{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\"}"); + "{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}"); actual = mapper.readTree(serializationSchema.serialize(deleteEvent)); Assertions.assertEquals(expected, actual); DataChangeEvent updateEvent = @@ -125,7 +125,7 @@ public class DebeziumJsonSerializationSchemaTest { })); expected = mapper.readTree( - "{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\"}"); + "{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}"); actual = mapper.readTree(serializationSchema.serialize(updateEvent)); Assertions.assertEquals(expected, actual); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java index 2e2656596..1ef10603b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java @@ -268,15 +268,25 @@ class KafkaDataSinkITCase extends TestLogger { List<JsonNode> expected = Arrays.asList( mapper.readTree( - "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\"}"), + String.format( + "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", + table1.getTableName())), mapper.readTree( - "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\"}"), + String.format( + "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", + table1.getTableName())), mapper.readTree( - "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\"}"), + String.format( + "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", + table1.getTableName())), mapper.readTree( - "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\"}"), + String.format( + "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", + table1.getTableName())), mapper.readTree( - "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\"}")); + String.format( + "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", + table1.getTableName()))); assertThat(deserializeValues(collectedRecords)).containsAll(expected); checkProducerLeak(); } @@ -330,15 +340,25 @@ class KafkaDataSinkITCase extends TestLogger { List<JsonNode> expected = Arrays.asList( mapper.readTree( - "{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\"}"), + String.format( + "{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}", + table1.getTableName())), mapper.readTree( - "{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\"}"), + String.format( + "{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}", + table1.getTableName())), mapper.readTree( - "{\"old\":null,\"data\":[{\"col1\":\"3\",\"col2\":\"3\"}],\"type\":\"INSERT\"}"), + String.format( + "{\"old\":null,\"data\":[{\"col1\":\"3\",\"col2\":\"3\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}", + table1.getTableName())), mapper.readTree( - "{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\"}"), + String.format( + "{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}", + table1.getTableName())), mapper.readTree( - "{\"old\":[{\"col1\":\"2\",\"newCol3\":\"\"}],\"data\":[{\"col1\":\"2\",\"newCol3\":\"x\"}],\"type\":\"UPDATE\"}")); + String.format( + "{\"old\":[{\"col1\":\"2\",\"newCol3\":\"\"}],\"data\":[{\"col1\":\"2\",\"newCol3\":\"x\"}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}", + table1.getTableName()))); assertThat(deserializeValues(collectedRecords)).containsAll(expected); checkProducerLeak(); } @@ -416,15 +436,25 @@ class KafkaDataSinkITCase extends TestLogger { List<JsonNode> expected = Arrays.asList( mapper.readTree( - "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\"}"), + String.format( + "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", + table1.getTableName())), mapper.readTree( - "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\"}"), + String.format( + "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", + table1.getTableName())), mapper.readTree( - "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\"}"), + String.format( + "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", + table1.getTableName())), mapper.readTree( - "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\"}"), + String.format( + "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", + table1.getTableName())), mapper.readTree( - "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\"}")); + String.format( + "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", + table1.getTableName()))); assertThat(deserializeValues(collectedRecords)).containsAll(expected); checkProducerLeak(); }