This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 3457a922b [FLINK-36611][pipeline-connector/kafka] Support output
schema for Kafka debezium-json format
3457a922b is described below
commit 3457a922b5da331243fb3a4a1bc26a1b58e81b36
Author: MOBIN <[email protected]>
AuthorDate: Mon Apr 7 23:04:12 2025 +0800
[FLINK-36611][pipeline-connector/kafka] Support output schema for Kafka
debezium-json format
This closes #3791
---
.../docs/connectors/pipeline-connectors/kafka.md | 88 +++++-
.../docs/connectors/pipeline-connectors/kafka.md | 89 +++++-
.../flink-cdc-pipeline-connector-kafka/pom.xml | 23 ++
.../kafka/json/ChangeLogJsonFormatFactory.java | 13 +-
.../DebeziumJsonRowDataSerializationSchema.java | 163 +++++++++++
.../debezium/DebeziumJsonSerializationSchema.java | 317 +++++++++++++++++----
.../kafka/json/debezium/DebeziumJsonStruct.java | 87 ++++++
.../kafka/sink/KafkaDataSinkFactory.java | 2 +
.../kafka/sink/KafkaDataSinkOptions.java | 7 +
.../utils/JsonRowDataSerializationSchemaUtils.java | 44 +++
.../DebeziumJsonSerializationSchemaTest.java | 117 ++++++++
.../cdc/pipeline/tests/MysqlToKafkaE2eITCase.java | 76 +++++
.../mysqlToKafka/debezium-json-with-schema.txt | 37 +++
13 files changed, 997 insertions(+), 66 deletions(-)
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
index 136eb4d4d..419499b75 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
@@ -150,6 +150,13 @@ Pipeline 连接器配置项
<td>String</td>
<td>Custom mapping from upstream table names to downstream Kafka topic names. Each mapping is
separated by `;`; the upstream TableId and the downstream Kafka topic name are separated by `:`.
For example, `sink.tableId-to-topic.mapping` can be set to
`mydb.mytable1:topic1;mydb.mytable2:topic2`. </td>
</tr>
+ <tr>
+ <td>sink.debezium-json.include-schema.enabled</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+      <td>If enabled, each Debezium record will include the Debezium schema information.
Only takes effect when `value.format` is `debezium-json`. </td>
+ </tr>
</tbody>
</table>
</div>
@@ -180,6 +187,63 @@ Pipeline 连接器配置项
}
}
```
+When `sink.debezium-json.include-schema.enabled` is set to `true`, an output example is:
+```json
+{
+ "schema":{
+ "type":"struct",
+ "fields":[
+ {
+ "type":"struct",
+ "fields":[
+ {
+ "type":"string",
+ "optional":true,
+ "field":"col1"
+ },
+ {
+ "type":"string",
+ "optional":true,
+ "field":"col2"
+ }
+ ],
+ "optional":true,
+ "field":"before"
+ },
+ {
+ "type":"struct",
+ "fields":[
+ {
+ "type":"string",
+ "optional":true,
+ "field":"col1"
+ },
+ {
+ "type":"string",
+ "optional":true,
+ "field":"col2"
+ }
+ ],
+ "optional":true,
+ "field":"after"
+ }
+ ],
+ "optional":false
+ },
+ "payload":{
+ "before": null,
+ "after": {
+ "col1": "1",
+ "col2": "1"
+ },
+ "op": "c",
+ "source": {
+ "db": "default_namespace",
+ "table": "table1"
+ }
+ }
+}
+```
#### canal-json
Refer to [Canal | Apache
Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/#available-metadata):
the canal-json format includes the `old`, `data`, `type`, `database`, `table`, and `pkNames`
elements, but `ts` is not included.
@@ -204,12 +268,16 @@ Pipeline 连接器配置项
Data Type Mapping
----------------
+[Literal
type](https://debezium.io/documentation/reference/3.1/connectors/mysql.html#mysql-data-types):
the actual storage type of the data (the `type` field in the Debezium schema)<br>
+[Semantic
type](https://debezium.io/documentation/reference/3.1/connectors/mysql.html#mysql-data-types):
the logical type of the data (the `name` field in the Debezium schema).
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">CDC type</th>
<th class="text-left">JSON type</th>
+ <th class="text-left">Literal type</th>
+ <th class="text-left">Semantic type</th>
<th class="text-left" style="width:60%;">NOTE</th>
</tr>
</thead>
@@ -217,29 +285,35 @@ Pipeline 连接器配置项
<tr>
<td>TINYINT</td>
<td>TINYINT</td>
+ <td>INT16</td>
<td></td>
</tr>
<tr>
<td>SMALLINT</td>
<td>SMALLINT</td>
+ <td>INT16</td>
<td></td>
</tr>
<tr>
<td>INT</td>
<td>INT</td>
+ <td>INT32</td>
<td></td>
</tr>
<tr>
<td>BIGINT</td>
<td>BIGINT</td>
+ <td>INT64</td>
<td></td>
</tr>
<tr>
+ <td>FLOAT</td>
<td>FLOAT</td>
<td>FLOAT</td>
<td></td>
</tr>
<tr>
+ <td>DOUBLE</td>
<td>DOUBLE</td>
<td>DOUBLE</td>
<td></td>
@@ -247,9 +321,12 @@ Pipeline 连接器配置项
<tr>
<td>DECIMAL(p, s)</td>
<td>DECIMAL(p, s)</td>
+ <td>BYTES</td>
+ <td>org.apache.kafka.connect.data.Decimal</td>
<td></td>
</tr>
<tr>
+ <td>BOOLEAN</td>
<td>BOOLEAN</td>
<td>BOOLEAN</td>
<td></td>
@@ -257,26 +334,33 @@ Pipeline 连接器配置项
<tr>
<td>DATE</td>
<td>DATE</td>
+      <td>INT32</td>
+      <td>io.debezium.time.Date</td>
<td></td>
</tr>
<tr>
- <td>TIMESTAMP</td>
- <td>DATETIME</td>
+ <td>TIMESTAMP(p)</td>
+ <td>TIMESTAMP(p)</td>
+ <td>INT64</td>
+      <td>p <= 3: io.debezium.time.Timestamp <br>p > 3:
io.debezium.time.MicroTimestamp</td>
<td></td>
</tr>
<tr>
<td>TIMESTAMP_LTZ</td>
<td>TIMESTAMP_LTZ</td>
+ <td>STRING</td>
+ <td>io.debezium.time.ZonedTimestamp</td>
<td></td>
</tr>
<tr>
<td>CHAR(n)</td>
<td>CHAR(n)</td>
+ <td>STRING</td>
<td></td>
</tr>
<tr>
<td>VARCHAR(n)</td>
<td>VARCHAR(n)</td>
+ <td>STRING</td>
<td></td>
</tr>
</tbody>
diff --git a/docs/content/docs/connectors/pipeline-connectors/kafka.md
b/docs/content/docs/connectors/pipeline-connectors/kafka.md
index 619ade45c..b9cf96e40 100644
--- a/docs/content/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content/docs/connectors/pipeline-connectors/kafka.md
@@ -148,6 +148,13 @@ Pipeline Connector Options
<td>String</td>
<td>Custom mapping from upstream tableId to downstream Kafka topic for each table. Each mapping
is separated by `;`; the upstream tableId and the downstream Kafka topic are separated by `:`.
For example, `sink.tableId-to-topic.mapping` can be set to
`mydb.mytable1:topic1;mydb.mytable2:topic2`. </td>
</tr>
+ <tr>
+ <td>sink.debezium-json.include-schema.enabled</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+      <td>If enabled, each Debezium record will include the Debezium schema information.
Only takes effect when `value.format` is `debezium-json`. </td>
+ </tr>
</tbody>
</table>
</div>
@@ -178,6 +185,63 @@ An output example is:
}
}
```
+When `sink.debezium-json.include-schema.enabled` is `true`, an output example is:
+```json
+{
+ "schema":{
+ "type":"struct",
+ "fields":[
+ {
+ "type":"struct",
+ "fields":[
+ {
+ "type":"string",
+ "optional":true,
+ "field":"col1"
+ },
+ {
+ "type":"string",
+ "optional":true,
+ "field":"col2"
+ }
+ ],
+ "optional":true,
+ "field":"before"
+ },
+ {
+ "type":"struct",
+ "fields":[
+ {
+ "type":"string",
+ "optional":true,
+ "field":"col1"
+ },
+ {
+ "type":"string",
+ "optional":true,
+ "field":"col2"
+ }
+ ],
+ "optional":true,
+ "field":"after"
+ }
+ ],
+ "optional":false
+ },
+ "payload":{
+ "before": null,
+ "after": {
+ "col1": "1",
+ "col2": "1"
+ },
+ "op": "c",
+ "source": {
+ "db": "default_namespace",
+ "table": "table1"
+ }
+ }
+}
+```
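+For reference, a minimal sink block that enables this option in a pipeline definition could look
+like the following sketch (the bootstrap servers and topic name below are placeholders):
+```yaml
+sink:
+  type: kafka
+  properties.bootstrap.servers: localhost:9092
+  topic: my_topic
+  value.format: debezium-json
+  sink.debezium-json.include-schema.enabled: true
+```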
#### canal-json
Refer to [Canal | Apache
Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/#available-metadata):
the canal-json format includes the
`old`, `data`, `type`, `database`, `table`, and `pkNames` elements, but `ts` is not
included.
@@ -202,12 +266,16 @@ An output example is:
Data Type Mapping
----------------
+[Literal
type](https://debezium.io/documentation/reference/3.1/connectors/mysql.html#mysql-data-types):
defines the physical storage format of data (type field of the debezium
schema)<br>
+[Semantic
type](https://debezium.io/documentation/reference/3.1/connectors/mysql.html#mysql-data-types):
defines the logical meaning of data (name field of the debezium schema).
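+For example, a nullable `TIMESTAMP(6)` column (the column name `order_time` below is only a
+placeholder) appears in the `schema` section as a field whose `type` carries the literal type and
+whose `name` carries the semantic type:
+```json
+{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"order_time"}
+```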
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">CDC type</th>
<th class="text-left">JSON type</th>
+ <th class="text-left">Literal type</th>
+ <th class="text-left">Semantic type</th>
<th class="text-left" style="width:60%;">NOTE</th>
</tr>
</thead>
@@ -215,29 +283,35 @@ Data Type Mapping
<tr>
<td>TINYINT</td>
<td>TINYINT</td>
+ <td>INT16</td>
<td></td>
</tr>
<tr>
<td>SMALLINT</td>
<td>SMALLINT</td>
+ <td>INT16</td>
<td></td>
</tr>
<tr>
<td>INT</td>
<td>INT</td>
+ <td>INT32</td>
<td></td>
</tr>
<tr>
<td>BIGINT</td>
<td>BIGINT</td>
+ <td>INT64</td>
<td></td>
</tr>
<tr>
+ <td>FLOAT</td>
<td>FLOAT</td>
<td>FLOAT</td>
<td></td>
</tr>
<tr>
+ <td>DOUBLE</td>
<td>DOUBLE</td>
<td>DOUBLE</td>
<td></td>
@@ -245,9 +319,12 @@ Data Type Mapping
<tr>
<td>DECIMAL(p, s)</td>
<td>DECIMAL(p, s)</td>
+ <td>BYTES</td>
+ <td>org.apache.kafka.connect.data.Decimal</td>
<td></td>
</tr>
<tr>
+ <td>BOOLEAN</td>
<td>BOOLEAN</td>
<td>BOOLEAN</td>
<td></td>
@@ -255,26 +332,34 @@ Data Type Mapping
<tr>
<td>DATE</td>
<td>DATE</td>
+ <td>INT32</td>
+ <td>io.debezium.time.Date</td>
<td></td>
</tr>
<tr>
- <td>TIMESTAMP</td>
- <td>TIMESTAMP</td>
+ <td>TIMESTAMP(p)</td>
+ <td>TIMESTAMP(p)</td>
+ <td>INT64</td>
+      <td>p <= 3: io.debezium.time.Timestamp <br>p > 3:
io.debezium.time.MicroTimestamp</td>
<td></td>
</tr>
<tr>
<td>TIMESTAMP_LTZ</td>
<td>TIMESTAMP_LTZ</td>
+ <td>STRING</td>
+ <td>io.debezium.time.ZonedTimestamp</td>
<td></td>
</tr>
<tr>
<td>CHAR(n)</td>
<td>CHAR(n)</td>
+ <td>STRING</td>
<td></td>
</tr>
<tr>
<td>VARCHAR(n)</td>
<td>VARCHAR(n)</td>
+ <td>STRING</td>
<td></td>
</tr>
</tbody>
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
index 2c4f474ca..dee1f2f46 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
@@ -29,6 +29,7 @@ limitations under the License.
<artifactId>flink-cdc-pipeline-connector-kafka</artifactId>
<properties>
+ <version.kafka>3.4.0</version.kafka>
</properties>
<dependencies>
@@ -74,6 +75,25 @@ limitations under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-core</artifactId>
+ <version>${debezium.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-api</artifactId>
+ <version>${version.kafka}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-json</artifactId>
+ <version>${version.kafka}</version>
+ </dependency>
</dependencies>
<build>
@@ -101,6 +121,9 @@ limitations under the License.
<relocation>
<pattern>org.apache.kafka</pattern>
<shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
+ <excludes>
+
<exclude>org/apache/kafka/connect/data/**</exclude>
+ </excludes>
</relocation>
<relocation>
<pattern>org.apache.flink.connector.kafka</pattern>
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
index ec336d1c5..3b55a05ba 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.kafka.json;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.utils.Preconditions;
import
org.apache.flink.cdc.connectors.kafka.json.canal.CanalJsonSerializationSchema;
import
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonSerializationSchema;
import
org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
@@ -30,6 +31,7 @@ import org.apache.flink.formats.json.JsonFormatOptionsUtil;
import java.time.ZoneId;
+import static
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
import static
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
import static
org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
@@ -50,6 +52,14 @@ public class ChangeLogJsonFormatFactory {
*/
public static SerializationSchema<Event> createSerializationSchema(
ReadableConfig formatOptions, JsonSerializationType type, ZoneId
zoneId) {
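+        // Including the Debezium schema is only meaningful for the debezium-json format;
+        // fail fast for any other serialization type.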
+ boolean isIncludedDebeziumSchema =
+ Boolean.parseBoolean(
+
formatOptions.toMap().get(SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED.key()));
+ Preconditions.checkArgument(
+ !(isIncludedDebeziumSchema &&
!type.equals(JsonSerializationType.DEBEZIUM_JSON)),
+ SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED.key()
+ + " is only supported when using debezium-json.");
+
TimestampFormat timestampFormat =
JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
JsonFormatOptions.MapNullKeyMode mapNullKeyMode =
JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions);
@@ -70,7 +80,8 @@ public class ChangeLogJsonFormatFactory {
mapNullKeyLiteral,
zoneId,
encodeDecimalAsPlainNumber,
- ignoreNullFields);
+ ignoreNullFields,
+ isIncludedDebeziumSchema);
}
case CANAL_JSON:
{
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonRowDataSerializationSchema.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonRowDataSerializationSchema.java
new file mode 100644
index 000000000..349dee4b8
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonRowDataSerializationSchema.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.json.debezium;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import
org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions;
+import org.apache.flink.formats.json.JsonParserRowDataDeserializationSchema;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.util.Objects;
+
+/**
+ * Serialization schema that serializes an object of Flink internal data structure into
+ * JSON bytes.
+ *
+ * <p>Serializes the input Flink object into a JSON string and converts it
into <code>byte[]</code>.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using {@link
+ * JsonRowDataDeserializationSchema} or {@link
JsonParserRowDataDeserializationSchema}.
+ */
+public class DebeziumJsonRowDataSerializationSchema implements
SerializationSchema<RowData> {
+ private static final long serialVersionUID = 1L;
+
+ /** RowType to generate the runtime converter. */
+ private final RowType rowType;
+
+ /** The converter that converts internal data formats to JsonNode. */
+ private final RowDataToJsonConverters.RowDataToJsonConverter
runtimeConverter;
+
+ /** Object mapper that is used to create output JSON objects. */
+ private transient ObjectMapper mapper;
+
+ /** Reusable object node. */
+ private transient ObjectNode node;
+
+ /** Timestamp format specification which is used to parse timestamp. */
+ private final TimestampFormat timestampFormat;
+
+ /** The handling mode when serializing null keys for map data. */
+ private final JsonFormatOptions.MapNullKeyMode mapNullKeyMode;
+
+ /** The string literal when handling mode for map null key LITERAL. */
+ private final String mapNullKeyLiteral;
+
+ /** Flag indicating whether to serialize all decimals as plain numbers. */
+ private final boolean encodeDecimalAsPlainNumber;
+
+ /** Flag indicating whether to ignore null fields. */
+ private final boolean ignoreNullFields;
+
+ private final boolean isIncludedDebeziumSchema;
+
+ public DebeziumJsonRowDataSerializationSchema(
+ RowType rowType,
+ TimestampFormat timestampFormat,
+ JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
+ String mapNullKeyLiteral,
+ boolean encodeDecimalAsPlainNumber,
+ boolean ignoreNullFields,
+ boolean isIncludedDebeziumSchema) {
+ this.rowType = rowType;
+ this.timestampFormat = timestampFormat;
+ this.mapNullKeyMode = mapNullKeyMode;
+ this.mapNullKeyLiteral = mapNullKeyLiteral;
+ this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
+ this.runtimeConverter =
+
JsonRowDataSerializationSchemaUtils.createRowDataToJsonConverters(
+ timestampFormat,
+ mapNullKeyMode,
+ mapNullKeyLiteral,
+ ignoreNullFields)
+ .createConverter(rowType);
+ this.ignoreNullFields = ignoreNullFields;
+ this.isIncludedDebeziumSchema = isIncludedDebeziumSchema;
+ }
+
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ mapper =
+ JacksonMapperFactory.createObjectMapper()
+ .configure(
+
JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
+ encodeDecimalAsPlainNumber);
+ }
+
+ @Override
+ public byte[] serialize(RowData row) {
+ if (node == null || ignoreNullFields) {
+ node = mapper.createObjectNode();
+ }
+
+ try {
+ runtimeConverter.convert(mapper, node, row);
+ if (isIncludedDebeziumSchema) {
+                // The schema field holds a nested JSON string; asText() returns the plain
+                // string without extra escape characters such as "\"
+ String schemaValue = node.get("schema").asText();
+ JsonNode schemaNode = mapper.readTree(schemaValue);
+ node.set("schema", schemaNode);
+ return mapper.writeValueAsBytes(node);
+ }
+ return mapper.writeValueAsBytes(node);
+ } catch (Throwable t) {
+ throw new RuntimeException(String.format("Could not serialize row
'%s'.", row), t);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DebeziumJsonRowDataSerializationSchema that =
(DebeziumJsonRowDataSerializationSchema) o;
+ return rowType.equals(that.rowType)
+ && timestampFormat.equals(that.timestampFormat)
+ && mapNullKeyMode.equals(that.mapNullKeyMode)
+ && mapNullKeyLiteral.equals(that.mapNullKeyLiteral)
+ && encodeDecimalAsPlainNumber ==
that.encodeDecimalAsPlainNumber
+ && ignoreNullFields == that.ignoreNullFields
+ && isIncludedDebeziumSchema == that.isIncludedDebeziumSchema;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ rowType,
+ timestampFormat,
+ mapNullKeyMode,
+ mapNullKeyLiteral,
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields,
+ isIncludedDebeziumSchema);
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
index 238c2121d..b1945b5c1 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
@@ -23,14 +23,15 @@ import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
-import
org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
-import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
@@ -38,11 +39,33 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import io.debezium.data.Bits;
+import io.debezium.time.Date;
+import io.debezium.time.MicroTime;
+import io.debezium.time.MicroTimestamp;
+import io.debezium.time.Timestamp;
+import io.debezium.time.ZonedTimestamp;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
+
import java.time.ZoneId;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.function.BiConsumer;
import static java.lang.String.format;
+import static
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumPayload.AFTER;
+import static
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumPayload.BEFORE;
+import static
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumPayload.OPERATION;
+import static
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumPayload.SOURCE;
+import static
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumSource.DATABASE;
+import static
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumSource.TABLE;
+import static
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumStruct.PAYLOAD;
+import static
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumStruct.SCHEMA;
import static
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
/**
@@ -65,6 +88,8 @@ public class DebeziumJsonSerializationSchema implements
SerializationSchema<Even
private transient GenericRowData reuseGenericRowData;
+ private transient GenericRowData payloadGenericRowData;
+
private final TimestampFormat timestampFormat;
private final JsonFormatOptions.MapNullKeyMode mapNullKeyMode;
@@ -75,17 +100,24 @@ public class DebeziumJsonSerializationSchema implements
SerializationSchema<Even
private final boolean ignoreNullFields;
+ private final boolean isIncludedDebeziumSchema;
+
private final ZoneId zoneId;
private InitializationContext context;
+ private Map<TableId, String> schemaMap = new HashMap<>();
+
+ JsonConverter jsonConverter;
+
public DebeziumJsonSerializationSchema(
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
ZoneId zoneId,
boolean encodeDecimalAsPlainNumber,
- boolean ignoreNullFields) {
+ boolean ignoreNullFields,
+ boolean isIncludedDebeziumSchema) {
this.timestampFormat = timestampFormat;
this.mapNullKeyMode = mapNullKeyMode;
this.mapNullKeyLiteral = mapNullKeyLiteral;
@@ -93,11 +125,23 @@ public class DebeziumJsonSerializationSchema implements
SerializationSchema<Even
this.zoneId = zoneId;
jsonSerializers = new HashMap<>();
this.ignoreNullFields = ignoreNullFields;
+ this.isIncludedDebeziumSchema = isIncludedDebeziumSchema;
}
@Override
public void open(InitializationContext context) {
- reuseGenericRowData = new GenericRowData(4);
+ if (isIncludedDebeziumSchema) {
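+            // With schema output enabled, the top-level row is (schema, payload) and the
+            // 4-field payload row holds before/after/op/source.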
+ reuseGenericRowData = new GenericRowData(2);
+ payloadGenericRowData = new GenericRowData(4);
+ reuseGenericRowData.setField(PAYLOAD.getPosition(),
payloadGenericRowData);
+
+ this.jsonConverter = new JsonConverter();
+ final HashMap<String, Object> configs = new HashMap<>(2);
+ configs.put(ConverterConfig.TYPE_CONFIG,
ConverterType.VALUE.getName());
+ jsonConverter.configure(configs);
+ } else {
+ reuseGenericRowData = new GenericRowData(4);
+ }
this.context = context;
}
@@ -115,16 +159,22 @@ public class DebeziumJsonSerializationSchema implements
SerializationSchema<Even
jsonSerializers.get(schemaChangeEvent.tableId()).getSchema(),
schemaChangeEvent);
}
+
+ if (isIncludedDebeziumSchema) {
+ schemaMap.put(schemaChangeEvent.tableId(),
convertSchemaToDebeziumSchema(schema));
+ }
LogicalType rowType =
DataTypeUtils.toFlinkDataType(schema.toRowDataType()).getLogicalType();
- JsonRowDataSerializationSchema jsonSerializer =
-
JsonRowDataSerializationSchemaUtils.createSerializationSchema(
- createJsonRowType(fromLogicalToDataType(rowType)),
+ DebeziumJsonRowDataSerializationSchema jsonSerializer =
+ new DebeziumJsonRowDataSerializationSchema(
+ createJsonRowType(
+ fromLogicalToDataType(rowType),
isIncludedDebeziumSchema),
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
encodeDecimalAsPlainNumber,
- ignoreNullFields);
+ ignoreNullFields,
+ isIncludedDebeziumSchema);
try {
jsonSerializer.open(context);
} catch (Exception e) {
@@ -138,81 +188,226 @@ public class DebeziumJsonSerializationSchema implements
SerializationSchema<Even
}
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
- reuseGenericRowData.setField(
- 3,
- GenericRowData.of(
-
StringData.fromString(dataChangeEvent.tableId().getSchemaName()),
-
StringData.fromString(dataChangeEvent.tableId().getTableName())));
+ BiConsumer<DataChangeEvent, GenericRowData> converter;
try {
switch (dataChangeEvent.op()) {
case INSERT:
- reuseGenericRowData.setField(0, null);
- reuseGenericRowData.setField(
- 1,
- jsonSerializers
- .get(dataChangeEvent.tableId())
-
.getRowDataFromRecordData(dataChangeEvent.after(), false));
- reuseGenericRowData.setField(2, OP_INSERT);
- return jsonSerializers
- .get(dataChangeEvent.tableId())
- .getSerializationSchema()
- .serialize(reuseGenericRowData);
+ converter = this::convertInsertEventToRowData;
+ break;
case DELETE:
- reuseGenericRowData.setField(
- 0,
- jsonSerializers
- .get(dataChangeEvent.tableId())
-
.getRowDataFromRecordData(dataChangeEvent.before(), false));
- reuseGenericRowData.setField(1, null);
- reuseGenericRowData.setField(2, OP_DELETE);
- return jsonSerializers
- .get(dataChangeEvent.tableId())
- .getSerializationSchema()
- .serialize(reuseGenericRowData);
+ converter = this::convertDeleteEventToRowData;
+ break;
case UPDATE:
case REPLACE:
- reuseGenericRowData.setField(
- 0,
- jsonSerializers
- .get(dataChangeEvent.tableId())
-
.getRowDataFromRecordData(dataChangeEvent.before(), false));
- reuseGenericRowData.setField(
- 1,
- jsonSerializers
- .get(dataChangeEvent.tableId())
-
.getRowDataFromRecordData(dataChangeEvent.after(), false));
- reuseGenericRowData.setField(2, OP_UPDATE);
- return jsonSerializers
- .get(dataChangeEvent.tableId())
- .getSerializationSchema()
- .serialize(reuseGenericRowData);
+ converter = this::convertUpdateEventToRowData;
+ break;
default:
throw new UnsupportedOperationException(
format(
"Unsupported operation '%s' for
OperationType.",
dataChangeEvent.op()));
}
+
+ if (isIncludedDebeziumSchema) {
+ converter.accept(dataChangeEvent, payloadGenericRowData);
+ reuseGenericRowData.setField(
+ SCHEMA.getPosition(),
+
StringData.fromString(schemaMap.get(dataChangeEvent.tableId())));
+ } else {
+ converter.accept(dataChangeEvent, reuseGenericRowData);
+ }
+ return jsonSerializers
+ .get(dataChangeEvent.tableId())
+ .getSerializationSchema()
+ .serialize(reuseGenericRowData);
} catch (Throwable t) {
throw new RuntimeException(format("Could not serialize event
'%s'.", event), t);
}
}
+ /**
+     * Converts a CDC {@link Schema} to a Debezium schema.
+     *
+     * @param schema CDC schema
+     * @return Debezium schema JSON string
+ */
+ public String convertSchemaToDebeziumSchema(Schema schema) {
+ List<Column> columns = schema.getColumns();
+ SchemaBuilder schemaBuilder = SchemaBuilder.struct();
+ SchemaBuilder beforeBuilder = SchemaBuilder.struct();
+ SchemaBuilder afterBuilder = SchemaBuilder.struct();
+ for (Column column : columns) {
+ SchemaBuilder field = convertCDCDataTypeToDebeziumDataType(column);
+ beforeBuilder.field(column.getName(), field).optional();
+ afterBuilder.field(column.getName(), field).optional();
+ }
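+        // The Debezium envelope consists of optional "before" and "after" structs that share
+        // the same column layout.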
+ schemaBuilder.field("before", beforeBuilder);
+ schemaBuilder.field("after", afterBuilder);
+ schemaBuilder.build();
+ return jsonConverter.asJsonSchema(schemaBuilder).toString();
+ }
+
+ private static SchemaBuilder convertCDCDataTypeToDebeziumDataType(Column
column) {
+ org.apache.flink.cdc.common.types.DataType columnType =
column.getType();
+ final SchemaBuilder field;
+ switch (columnType.getTypeRoot()) {
+ case TINYINT:
+ case SMALLINT:
+ field = SchemaBuilder.int16();
+ break;
+ case INTEGER:
+ field = SchemaBuilder.int32();
+ break;
+ case BIGINT:
+ field = SchemaBuilder.int64();
+ break;
+ case DECIMAL:
+ final int decimalPrecision = ((DecimalType)
columnType).getPrecision();
+ final int decimalScale = ((DecimalType) columnType).getScale();
+ field =
+ Decimal.builder(decimalScale)
+ .parameter(
+ "connect.decimal.precision",
+ String.valueOf(decimalPrecision));
+ break;
+ case BOOLEAN:
+ field = SchemaBuilder.bool();
+ break;
+ case FLOAT:
+ field = SchemaBuilder.float32();
+ break;
+ case DOUBLE:
+ field = SchemaBuilder.float64();
+ break;
+ case DATE:
+ field =
SchemaBuilder.int32().name(Date.SCHEMA_NAME).version(1);
+ break;
+ case TIME_WITHOUT_TIME_ZONE:
+ field =
SchemaBuilder.int64().name(MicroTime.SCHEMA_NAME).version(1);
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_TIME_ZONE:
+                int timestampPrecision = ((TimestampType) columnType).getPrecision();
+                if (timestampPrecision > 3) {
+ field =
SchemaBuilder.int64().name(MicroTimestamp.SCHEMA_NAME).version(1);
+ } else {
+ field =
SchemaBuilder.int64().name(Timestamp.SCHEMA_NAME).version(1);
+ }
+ break;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ field =
SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).version(1);
+ break;
+ case BINARY:
+ case VARBINARY:
+ field =
+ SchemaBuilder.bytes()
+ .name(Bits.LOGICAL_NAME)
+ .parameter(
+ Bits.LENGTH_FIELD,
+ Integer.toString(
+
org.apache.flink.cdc.common.types.DataTypes
+ .getLength(columnType)
+ .orElse(0)))
+ .version(1);
+ break;
+ case CHAR:
+ case VARCHAR:
+ default:
+ field = SchemaBuilder.string();
+ }
+
+ if (columnType.isNullable()) {
+ field.optional();
+ } else {
+ field.required();
+ }
+ if (column.getDefaultValueExpression() != null) {
+ field.defaultValue(column.getDefaultValueExpression());
+ }
+ if (column.getComment() != null) {
+ field.doc(column.getComment());
+ }
+ return field;
+ }
+
+ private void convertInsertEventToRowData(
+ DataChangeEvent dataChangeEvent, GenericRowData genericRowData) {
+ genericRowData.setField(BEFORE.getPosition(), null);
+ genericRowData.setField(
+ AFTER.getPosition(),
+ jsonSerializers
+ .get(dataChangeEvent.tableId())
+ .getRowDataFromRecordData(dataChangeEvent.after(),
false));
+ genericRowData.setField(OPERATION.getPosition(), OP_INSERT);
+ genericRowData.setField(
+ SOURCE.getPosition(),
+ GenericRowData.of(
+
StringData.fromString(dataChangeEvent.tableId().getSchemaName()),
+
StringData.fromString(dataChangeEvent.tableId().getTableName())));
+ }
+
+ private void convertDeleteEventToRowData(
+ DataChangeEvent dataChangeEvent, GenericRowData genericRowData) {
+ genericRowData.setField(
+ BEFORE.getPosition(),
+ jsonSerializers
+ .get(dataChangeEvent.tableId())
+ .getRowDataFromRecordData(dataChangeEvent.before(),
false));
+ genericRowData.setField(AFTER.getPosition(), null);
+ genericRowData.setField(OPERATION.getPosition(), OP_DELETE);
+ genericRowData.setField(
+ SOURCE.getPosition(),
+ GenericRowData.of(
+
StringData.fromString(dataChangeEvent.tableId().getSchemaName()),
+
StringData.fromString(dataChangeEvent.tableId().getTableName())));
+ }
+
+ private void convertUpdateEventToRowData(
+ DataChangeEvent dataChangeEvent, GenericRowData genericRowData) {
+ genericRowData.setField(
+ BEFORE.getPosition(),
+ jsonSerializers
+ .get(dataChangeEvent.tableId())
+ .getRowDataFromRecordData(dataChangeEvent.before(),
false));
+ genericRowData.setField(
+ AFTER.getPosition(),
+ jsonSerializers
+ .get(dataChangeEvent.tableId())
+ .getRowDataFromRecordData(dataChangeEvent.after(),
false));
+ genericRowData.setField(OPERATION.getPosition(), OP_UPDATE);
+ genericRowData.setField(
+ SOURCE.getPosition(),
+ GenericRowData.of(
+
StringData.fromString(dataChangeEvent.tableId().getSchemaName()),
+
StringData.fromString(dataChangeEvent.tableId().getTableName())));
+ }
+
/**
* Refer to <a
*
href="https://debezium.io/documentation/reference/1.9/connectors/mysql.html">Debezium
* docs</a> for more details.
*/
- private static RowType createJsonRowType(DataType databaseSchema) {
- return (RowType)
+ private static RowType createJsonRowType(
+ DataType databaseSchema, boolean isIncludedDebeziumSchema) {
+ DataType payloadRowType =
DataTypes.ROW(
- DataTypes.FIELD("before", databaseSchema),
- DataTypes.FIELD("after", databaseSchema),
- DataTypes.FIELD("op", DataTypes.STRING()),
- DataTypes.FIELD(
- "source",
- DataTypes.ROW(
- DataTypes.FIELD("db",
DataTypes.STRING()),
- DataTypes.FIELD("table",
DataTypes.STRING()))))
- .getLogicalType();
+ DataTypes.FIELD(BEFORE.getFieldName(), databaseSchema),
+ DataTypes.FIELD(AFTER.getFieldName(), databaseSchema),
+ DataTypes.FIELD(OPERATION.getFieldName(),
DataTypes.STRING()),
+ DataTypes.FIELD(
+ SOURCE.getFieldName(),
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ DATABASE.getFieldName(),
DataTypes.STRING()),
+ DataTypes.FIELD(
+ TABLE.getFieldName(),
DataTypes.STRING()))));
+ if (isIncludedDebeziumSchema) {
+ return (RowType)
+ DataTypes.ROW(
+ DataTypes.FIELD(SCHEMA.getFieldName(),
DataTypes.STRING()),
+ DataTypes.FIELD(PAYLOAD.getFieldName(),
payloadRowType))
+ .getLogicalType();
+ }
+ return (RowType) payloadRowType.getLogicalType();
}
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonStruct.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonStruct.java
new file mode 100644
index 000000000..e1c314b9b
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonStruct.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.json.debezium;
+
+/** Debezium JSON struct. */
+public class DebeziumJsonStruct {
+
+ enum DebeziumStruct {
+ SCHEMA(0, "schema"),
+ PAYLOAD(1, "payload");
+
+ private final int position;
+ private final String fieldName;
+
+ DebeziumStruct(int position, String fieldName) {
+ this.position = position;
+ this.fieldName = fieldName;
+ }
+
+ public int getPosition() {
+ return position;
+ }
+
+ public String getFieldName() {
+ return fieldName;
+ }
+ }
+
+ enum DebeziumPayload {
+ BEFORE(0, "before"),
+ AFTER(1, "after"),
+ OPERATION(2, "op"),
+ SOURCE(3, "source");
+
+ private final int position;
+ private final String fieldName;
+
+ DebeziumPayload(int position, String fieldName) {
+ this.position = position;
+ this.fieldName = fieldName;
+ }
+
+ public int getPosition() {
+ return position;
+ }
+
+ public String getFieldName() {
+ return fieldName;
+ }
+ }
+
+ enum DebeziumSource {
+ DATABASE(0, "db"),
+ TABLE(1, "table");
+
+ private final int position;
+ private final String fieldName;
+
+ DebeziumSource(int position, String fieldName) {
+ this.position = position;
+ this.fieldName = fieldName;
+ }
+
+ public int getPosition() {
+ return position;
+ }
+
+ public String getFieldName() {
+ return fieldName;
+ }
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
index 6a232849c..05090b30b 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
@@ -40,6 +40,7 @@ import static
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PA
import static
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PROPERTIES_PREFIX;
import static
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED;
import static
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_CUSTOM_HEADER;
+import static
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
import static
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING;
import static
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.TOPIC;
import static
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.VALUE_FORMAT;
@@ -131,6 +132,7 @@ public class KafkaDataSinkFactory implements
DataSinkFactory {
options.add(SINK_CUSTOM_HEADER);
options.add(KafkaDataSinkOptions.DELIVERY_GUARANTEE);
options.add(SINK_TABLE_ID_TO_TOPIC_MAPPING);
+ options.add(SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED);
return options;
}
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
index 1b7bbf9bd..a8eeed8cf 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
@@ -100,4 +100,11 @@ public class KafkaDataSinkOptions {
.text(
". For example, we can set
'sink.tableId-to-topic.mappingg' like
'mydb.mytable1:topic1;mydb.mytable2:topic2'.")
.build());
+
+ public static final ConfigOption<Boolean>
SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED =
+ key("sink.debezium-json.include-schema.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional. If this parameter is configured, each
Debezium record will contain the Debezium schema information. It is only supported
when using debezium-json.");
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java
index e021e0f1b..0cc850349 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
import org.apache.flink.table.types.logical.RowType;
import java.lang.reflect.Constructor;
@@ -86,6 +87,49 @@ public class JsonRowDataSerializationSchemaUtils {
"Failed to find appropriate constructor for
JsonRowDataSerializationSchema,please check your Flink version is 1.19 or
1.20.");
}
+ /**
+     * In Flink >= 1.20 the constructor of RowDataToJsonConverters takes 4 parameters; in
+     * Flink < 1.20 it takes 3 parameters.
+ */
+ public static RowDataToJsonConverters createRowDataToJsonConverters(
+ TimestampFormat timestampFormat,
+ JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
+ String mapNullKeyLiteral,
+ boolean ignoreNullFields) {
+ try {
+ Class<?>[] fullParams =
+ new Class[] {
+ TimestampFormat.class,
+ JsonFormatOptions.MapNullKeyMode.class,
+ String.class,
+ boolean.class
+ };
+
+ Object[] fullParamValues =
+ new Object[] {
+ timestampFormat, mapNullKeyMode, mapNullKeyLiteral,
ignoreNullFields
+ };
+
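+            // Try the 4-parameter constructor (Flink >= 1.20) first, then fall back to the
+            // 3-parameter one (Flink 1.19).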
+ for (int i = fullParams.length; i >= 3; i--) {
+ try {
+ Constructor<?> constructor =
+ RowDataToJsonConverters.class.getConstructor(
+ Arrays.copyOfRange(fullParams, 0, i));
+
+ return (RowDataToJsonConverters)
+
constructor.newInstance(Arrays.copyOfRange(fullParamValues, 0, i));
+ } catch (NoSuchMethodException ignored) {
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to create RowDataToJsonConverters,please check
your Flink version is 1.19 or 1.20.",
+ e);
+ }
+ throw new RuntimeException(
+ "Failed to find appropriate constructor for
RowDataToJsonConverters, please check that your Flink version is 1.19 or 1.20.");
+ }
+
/** flink>=1.20 only has the ENCODE_IGNORE_NULL_FIELDS parameter. */
public static boolean enableIgnoreNullFields(ReadableConfig formatOptions)
{
try {
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
index f4826ca80..4c9933bf2 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
@@ -18,6 +18,9 @@
package org.apache.flink.cdc.connectors.kafka.json.debezium;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -40,7 +43,13 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.math.BigDecimal;
+import java.time.Instant;
import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
/** Tests for {@link DebeziumJsonSerializationSchema}. */
class DebeziumJsonSerializationSchemaTest {
@@ -129,4 +138,112 @@ class DebeziumJsonSerializationSchemaTest {
actual = mapper.readTree(serializationSchema.serialize(updateEvent));
Assertions.assertThat(actual).isEqualTo(expected);
}
+
+ @Test
+ public void testSerializeWithSchemaAllDataTypes() throws Exception {
+ ObjectMapper mapper =
+ JacksonMapperFactory.createObjectMapper()
+
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+ Map<String, String> properties = new HashMap<>();
+ properties.put(SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED.key(),
"true");
+ Configuration configuration = Configuration.fromMap(properties);
+ SerializationSchema<Event> serializationSchema =
+ ChangeLogJsonFormatFactory.createSerializationSchema(
+ configuration, JsonSerializationType.DEBEZIUM_JSON,
ZoneId.systemDefault());
+ serializationSchema.open(new MockInitializationContext());
+ // create table
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("_boolean", DataTypes.BOOLEAN(),
"_boolean comment")
+ .physicalColumn("_binary", DataTypes.BINARY(3))
+ .physicalColumn("_varbinary", DataTypes.VARBINARY(10))
+ .physicalColumn("_bytes", DataTypes.BYTES())
+ .physicalColumn("_tinyint", DataTypes.TINYINT())
+ .physicalColumn("_smallint", DataTypes.SMALLINT())
+ .physicalColumn("_int", DataTypes.INT())
+ .physicalColumn("_bigint", DataTypes.BIGINT())
+ .physicalColumn("_float", DataTypes.FLOAT())
+ .physicalColumn("_double", DataTypes.DOUBLE())
+ .physicalColumn("_decimal", DataTypes.DECIMAL(6, 3))
+ .physicalColumn("_char", DataTypes.CHAR(5))
+ .physicalColumn("_varchar", DataTypes.VARCHAR(10))
+ .physicalColumn("_string", DataTypes.STRING())
+ .physicalColumn("_date", DataTypes.DATE())
+ .physicalColumn("_time", DataTypes.TIME())
+ .physicalColumn("_time_6", DataTypes.TIME(6))
+ .physicalColumn("_timestamp", DataTypes.TIMESTAMP())
+ .physicalColumn("_timestamp_3", DataTypes.TIMESTAMP(3))
+ .physicalColumn("_timestamp_ltz",
DataTypes.TIMESTAMP_LTZ())
+ .physicalColumn("_timestamp_ltz_3",
DataTypes.TIMESTAMP_LTZ(3))
+ .physicalColumn("pt", DataTypes.STRING())
+ .primaryKey("pt")
+ .build();
+
+ RowType rowType =
+ RowType.of(
+ DataTypes.BOOLEAN(),
+ DataTypes.BINARY(3),
+ DataTypes.VARBINARY(10),
+ DataTypes.BYTES(),
+ DataTypes.TINYINT(),
+ DataTypes.SMALLINT(),
+ DataTypes.INT(),
+ DataTypes.BIGINT(),
+ DataTypes.FLOAT(),
+ DataTypes.DOUBLE(),
+ DataTypes.DECIMAL(6, 3),
+ DataTypes.CHAR(5),
+ DataTypes.VARCHAR(10),
+ DataTypes.STRING(),
+ DataTypes.DATE(),
+ DataTypes.TIME(),
+ DataTypes.TIME(6),
+ DataTypes.TIMESTAMP(),
+ DataTypes.TIMESTAMP(3),
+ DataTypes.TIMESTAMP_LTZ(),
+ DataTypes.TIMESTAMP_LTZ(3),
+ DataTypes.STRING());
+
+ CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1,
schema);
+ Assertions.assertNull(serializationSchema.serialize(createTableEvent));
+ BinaryRecordDataGenerator generator = new
BinaryRecordDataGenerator(rowType);
+ // insert
+ DataChangeEvent insertEvent1 =
+ DataChangeEvent.insertEvent(
+ TABLE_1,
+ generator.generate(
+ new Object[] {
+ true,
+ new byte[] {1, 2},
+ new byte[] {3, 4},
+ new byte[] {5, 6, 7},
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 5.1f,
+ 6.2,
+ DecimalData.fromBigDecimal(new
BigDecimal("7.123"), 6, 3),
+ BinaryStringData.fromString("test1"),
+ BinaryStringData.fromString("test2"),
+ BinaryStringData.fromString("test3"),
+ 100,
+ 200,
+ 300,
+ TimestampData.fromTimestamp(
+
java.sql.Timestamp.valueOf("2023-01-01 00:00:00.000")),
+ TimestampData.fromTimestamp(
+
java.sql.Timestamp.valueOf("2023-01-01 00:00:00")),
+ LocalZonedTimestampData.fromInstant(
+
Instant.parse("2023-01-01T00:00:00.000Z")),
+ LocalZonedTimestampData.fromInstant(
+
Instant.parse("2023-01-01T00:00:00.000Z")),
+ null
+ }));
+ JsonNode expected =
+ mapper.readTree(
+
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"boolean\",\"optional\":true,\"doc\":\"_boolean
comment\",\"field\":\"_boolean\"},{\"type\":\"bytes\",\"optional\":true,\"name\":\"io.debezium.data.Bits\",\"version\":1,\"parameters\":{\"length\":\"3\"},\"field\":\"_binary\"},{\"type\":\"bytes\",\"optional\":true,\"name\":\"io.debezium.data.Bits\",\"version\":1,\"parameters\":{\"length\":\"10\"},\"field\":\"_varbinary\"},{\"t
[...]
+ JsonNode actual =
mapper.readTree(serializationSchema.serialize(insertEvent1));
+ Assertions.assertEquals(expected, actual);
+ }
}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java
index bdc313b35..e85924558 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java
@@ -301,6 +301,82 @@ class MysqlToKafkaE2eITCase extends
PipelineTestEnvironment {
.containsExactlyInAnyOrderElementsOf(deserializeValues(collectedRecords));
}
+ @Test
+ public void testSyncWholeDatabaseWithDebeziumJsonHasSchema() throws
Exception {
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: %s\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + "\n"
+ + "sink:\n"
+ + " type: kafka\n"
+ + " properties.bootstrap.servers:
kafka:9092\n"
+ + " topic: %s\n"
+ + "
sink.debezium-json.include-schema.enabled: true\n"
+ + "\n"
+ + "pipeline:\n"
+ + " parallelism: %d",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ mysqlInventoryDatabase.getDatabaseName(),
+ topic,
+ parallelism);
+ Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+ Path kafkaCdcJar =
TestUtils.getResource("kafka-cdc-pipeline-connector.jar");
+ Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+ submitPipelineJob(pipelineJob, mysqlCdcJar, kafkaCdcJar,
mysqlDriverJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ LOG.info("Pipeline job is running");
+ List<ConsumerRecord<byte[], byte[]>> collectedRecords = new
ArrayList<>();
+ int expectedEventCount = 13;
+ waitUntilSpecificEventCount(collectedRecords, expectedEventCount);
+ List<String> expectedRecords =
+
getExpectedRecords("expectedEvents/mysqlToKafka/debezium-json-with-schema.txt");
+
assertThat(expectedRecords).containsAll(deserializeValues(collectedRecords));
+ LOG.info("Begin incremental reading stage.");
+ // generate binlogs
+ String mysqlJdbcUrl =
+ String.format(
+ "jdbc:mysql://%s:%s/%s",
+ MYSQL.getHost(),
+ MYSQL.getDatabasePort(),
+ mysqlInventoryDatabase.getDatabaseName());
+ try (Connection conn =
+ DriverManager.getConnection(
+ mysqlJdbcUrl, MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD);
+ Statement stat = conn.createStatement()) {
+ stat.execute("UPDATE products SET description='18oz carpenter
hammer' WHERE id=106;");
+ stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+
+ // modify table schema
+ stat.execute("ALTER TABLE products ADD COLUMN new_col INT;");
+ stat.execute(
+ "INSERT INTO products VALUES (default,'jacket','water
resistent white wind breaker',0.2, null, null, null, 1);"); // 110
+ stat.execute(
+ "INSERT INTO products VALUES (default,'scooter','Big
2-wheel scooter ',5.18, null, null, null, 1);"); // 111
+ stat.execute(
+ "UPDATE products SET description='new water resistent
white wind breaker', weight='0.5' WHERE id=110;");
+ stat.execute("UPDATE products SET weight='5.17' WHERE id=111;");
+ stat.execute("DELETE FROM products WHERE id=111;");
+ } catch (SQLException e) {
+ LOG.error("Update table for CDC failed.", e);
+ throw e;
+ }
+
+ expectedEventCount = 20;
+ waitUntilSpecificEventCount(collectedRecords, expectedEventCount);
+ assertThat(expectedRecords)
+
.containsExactlyInAnyOrderElementsOf(deserializeValues(collectedRecords));
+ }
+
private void waitUntilSpecificEventCount(
List<ConsumerRecord<byte[], byte[]>> actualEvent, int
expectedCount) throws Exception {
boolean result = false;
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/debezium-json-with-schema.txt
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/debezium-json-with-schema.txt
new file mode 100644
index 000000000..deeed6cb5
--- /dev/null
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/debezium-json-with-schema.txt
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phone_number"}],"optional":true,"field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional"
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phone_number"}],"optional":true,"field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional"
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phone_number"}],"optional":true,"field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional"
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phone_number"}],"optional":true,"field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional"
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"},{"type":"int32","optional":true,"fiel
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"},{"type":"int32","optional":true,"fiel
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"},{"type":"int32","optional":true,"fiel
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"},{"type":"int32","optional":true,"fiel
[...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"},{"type":"int32","optional":true,"fiel
[...]
\ No newline at end of file