This is an automated email from the ASF dual-hosted git repository.

lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new be7d37489 [FLINK-39757] Fix kafka sink could not serialize debezium 
json with column default values (#4416)
be7d37489 is described below

commit be7d37489f217b218e5cb2fb74ae2e07bb786197
Author: haruki <[email protected]>
AuthorDate: Sun May 31 16:51:11 2026 +0800

    [FLINK-39757] Fix kafka sink could not serialize debezium json with column 
default values (#4416)
    
    When a column of a non-string type has a default value expression, Debezium
    JSON serialization fails, because the expression is always a string and does
    not match the Java type required by the field's schema.
    This fix adds a convertDefaultValue method that converts the string default
    value expression to the correct Java type matching the Debezium schema,
    supporting BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DECIMAL,
    DATE, TIME, TIMESTAMP, BINARY, VARBINARY, and other types. Corresponding test
    cases are also added.
    
    Co-authored-by: 春栖 <[email protected]>
---
 .../debezium/DebeziumJsonSerializationSchema.java  |  61 ++++++++-
 .../DebeziumJsonSerializationSchemaTest.java       | 138 +++++++++++++++++++++
 2 files changed, 198 insertions(+), 1 deletion(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
index c70b11744..cbda93dfd 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
@@ -53,7 +53,11 @@ import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.storage.ConverterConfig;
 import org.apache.kafka.connect.storage.ConverterType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.time.ZoneId;
 import java.util.HashMap;
 import java.util.List;
@@ -80,6 +84,9 @@ import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDa
 public class DebeziumJsonSerializationSchema implements 
SerializationSchema<Event> {
     private static final long serialVersionUID = 1L;
 
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DebeziumJsonSerializationSchema.class);
+
     private static final StringData OP_INSERT = StringData.fromString("c"); // 
insert
     private static final StringData OP_DELETE = StringData.fromString("d"); // 
delete
     private static final StringData OP_UPDATE = StringData.fromString("u"); // 
update
@@ -260,7 +267,11 @@ public class DebeziumJsonSerializationSchema implements 
SerializationSchema<Even
             field.required();
         }
         if (column.getDefaultValueExpression() != null) {
-            field.defaultValue(column.getDefaultValueExpression());
+            Object convertedDefault =
+                    convertDefaultValue(column.getDefaultValueExpression(), 
columnType);
+            if (convertedDefault != null) {
+                field.defaultValue(convertedDefault);
+            }
         }
         if (column.getComment() != null) {
             field.doc(column.getComment());
@@ -268,6 +279,54 @@ public class DebeziumJsonSerializationSchema implements 
SerializationSchema<Even
         return field;
     }
 
+    /**
+     * Converts a column's default value expression (always a string) into the Java object whose
+     * class matches the Debezium schema type of that column, so it can be set as the schema
+     * field's default value.
+     *
+     * <ul>
+     *   <li>BOOLEAN: {@code Boolean}, accepting {@code true}/{@code false} (any case) and
+     *       {@code 1}/{@code 0}
+     *   <li>TINYINT, SMALLINT: {@code Short}
+     *   <li>INTEGER, DATE: {@code Integer}
+     *   <li>BIGINT, TIME, TIMESTAMP, TIMESTAMP WITH TIME ZONE: {@code Long}
+     *   <li>FLOAT: {@code Float}; DOUBLE: {@code Double}
+     *   <li>DECIMAL: {@code BigDecimal} rescaled to the declared scale (HALF_UP)
+     *   <li>BINARY, VARBINARY: the UTF-8 bytes of the expression
+     *   <li>every other type: the expression itself, unchanged
+     * </ul>
+     *
+     * <p>Temporal types are parsed as plain integers, so the expression must already be a numeric
+     * literal; a date/time literal or a function call cannot be converted and is skipped.
+     *
+     * @param defaultValueExpression the default value expression of the column, not null
+     * @param columnType the CDC data type of the column
+     * @return the converted default value, or {@code null} when the expression cannot be
+     *     converted and the default value should be omitted from the schema
+     */
+    private static Object convertDefaultValue(
+            String defaultValueExpression, org.apache.flink.cdc.common.types.DataType columnType) {
+        try {
+            switch (columnType.getTypeRoot()) {
+                case BOOLEAN:
+                    // Boolean.parseBoolean would silently map "1" (or any typo) to false and
+                    // publish a wrong default; parse strictly and skip what is not recognized.
+                    return parseBooleanDefault(defaultValueExpression);
+                case TINYINT:
+                case SMALLINT:
+                    return Short.parseShort(defaultValueExpression);
+                case INTEGER:
+                case DATE:
+                    return Integer.parseInt(defaultValueExpression);
+                case BIGINT:
+                case TIME_WITHOUT_TIME_ZONE:
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                case TIMESTAMP_WITH_TIME_ZONE:
+                    return Long.parseLong(defaultValueExpression);
+                case FLOAT:
+                    return Float.parseFloat(defaultValueExpression);
+                case DOUBLE:
+                    return Double.parseDouble(defaultValueExpression);
+                case DECIMAL:
+                    DecimalType decimalType = (DecimalType) columnType;
+                    return new BigDecimal(defaultValueExpression)
+                            .setScale(decimalType.getScale(), RoundingMode.HALF_UP);
+                case BINARY:
+                case VARBINARY:
+                    // Pin the charset: the no-arg getBytes() uses the JVM's platform default, which
+                    // would make the serialized schema differ between hosts.
+                    return defaultValueExpression.getBytes(
+                            java.nio.charset.StandardCharsets.UTF_8);
+                case CHAR:
+                case VARCHAR:
+                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                default:
+                    return defaultValueExpression;
+            }
+        } catch (IllegalArgumentException e) {
+            // NumberFormatException (thrown by the numeric parsers) is an IllegalArgumentException,
+            // so this also covers the rejection raised by parseBooleanDefault.
+            LOG.warn(
+                    "Failed to convert default value '{}' for type {}, skipping default value.",
+                    defaultValueExpression,
+                    columnType.getTypeRoot(),
+                    e);
+            return null;
+        }
+    }
+
+    /**
+     * Parses a boolean default value expression strictly.
+     *
+     * @param defaultValueExpression the expression to parse
+     * @return {@code true} for {@code "true"} (any case) or {@code "1"}, {@code false} for {@code
+     *     "false"} (any case) or {@code "0"}
+     * @throws IllegalArgumentException if the expression is none of the accepted literals
+     */
+    private static boolean parseBooleanDefault(String defaultValueExpression) {
+        if ("true".equalsIgnoreCase(defaultValueExpression)
+                || "1".equals(defaultValueExpression)) {
+            return true;
+        }
+        if ("false".equalsIgnoreCase(defaultValueExpression)
+                || "0".equals(defaultValueExpression)) {
+            return false;
+        }
+        throw new IllegalArgumentException(
+                "Not a boolean literal: " + defaultValueExpression);
+    }
+
     private static SchemaBuilder convertCDCDataTypeToDebeziumDataType(
             org.apache.flink.cdc.common.types.DataType columnType) {
         final SchemaBuilder field;
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
index c2e026f77..8ad302560 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
@@ -341,6 +341,144 @@ class DebeziumJsonSerializationSchemaTest {
         assertThat(rowNode.has("f2")).isTrue();
     }
 
+    @Test
+    void testSerializeWithNonStringDefaultValues() throws Exception {
+        ObjectMapper mapper =
+                JacksonMapperFactory.createObjectMapper()
+                        
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+        Map<String, String> properties = new HashMap<>();
+        properties.put("include-schema.enabled", "true");
+        Configuration configuration = Configuration.fromMap(properties);
+        SerializationSchema<Event> serializationSchema =
+                ChangeLogJsonFormatFactory.createSerializationSchema(
+                        configuration, JsonSerializationType.DEBEZIUM_JSON, 
ZoneId.systemDefault());
+        serializationSchema.open(new MockInitializationContext());
+
+        // defaulted columns for the types convertDefaultValue handles (no VARBINARY/TIMESTAMP_TZ)
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("_boolean", DataTypes.BOOLEAN(), null, 
"true")
+                        .physicalColumn("_tinyint", DataTypes.TINYINT(), null, 
"1")
+                        .physicalColumn("_smallint", DataTypes.SMALLINT(), 
null, "5")
+                        .physicalColumn("_int", DataTypes.INT(), null, "10")
+                        .physicalColumn("_bigint", DataTypes.BIGINT(), null, 
"0")
+                        .physicalColumn("_float", DataTypes.FLOAT(), null, 
"1.5")
+                        .physicalColumn("_double", DataTypes.DOUBLE(), null, 
"3.14")
+                        .physicalColumn("_decimal", DataTypes.DECIMAL(10, 2), 
null, "99.99")
+                        .physicalColumn("_char", DataTypes.CHAR(5), null, 
"abc")
+                        .physicalColumn("_varchar", DataTypes.VARCHAR(10), 
null, "hello")
+                        .physicalColumn("_string", DataTypes.STRING(), null, 
"unknown")
+                        .physicalColumn("_date", DataTypes.DATE(), null, "100")
+                        .physicalColumn("_time", DataTypes.TIME(), null, 
"200000")
+                        .physicalColumn(
+                                "_timestamp", DataTypes.TIMESTAMP(), null, 
"1672531200000000")
+                        .physicalColumn(
+                                "_timestamp_3", DataTypes.TIMESTAMP(3), null, 
"1672531200000")
+                        .physicalColumn(
+                                "_timestamp_ltz",
+                                DataTypes.TIMESTAMP_LTZ(),
+                                null,
+                                "2023-01-01T00:00:00Z")
+                        .physicalColumn("_binary", DataTypes.BINARY(3), null, 
"bin")
+                        .primaryKey("_bigint")
+                        .build();
+
+        RowType rowType =
+                RowType.of(
+                        DataTypes.BOOLEAN(),
+                        DataTypes.TINYINT(),
+                        DataTypes.SMALLINT(),
+                        DataTypes.INT(),
+                        DataTypes.BIGINT(),
+                        DataTypes.FLOAT(),
+                        DataTypes.DOUBLE(),
+                        DataTypes.DECIMAL(10, 2),
+                        DataTypes.CHAR(5),
+                        DataTypes.VARCHAR(10),
+                        DataTypes.STRING(),
+                        DataTypes.DATE(),
+                        DataTypes.TIME(),
+                        DataTypes.TIMESTAMP(),
+                        DataTypes.TIMESTAMP(3),
+                        DataTypes.TIMESTAMP_LTZ(),
+                        DataTypes.BINARY(3));
+
+        CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, 
schema);
+        // Must not throw: before the fix the raw string default was rejected with
+        // "Invalid Java object for schema with type INT64: class 
java.lang.String"
+        assertThat(serializationSchema.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator generator = new 
BinaryRecordDataGenerator(rowType);
+
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    true,
+                                    (byte) 1,
+                                    (short) 7,
+                                    42,
+                                    1L,
+                                    2.5f,
+                                    9.99,
+                                    DecimalData.fromBigDecimal(new 
BigDecimal("123.45"), 10, 2),
+                                    BinaryStringData.fromString("test1"),
+                                    BinaryStringData.fromString("test2"),
+                                    BinaryStringData.fromString("test3"),
+                                    DateData.fromEpochDay(100),
+                                    TimeData.fromNanoOfDay(200_000_000L),
+                                    TimestampData.fromTimestamp(
+                                            
java.sql.Timestamp.valueOf("2023-01-01 00:00:00.000")),
+                                    TimestampData.fromTimestamp(
+                                            
java.sql.Timestamp.valueOf("2023-01-01 00:00:00")),
+                                    LocalZonedTimestampData.fromInstant(
+                                            
Instant.parse("2023-01-01T00:00:00.000Z")),
+                                    new byte[] {1, 2, 3}
+                                }));
+
+        byte[] serialized = serializationSchema.serialize(insertEvent);
+        JsonNode actual = mapper.readTree(serialized);
+
+        String fieldsSchema =
+                
"{\"type\":\"boolean\",\"optional\":true,\"default\":true,\"field\":\"_boolean\"},"
+                        + 
"{\"type\":\"int16\",\"optional\":true,\"default\":1,\"field\":\"_tinyint\"},"
+                        + 
"{\"type\":\"int16\",\"optional\":true,\"default\":5,\"field\":\"_smallint\"},"
+                        + 
"{\"type\":\"int32\",\"optional\":true,\"default\":10,\"field\":\"_int\"},"
+                        + 
"{\"type\":\"int64\",\"optional\":true,\"default\":0,\"field\":\"_bigint\"},"
+                        + 
"{\"type\":\"float\",\"optional\":true,\"default\":1.5,\"field\":\"_float\"},"
+                        + 
"{\"type\":\"double\",\"optional\":true,\"default\":3.14,\"field\":\"_double\"},"
+                        + 
"{\"type\":\"bytes\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Decimal\",\"version\":1,\"parameters\":{\"scale\":\"2\",\"connect.decimal.precision\":\"10\"},\"default\":\"Jw8=\",\"field\":\"_decimal\"},"
+                        + 
"{\"type\":\"string\",\"optional\":true,\"default\":\"abc\",\"field\":\"_char\"},"
+                        + 
"{\"type\":\"string\",\"optional\":true,\"default\":\"hello\",\"field\":\"_varchar\"},"
+                        + 
"{\"type\":\"string\",\"optional\":true,\"default\":\"unknown\",\"field\":\"_string\"},"
+                        + 
"{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"default\":100,\"field\":\"_date\"},"
+                        + 
"{\"type\":\"int64\",\"optional\":true,\"name\":\"io.debezium.time.MicroTime\",\"version\":1,\"default\":200000,\"field\":\"_time\"},"
+                        + 
"{\"type\":\"int64\",\"optional\":true,\"name\":\"io.debezium.time.MicroTimestamp\",\"version\":1,\"default\":1672531200000000,\"field\":\"_timestamp\"},"
+                        + 
"{\"type\":\"int64\",\"optional\":true,\"name\":\"io.debezium.time.Timestamp\",\"version\":1,\"default\":1672531200000,\"field\":\"_timestamp_3\"},"
+                        + 
"{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.time.ZonedTimestamp\",\"version\":1,\"default\":\"2023-01-01T00:00:00Z\",\"field\":\"_timestamp_ltz\"},"
+                        + 
"{\"type\":\"bytes\",\"optional\":true,\"name\":\"io.debezium.data.Bits\",\"version\":1,\"parameters\":{\"length\":\"3\"},\"default\":\"Ymlu\",\"field\":\"_binary\"}";
+        JsonNode expected =
+                mapper.readTree(
+                        "{\"schema\":{\"type\":\"struct\",\"fields\":["
+                                + "{\"type\":\"struct\",\"fields\":["
+                                + fieldsSchema
+                                + "],\"optional\":true,\"field\":\"before\"},"
+                                + "{\"type\":\"struct\",\"fields\":["
+                                + fieldsSchema
+                                + "],\"optional\":true,\"field\":\"after\"}"
+                                + "],\"optional\":false},"
+                                + "\"payload\":{\"before\":null,\"after\":"
+                                + 
"{\"_boolean\":true,\"_tinyint\":1,\"_smallint\":7,\"_int\":42,\"_bigint\":1,"
+                                + 
"\"_float\":2.5,\"_double\":9.99,\"_decimal\":123.45,"
+                                + 
"\"_char\":\"test1\",\"_varchar\":\"test2\",\"_string\":\"test3\","
+                                + 
"\"_date\":\"1970-04-11\",\"_time\":\"00:00:00\","
+                                + "\"_timestamp\":\"2023-01-01 
00:00:00\",\"_timestamp_3\":\"2023-01-01 00:00:00\","
+                                + "\"_timestamp_ltz\":\"2023-01-01 
00:00:00Z\",\"_binary\":\"AQID\"},"
+                                + 
"\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}}");
+        assertThat(actual).isEqualTo(expected);
+    }
+
     @Test
     void testSerializeWithSchemaComplexTypes() throws Exception {
         ObjectMapper mapper =

Reply via email to