This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c1399be49 [flink] Fixed deserialization failure when kafka cdc records 
contain nested structures (#2526)
c1399be49 is described below

commit c1399be49f881327a20f0a8bc117f32bc69d9a79
Author: Kerwin <[email protected]>
AuthorDate: Thu Dec 21 11:19:29 2023 +0800

    [flink] Fixed deserialization failure when kafka cdc records contain nested 
structures (#2526)
---
 .../java/org/apache/paimon/utils/TypeUtils.java    | 16 ++++++++++
 .../flink/action/cdc/format/RecordParser.java      | 29 ++++++++++++++---
 .../kafka/KafkaDebeziumSyncTableActionITCase.java  | 37 ++++++++++++++++++++++
 .../debezium/table/nestedtype/debezium-data-1.txt  | 19 +++++++++++
 4 files changed, 96 insertions(+), 5 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index 6b3fb7000..cd15ad383 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -229,4 +229,20 @@ public class TypeUtils {
                 return t1.copy(true).equals(t2.copy(true));
         }
     }
+
+    public static boolean isBasicType(Object obj) {
+        Class<?> clazz = obj.getClass();
+        return clazz.isPrimitive() || isWrapperType(clazz) || 
clazz.equals(String.class);
+    }
+
+    private static boolean isWrapperType(Class<?> clazz) {
+        return clazz.equals(Boolean.class)
+                || clazz.equals(Character.class)
+                || clazz.equals(Byte.class)
+                || clazz.equals(Short.class)
+                || clazz.equals(Integer.class)
+                || clazz.equals(Long.class)
+                || clazz.equals(Float.class)
+                || clazz.equals(Double.class);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
index 0593fc1e9..2588aad7a 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
@@ -27,7 +27,9 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.JsonSerdeUtil;
+import org.apache.paimon.utils.TypeUtils;
 
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -42,10 +44,10 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -141,10 +143,27 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
     protected Map<String, String> extractRowData(
             JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes) 
{
         paimonFieldTypes.putAll(fillDefaultStringTypes(record));
-        Map<String, String> recordMap =
-                OBJECT_MAPPER.convertValue(record, new 
TypeReference<Map<String, String>>() {});
-
-        Map<String, String> rowData = new HashMap<>(recordMap);
+        Map<String, Object> recordMap =
+                OBJECT_MAPPER.convertValue(record, new 
TypeReference<Map<String, Object>>() {});
+        Map<String, String> rowData =
+                recordMap.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        entry -> {
+                                            if 
(Objects.nonNull(entry.getValue())
+                                                    && 
!TypeUtils.isBasicType(entry.getValue())) {
+                                                try {
+                                                    return OBJECT_MAPPER
+                                                            .writer()
+                                                            
.writeValueAsString(entry.getValue());
+                                                } catch 
(JsonProcessingException e) {
+                                                    LOG.error("Failed to 
serialize record.", e);
+                                                    return 
Objects.toString(entry.getValue());
+                                                }
+                                            }
+                                            return 
Objects.toString(entry.getValue(), null);
+                                        }));
         evalComputedColumns(rowData, paimonFieldTypes);
         return rowData;
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index d9cf2a8ea..6dcb4b1b7 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -181,4 +181,41 @@ public class KafkaDebeziumSyncTableActionITCase extends 
KafkaActionITCaseBase {
                 rowType,
                 Arrays.asList("id", "_year"));
     }
+
+    @Test
+    @Timeout(60)
+    public void testRecordWithNestedDataType() throws Exception {
+        String topic = "nested_type";
+        createTestTopic(topic, 1, 1);
+
+        List<String> lines = 
readLines("kafka/debezium/table/nestedtype/debezium-data-1.txt");
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception("Failed to write debezium data to Kafka.", e);
+        }
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPrimaryKeys("id")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(), DataTypes.STRING(), 
DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "row"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        List<String> expected =
+                Collections.singletonList("+I[101, hammer, 
{\"row_key\":\"value\"}]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nestedtype/debezium-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nestedtype/debezium-data-1.txt
new file mode 100644
index 000000000..2394d9410
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nestedtype/debezium-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"id": 101, "name": "hammer", "row": 
{"row_key":"value"} }, "source": {"version": "1.9.7.Final", "connector": 
"mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": 
"false", "db": "test", "sequence": null, "table": "product", "server_id": 0, 
"gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, 
"op": "c", "ts_ms": 1596684883000, "transaction": null}

Reply via email to