This is an automated email from the ASF dual-hosted git repository.
lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new be7d37489 [FLINK-39757] Fix kafka sink could not serialize debezium
json with column default values (#4416)
be7d37489 is described below
commit be7d37489f217b218e5cb2fb74ae2e07bb786197
Author: haruki <[email protected]>
AuthorDate: Sun May 31 16:51:11 2026 +0800
[FLINK-39757] Fix kafka sink could not serialize debezium json with column
default values (#4416)
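When a column of a non-string type (e.g. BIGINT) declares a default value, Debezium
JSON serialization fails, because the default value expression string was handed
unchanged to the Kafka Connect schema (e.g. "Invalid Java object for schema with
type INT64: class java.lang.String").
This fix adds a convertDefaultValue method that converts the string default
value expression to the Java type matching the column's Debezium schema,
covering BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DECIMAL,
DATE, TIME, TIMESTAMP, BINARY and VARBINARY; all other types keep the raw string.
Expressions that cannot be converted are logged and the default value is omitted.
Corresponding test cases are also added.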
Co-authored-by: 春栖 <[email protected]>
---
.../debezium/DebeziumJsonSerializationSchema.java | 61 ++++++++-
.../DebeziumJsonSerializationSchemaTest.java | 138 +++++++++++++++++++++
2 files changed, 198 insertions(+), 1 deletion(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
index c70b11744..cbda93dfd 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
@@ -53,7 +53,11 @@ import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
@@ -80,6 +84,9 @@ import static
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDa
public class DebeziumJsonSerializationSchema implements
SerializationSchema<Event> {
private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DebeziumJsonSerializationSchema.class);
+
private static final StringData OP_INSERT = StringData.fromString("c"); //
insert
private static final StringData OP_DELETE = StringData.fromString("d"); //
delete
private static final StringData OP_UPDATE = StringData.fromString("u"); //
update
@@ -260,7 +267,11 @@ public class DebeziumJsonSerializationSchema implements
SerializationSchema<Even
field.required();
}
if (column.getDefaultValueExpression() != null) {
- field.defaultValue(column.getDefaultValueExpression());
+ Object convertedDefault =
+ convertDefaultValue(column.getDefaultValueExpression(),
columnType);
+ if (convertedDefault != null) {
+ field.defaultValue(convertedDefault);
+ }
}
if (column.getComment() != null) {
field.doc(column.getComment());
@@ -268,6 +279,54 @@ public class DebeziumJsonSerializationSchema implements
SerializationSchema<Even
return field;
}
+ /**
+ * Convert a default value expression string to the Java object matching
the Debezium schema
+ * type.
+ */
+ private static Object convertDefaultValue(
+ String defaultValueExpression,
org.apache.flink.cdc.common.types.DataType columnType) {
+ try {
+ switch (columnType.getTypeRoot()) {
+ case BOOLEAN:
+ return Boolean.parseBoolean(defaultValueExpression);
+ case TINYINT:
+ case SMALLINT:
+ return Short.parseShort(defaultValueExpression);
+ case INTEGER:
+ case DATE:
+ return Integer.parseInt(defaultValueExpression);
+ case BIGINT:
+ case TIME_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_TIME_ZONE:
+ return Long.parseLong(defaultValueExpression);
+ case FLOAT:
+ return Float.parseFloat(defaultValueExpression);
+ case DOUBLE:
+ return Double.parseDouble(defaultValueExpression);
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) columnType;
+ return new BigDecimal(defaultValueExpression)
+ .setScale(decimalType.getScale(),
RoundingMode.HALF_UP);
+ case BINARY:
+ case VARBINARY:
+ return defaultValueExpression.getBytes();
+ case CHAR:
+ case VARCHAR:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ default:
+ return defaultValueExpression;
+ }
+ } catch (NumberFormatException e) {
+ LOG.warn(
+ "Failed to convert default value '{}' for type {},
skipping default value.",
+ defaultValueExpression,
+ columnType.getTypeRoot(),
+ e);
+ return null;
+ }
+ }
+
private static SchemaBuilder convertCDCDataTypeToDebeziumDataType(
org.apache.flink.cdc.common.types.DataType columnType) {
final SchemaBuilder field;
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
index c2e026f77..8ad302560 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
@@ -341,6 +341,144 @@ class DebeziumJsonSerializationSchemaTest {
assertThat(rowNode.has("f2")).isTrue();
}
+ /**
+ * Verifies that default values declared on non-string columns are emitted in the Debezium
+ * schema with the JSON type of each field's schema, and that serializing both the
+ * CreateTableEvent and a following insert event succeeds.
+ */
+ @Test
+ void testSerializeWithNonStringDefaultValues() throws Exception {
+ ObjectMapper mapper =
+ JacksonMapperFactory.createObjectMapper()
+
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+ Map<String, String> properties = new HashMap<>();
+ properties.put("include-schema.enabled", "true");
+ Configuration configuration = Configuration.fromMap(properties);
+ SerializationSchema<Event> serializationSchema =
+ ChangeLogJsonFormatFactory.createSerializationSchema(
+ configuration, JsonSerializationType.DEBEZIUM_JSON,
ZoneId.systemDefault());
+ serializationSchema.open(new MockInitializationContext());
+
+ // Create a table whose columns each declare a default value, covering the type cases of
+ // convertDefaultValue (VARBINARY excepted) plus string-typed fallbacks.
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("_boolean", DataTypes.BOOLEAN(), null,
"true")
+ .physicalColumn("_tinyint", DataTypes.TINYINT(), null,
"1")
+ .physicalColumn("_smallint", DataTypes.SMALLINT(),
null, "5")
+ .physicalColumn("_int", DataTypes.INT(), null, "10")
+ .physicalColumn("_bigint", DataTypes.BIGINT(), null,
"0")
+ .physicalColumn("_float", DataTypes.FLOAT(), null,
"1.5")
+ .physicalColumn("_double", DataTypes.DOUBLE(), null,
"3.14")
+ .physicalColumn("_decimal", DataTypes.DECIMAL(10, 2),
null, "99.99")
+ .physicalColumn("_char", DataTypes.CHAR(5), null,
"abc")
+ .physicalColumn("_varchar", DataTypes.VARCHAR(10),
null, "hello")
+ .physicalColumn("_string", DataTypes.STRING(), null,
"unknown")
+ .physicalColumn("_date", DataTypes.DATE(), null, "100")
+ .physicalColumn("_time", DataTypes.TIME(), null,
"200000")
+ .physicalColumn(
+ "_timestamp", DataTypes.TIMESTAMP(), null,
"1672531200000000")
+ .physicalColumn(
+ "_timestamp_3", DataTypes.TIMESTAMP(3), null,
"1672531200000")
+ .physicalColumn(
+ "_timestamp_ltz",
+ DataTypes.TIMESTAMP_LTZ(),
+ null,
+ "2023-01-01T00:00:00Z")
+ .physicalColumn("_binary", DataTypes.BINARY(3), null,
"bin")
+ .primaryKey("_bigint")
+ .build();
+
+ // Row type with the same column types, in the same order, used to build the insert record.
+ RowType rowType =
+ RowType.of(
+ DataTypes.BOOLEAN(),
+ DataTypes.TINYINT(),
+ DataTypes.SMALLINT(),
+ DataTypes.INT(),
+ DataTypes.BIGINT(),
+ DataTypes.FLOAT(),
+ DataTypes.DOUBLE(),
+ DataTypes.DECIMAL(10, 2),
+ DataTypes.CHAR(5),
+ DataTypes.VARCHAR(10),
+ DataTypes.STRING(),
+ DataTypes.DATE(),
+ DataTypes.TIME(),
+ DataTypes.TIMESTAMP(),
+ DataTypes.TIMESTAMP(3),
+ DataTypes.TIMESTAMP_LTZ(),
+ DataTypes.BINARY(3));
+
+ CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1,
schema);
+ // Serializing the CreateTableEvent returns null and must not throw; previously it failed
+ // with "Invalid Java object for schema with type INT64: class java.lang.String"
+ assertThat(serializationSchema.serialize(createTableEvent)).isNull();
+
+ BinaryRecordDataGenerator generator = new
BinaryRecordDataGenerator(rowType);
+
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ TABLE_1,
+ generator.generate(
+ new Object[] {
+ true,
+ (byte) 1,
+ (short) 7,
+ 42,
+ 1L,
+ 2.5f,
+ 9.99,
+ DecimalData.fromBigDecimal(new
BigDecimal("123.45"), 10, 2),
+ BinaryStringData.fromString("test1"),
+ BinaryStringData.fromString("test2"),
+ BinaryStringData.fromString("test3"),
+ DateData.fromEpochDay(100),
+ TimeData.fromNanoOfDay(200_000_000L),
+ TimestampData.fromTimestamp(
+
java.sql.Timestamp.valueOf("2023-01-01 00:00:00.000")),
+ TimestampData.fromTimestamp(
+
java.sql.Timestamp.valueOf("2023-01-01 00:00:00")),
+ LocalZonedTimestampData.fromInstant(
+
Instant.parse("2023-01-01T00:00:00.000Z")),
+ new byte[] {1, 2, 3}
+ }));
+
+ byte[] serialized = serializationSchema.serialize(insertEvent);
+ JsonNode actual = mapper.readTree(serialized);
+
+ // Expected field schemas, shared by the "before" and "after" structs. Each "default" uses
+ // the JSON form of that field's schema type: the DECIMAL default 99.99 at scale 2 is the
+ // unscaled value 9999, i.e. bytes 0x27 0x0F, base64 "Jw8="; the BINARY default "bin" is
+ // base64 "Ymlu".
+ String fieldsSchema =
+
"{\"type\":\"boolean\",\"optional\":true,\"default\":true,\"field\":\"_boolean\"},"
+ +
"{\"type\":\"int16\",\"optional\":true,\"default\":1,\"field\":\"_tinyint\"},"
+ +
"{\"type\":\"int16\",\"optional\":true,\"default\":5,\"field\":\"_smallint\"},"
+ +
"{\"type\":\"int32\",\"optional\":true,\"default\":10,\"field\":\"_int\"},"
+ +
"{\"type\":\"int64\",\"optional\":true,\"default\":0,\"field\":\"_bigint\"},"
+ +
"{\"type\":\"float\",\"optional\":true,\"default\":1.5,\"field\":\"_float\"},"
+ +
"{\"type\":\"double\",\"optional\":true,\"default\":3.14,\"field\":\"_double\"},"
+ +
"{\"type\":\"bytes\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Decimal\",\"version\":1,\"parameters\":{\"scale\":\"2\",\"connect.decimal.precision\":\"10\"},\"default\":\"Jw8=\",\"field\":\"_decimal\"},"
+ +
"{\"type\":\"string\",\"optional\":true,\"default\":\"abc\",\"field\":\"_char\"},"
+ +
"{\"type\":\"string\",\"optional\":true,\"default\":\"hello\",\"field\":\"_varchar\"},"
+ +
"{\"type\":\"string\",\"optional\":true,\"default\":\"unknown\",\"field\":\"_string\"},"
+ +
"{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"default\":100,\"field\":\"_date\"},"
+ +
"{\"type\":\"int64\",\"optional\":true,\"name\":\"io.debezium.time.MicroTime\",\"version\":1,\"default\":200000,\"field\":\"_time\"},"
+ +
"{\"type\":\"int64\",\"optional\":true,\"name\":\"io.debezium.time.MicroTimestamp\",\"version\":1,\"default\":1672531200000000,\"field\":\"_timestamp\"},"
+ +
"{\"type\":\"int64\",\"optional\":true,\"name\":\"io.debezium.time.Timestamp\",\"version\":1,\"default\":1672531200000,\"field\":\"_timestamp_3\"},"
+ +
"{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.time.ZonedTimestamp\",\"version\":1,\"default\":\"2023-01-01T00:00:00Z\",\"field\":\"_timestamp_ltz\"},"
+ +
"{\"type\":\"bytes\",\"optional\":true,\"name\":\"io.debezium.data.Bits\",\"version\":1,\"parameters\":{\"length\":\"3\"},\"default\":\"Ymlu\",\"field\":\"_binary\"}";
+ // In the payload, epoch day 100 renders as "1970-04-11" and the bytes {1, 2, 3} as base64
+ // "AQID".
+ JsonNode expected =
+ mapper.readTree(
+ "{\"schema\":{\"type\":\"struct\",\"fields\":["
+ + "{\"type\":\"struct\",\"fields\":["
+ + fieldsSchema
+ + "],\"optional\":true,\"field\":\"before\"},"
+ + "{\"type\":\"struct\",\"fields\":["
+ + fieldsSchema
+ + "],\"optional\":true,\"field\":\"after\"}"
+ + "],\"optional\":false},"
+ + "\"payload\":{\"before\":null,\"after\":"
+ +
"{\"_boolean\":true,\"_tinyint\":1,\"_smallint\":7,\"_int\":42,\"_bigint\":1,"
+ +
"\"_float\":2.5,\"_double\":9.99,\"_decimal\":123.45,"
+ +
"\"_char\":\"test1\",\"_varchar\":\"test2\",\"_string\":\"test3\","
+ +
"\"_date\":\"1970-04-11\",\"_time\":\"00:00:00\","
+ + "\"_timestamp\":\"2023-01-01
00:00:00\",\"_timestamp_3\":\"2023-01-01 00:00:00\","
+ + "\"_timestamp_ltz\":\"2023-01-01
00:00:00Z\",\"_binary\":\"AQID\"},"
+ +
"\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}}");
+ assertThat(actual).isEqualTo(expected);
+ }
+
@Test
void testSerializeWithSchemaComplexTypes() throws Exception {
ObjectMapper mapper =