This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c1399be49 [flink] Fixed deserialization failure when kafka cdc records
contain nested structures (#2526)
c1399be49 is described below
commit c1399be49f881327a20f0a8bc117f32bc69d9a79
Author: Kerwin <[email protected]>
AuthorDate: Thu Dec 21 11:19:29 2023 +0800
[flink] Fixed deserialization failure when kafka cdc records contain nested
structures (#2526)
---
.../java/org/apache/paimon/utils/TypeUtils.java | 16 ++++++++++
.../flink/action/cdc/format/RecordParser.java | 29 ++++++++++++++---
.../kafka/KafkaDebeziumSyncTableActionITCase.java | 37 ++++++++++++++++++++++
.../debezium/table/nestedtype/debezium-data-1.txt | 19 +++++++++++
4 files changed, 96 insertions(+), 5 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index 6b3fb7000..cd15ad383 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -229,4 +229,20 @@ public class TypeUtils {
return t1.copy(true).equals(t2.copy(true));
}
}
+
+ public static boolean isBasicType(Object obj) {
+ Class<?> clazz = obj.getClass();
+ return clazz.isPrimitive() || isWrapperType(clazz) ||
clazz.equals(String.class);
+ }
+
+ private static boolean isWrapperType(Class<?> clazz) {
+ return clazz.equals(Boolean.class)
+ || clazz.equals(Character.class)
+ || clazz.equals(Byte.class)
+ || clazz.equals(Short.class)
+ || clazz.equals(Integer.class)
+ || clazz.equals(Long.class)
+ || clazz.equals(Float.class)
+ || clazz.equals(Double.class);
+ }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
index 0593fc1e9..2588aad7a 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
@@ -27,7 +27,9 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.JsonSerdeUtil;
+import org.apache.paimon.utils.TypeUtils;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -42,10 +44,10 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.Collections;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -141,10 +143,27 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
protected Map<String, String> extractRowData(
JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes)
{
paimonFieldTypes.putAll(fillDefaultStringTypes(record));
- Map<String, String> recordMap =
- OBJECT_MAPPER.convertValue(record, new
TypeReference<Map<String, String>>() {});
-
- Map<String, String> rowData = new HashMap<>(recordMap);
+ Map<String, Object> recordMap =
+ OBJECT_MAPPER.convertValue(record, new
TypeReference<Map<String, Object>>() {});
+ Map<String, String> rowData =
+ recordMap.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> {
+ if
(Objects.nonNull(entry.getValue())
+ &&
!TypeUtils.isBasicType(entry.getValue())) {
+ try {
+ return OBJECT_MAPPER
+ .writer()
+
.writeValueAsString(entry.getValue());
+ } catch
(JsonProcessingException e) {
+                                                LOG.error("Failed to
serialize record.", e);
+ return
Objects.toString(entry.getValue());
+ }
+ }
+ return
Objects.toString(entry.getValue(), null);
+ }));
evalComputedColumns(rowData, paimonFieldTypes);
return rowData;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index d9cf2a8ea..6dcb4b1b7 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -181,4 +181,41 @@ public class KafkaDebeziumSyncTableActionITCase extends
KafkaActionITCaseBase {
rowType,
Arrays.asList("id", "_year"));
}
+
+ @Test
+ @Timeout(60)
+ public void testRecordWithNestedDataType() throws Exception {
+ String topic = "nested_type";
+ createTestTopic(topic, 1, 1);
+
+ List<String> lines =
readLines("kafka/debezium/table/nestedtype/debezium-data-1.txt");
+ try {
+ writeRecordsToKafka(topic, lines);
+ } catch (Exception e) {
+            throw new Exception("Failed to write debezium data to Kafka.", e);
+ }
+
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig)
+ .withPrimaryKeys("id")
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ FileStoreTable table = getFileStoreTable(tableName);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(), DataTypes.STRING(),
DataTypes.STRING()
+ },
+ new String[] {"id", "name", "row"});
+ List<String> primaryKeys = Collections.singletonList("id");
+ List<String> expected =
+ Collections.singletonList("+I[101, hammer,
{\"row_key\":\"value\"}]");
+ waitForResult(expected, table, rowType, primaryKeys);
+ }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nestedtype/debezium-data-1.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nestedtype/debezium-data-1.txt
new file mode 100644
index 000000000..2394d9410
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nestedtype/debezium-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"id": 101, "name": "hammer", "row":
{"row_key":"value"} }, "source": {"version": "1.9.7.Final", "connector":
"mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot":
"false", "db": "test", "sequence": null, "table": "product", "server_id": 0,
"gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null},
"op": "c", "ts_ms": 1596684883000, "transaction": null}