This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 9128a6bbd [Kafka] Clean up code in the `flink-cdc-pipeline-connector-kafka` module that is compatible with older versions of Flink. (#4347)
9128a6bbd is described below

commit 9128a6bbd099aafbab9bbdd647953c6b312b368b
Author: Pei Yu <[email protected]>
AuthorDate: Sun Mar 29 13:48:33 2026 +0800

    [Kafka] Clean up code in the `flink-cdc-pipeline-connector-kafka` module that is compatible with older versions of Flink. (#4347)
    
    Signed-off-by: Pei Yu <[email protected]>
---
 .../kafka/json/ChangeLogJsonFormatFactory.java     |   3 +-
 .../json/canal/CanalJsonSerializationSchema.java   |   3 +-
 .../DebeziumJsonRowDataSerializationSchema.java    |   3 +-
 .../serialization/JsonSerializationSchema.java     |   3 +-
 .../kafka/sink/KeySerializationFactory.java        |   4 +-
 .../utils/JsonRowDataSerializationSchemaUtils.java | 143 ---------------------
 6 files changed, 5 insertions(+), 154 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
index a6dcdcd1e..88644de30 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
@@ -22,7 +22,6 @@ import 
org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.cdc.common.event.Event;
 import 
org.apache.flink.cdc.connectors.kafka.json.canal.CanalJsonSerializationSchema;
 import 
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonSerializationSchema;
-import 
org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonFormatOptions;
@@ -70,7 +69,7 @@ public class ChangeLogJsonFormatFactory {
                 formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
 
         final boolean ignoreNullFields =
-                
JsonRowDataSerializationSchemaUtils.enableIgnoreNullFields(formatOptions);
+                formatOptions.get(JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS);
 
         switch (type) {
             case DEBEZIUM_JSON:
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
index c95b310e9..a9a0209a6 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
@@ -27,7 +27,6 @@ import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
-import 
org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonFormatOptions;
 import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
@@ -119,7 +118,7 @@ public class CanalJsonSerializationSchema implements 
SerializationSchema<Event>
             LogicalType rowType =
                     
DataTypeUtils.toFlinkDataType(schema.toRowDataType()).getLogicalType();
             JsonRowDataSerializationSchema jsonSerializer =
-                    
JsonRowDataSerializationSchemaUtils.createSerializationSchema(
+                    new JsonRowDataSerializationSchema(
                             createJsonRowType(fromLogicalToDataType(rowType)),
                             timestampFormat,
                             mapNullKeyMode,
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonRowDataSerializationSchema.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonRowDataSerializationSchema.java
index 349dee4b8..e26b0659f 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonRowDataSerializationSchema.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonRowDataSerializationSchema.java
@@ -18,7 +18,6 @@
 package org.apache.flink.cdc.connectors.kafka.json.debezium;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import 
org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonFormatOptions;
 import org.apache.flink.formats.json.JsonParserRowDataDeserializationSchema;
@@ -90,7 +89,7 @@ public class DebeziumJsonRowDataSerializationSchema 
implements SerializationSche
         this.mapNullKeyLiteral = mapNullKeyLiteral;
         this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
         this.runtimeConverter =
-                
JsonRowDataSerializationSchemaUtils.createRowDataToJsonConverters(
+                new RowDataToJsonConverters(
                                 timestampFormat,
                                 mapNullKeyMode,
                                 mapNullKeyLiteral,
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java
index a9a0d845f..2ffd114cf 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java
@@ -33,7 +33,6 @@ import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
-import 
org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonFormatOptions;
 import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
@@ -136,7 +135,7 @@ public class JsonSerializationSchema implements 
SerializationSchema<Event> {
         // the row should never be null
         DataType dataType = DataTypes.ROW(fields).notNull();
         LogicalType rowType = 
DataTypeUtils.toFlinkDataType(dataType).getLogicalType();
-        return JsonRowDataSerializationSchemaUtils.createSerializationSchema(
+        return new JsonRowDataSerializationSchema(
                 (RowType) rowType,
                 timestampFormat,
                 mapNullKeyMode,
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java
index 339e0a106..8ff76cf9a 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java
@@ -21,7 +21,6 @@ import 
org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.cdc.common.event.Event;
 import 
org.apache.flink.cdc.connectors.kafka.serialization.CsvSerializationSchema;
 import 
org.apache.flink.cdc.connectors.kafka.serialization.JsonSerializationSchema;
-import 
org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonFormatOptions;
@@ -56,8 +55,7 @@ public class KeySerializationFactory {
                     final boolean encodeDecimalAsPlainNumber =
                             formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
                     final boolean ignoreNullFields =
-                            
JsonRowDataSerializationSchemaUtils.enableIgnoreNullFields(
-                                    formatOptions);
+                            
formatOptions.get(JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS);
                     return new JsonSerializationSchema(
                             timestampFormat,
                             mapNullKeyMode,
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java
deleted file mode 100644
index 0cc850349..000000000
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.connectors.kafka.utils;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonFormatOptions;
-import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
-import org.apache.flink.formats.json.RowDataToJsonConverters;
-import org.apache.flink.table.types.logical.RowType;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-
-/**
- * Utils for creating JsonRowDataSerializationSchema.TODO: Remove this class 
after bump to Flink
- * 1.20 or higher.
- */
-public class JsonRowDataSerializationSchemaUtils {
-
-    /**
-     * In flink>=1.20, the constructor of JsonRowDataSerializationSchema has 6 
parameters, and in
-     * flink<1.20, the constructor of JsonRowDataSerializationSchema has 5 
parameters.
-     */
-    public static JsonRowDataSerializationSchema createSerializationSchema(
-            RowType rowType,
-            TimestampFormat timestampFormat,
-            JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
-            String mapNullKeyLiteral,
-            boolean encodeDecimalAsPlainNumber,
-            boolean ignoreNullFields) {
-        try {
-            Class<?>[] fullParams =
-                    new Class[] {
-                        RowType.class,
-                        TimestampFormat.class,
-                        JsonFormatOptions.MapNullKeyMode.class,
-                        String.class,
-                        boolean.class,
-                        boolean.class
-                    };
-
-            Object[] fullParamValues =
-                    new Object[] {
-                        rowType,
-                        timestampFormat,
-                        mapNullKeyMode,
-                        mapNullKeyLiteral,
-                        encodeDecimalAsPlainNumber,
-                        ignoreNullFields
-                    };
-
-            for (int i = fullParams.length; i >= 5; i--) {
-                try {
-                    Constructor<?> constructor =
-                            
JsonRowDataSerializationSchema.class.getConstructor(
-                                    Arrays.copyOfRange(fullParams, 0, i));
-
-                    return (JsonRowDataSerializationSchema)
-                            
constructor.newInstance(Arrays.copyOfRange(fullParamValues, 0, i));
-                } catch (NoSuchMethodException ignored) {
-                }
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    "Failed to create JsonRowDataSerializationSchema,please 
check your Flink version is 1.19 or 1.20.",
-                    e);
-        }
-        throw new RuntimeException(
-                "Failed to find appropriate constructor for 
JsonRowDataSerializationSchema,please check your Flink version is 1.19 or 
1.20.");
-    }
-
-    /**
-     * In flink>=1.20, the constructor of RowDataToJsonConverters has 4 
parameters, and in
-     * flink<1.20, the constructor of RowDataToJsonConverters has 3 parameters.
-     */
-    public static RowDataToJsonConverters createRowDataToJsonConverters(
-            TimestampFormat timestampFormat,
-            JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
-            String mapNullKeyLiteral,
-            boolean ignoreNullFields) {
-        try {
-            Class<?>[] fullParams =
-                    new Class[] {
-                        TimestampFormat.class,
-                        JsonFormatOptions.MapNullKeyMode.class,
-                        String.class,
-                        boolean.class
-                    };
-
-            Object[] fullParamValues =
-                    new Object[] {
-                        timestampFormat, mapNullKeyMode, mapNullKeyLiteral, 
ignoreNullFields
-                    };
-
-            for (int i = fullParams.length; i >= 3; i--) {
-                try {
-                    Constructor<?> constructor =
-                            RowDataToJsonConverters.class.getConstructor(
-                                    Arrays.copyOfRange(fullParams, 0, i));
-
-                    return (RowDataToJsonConverters)
-                            
constructor.newInstance(Arrays.copyOfRange(fullParamValues, 0, i));
-                } catch (NoSuchMethodException ignored) {
-                }
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    "Failed to create RowDataToJsonConverters,please check 
your Flink version is 1.19 or 1.20.",
-                    e);
-        }
-        throw new RuntimeException(
-                "Failed to find appropriate constructor for 
RowDataToJsonConverters,please check your Flink version is 1.19 or 1.20.");
-    }
-
-    /** flink>=1.20 only has the ENCODE_IGNORE_NULL_FIELDS parameter. */
-    public static boolean enableIgnoreNullFields(ReadableConfig formatOptions) 
{
-        try {
-            Field field = 
JsonFormatOptions.class.getField("ENCODE_IGNORE_NULL_FIELDS");
-            ConfigOption<Boolean> encodeOption = (ConfigOption<Boolean>) 
field.get(null);
-            return formatOptions.get(encodeOption);
-        } catch (NoSuchFieldException | IllegalAccessException e) {
-            return false;
-        }
-    }
-}

Reply via email to