This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 89f97c7ba [cdc] support aliyun-json when sinking data from kafka with
paimon-flink-action (#4570)
89f97c7ba is described below
commit 89f97c7baf59b72a5bc747e6f5391973fac5d3f5
Author: JackeyLee007 <[email protected]>
AuthorDate: Thu Nov 21 22:22:56 2024 +0800
[cdc] support aliyun-json when sinking data from kafka with
paimon-flink-action (#4570)
---
.../cdc/format/aliyun/AliyunDataFormat.java} | 21 +-
.../format/aliyun/AliyunDataFormatFactory.java} | 23 +-
.../cdc/format/aliyun/AliyunFieldParser.java | 117 ++++++++++
.../cdc/format/aliyun/AliyunRecordParser.java | 260 +++++++++++++++++++++
.../MessageQueueCdcTimestampExtractor.java | 4 +
.../services/org.apache.paimon.factories.Factory | 1 +
.../format/aliyun/AliyunJsonRecordParserTest.java | 167 +++++++++++++
.../kafka/aliyun/table/event/event-delete.txt} | 14 +-
.../kafka/aliyun/table/event/event-insert.txt} | 14 +-
.../aliyun/table/event/event-update-in-one.txt} | 14 +-
10 files changed, 579 insertions(+), 56 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormat.java
similarity index 58%
copy from
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
copy to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormat.java
index efe5e12b1..ccbacdc2a 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormat.java
@@ -16,16 +16,19 @@
* limitations under the License.
*/
-package org.apache.paimon.flink;
+package org.apache.paimon.flink.action.cdc.format.aliyun;
-import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.action.cdc.format.AbstractJsonDataFormat;
+import org.apache.paimon.flink.action.cdc.format.RecordParserFactory;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
-
-/** Placeholder class for new feature introduced since flink 1.19. Should
never be used. */
-public class ProcessRecordAttributesUtil {
- public static void processWithWrite(RecordAttributes recordAttributes,
StoreSinkWrite write) {}
+/**
+ * Supports the message queue's aliyun json data format and provides
definitions for the message
+ * queue's record json deserialization class and parsing class {@link
AliyunRecordParser}.
+ */
+public class AliyunDataFormat extends AbstractJsonDataFormat {
- public static void processWithOutput(RecordAttributes recordAttributes,
Output output) {}
+ @Override
+ protected RecordParserFactory parser() {
+ return AliyunRecordParser::new;
+ }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormatFactory.java
similarity index 59%
copy from
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
copy to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormatFactory.java
index efe5e12b1..a07e2f205 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormatFactory.java
@@ -16,16 +16,23 @@
* limitations under the License.
*/
-package org.apache.paimon.flink;
+package org.apache.paimon.flink.action.cdc.format.aliyun;
-import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.format.DataFormatFactory;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+/** Factory to create {@link AliyunDataFormat}. */
+public class AliyunDataFormatFactory implements DataFormatFactory {
-/** Placeholder class for new feature introduced since flink 1.19. Should
never be used. */
-public class ProcessRecordAttributesUtil {
- public static void processWithWrite(RecordAttributes recordAttributes,
StoreSinkWrite write) {}
+ public static final String IDENTIFIER = "aliyun-json";
- public static void processWithOutput(RecordAttributes recordAttributes,
Output output) {}
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public DataFormat create() {
+ return new AliyunDataFormat();
+ }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunFieldParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunFieldParser.java
new file mode 100644
index 000000000..824ed9145
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunFieldParser.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.aliyun;
+
+/** Converts some special MySQL types such as enum, set and geometry. */
+public class AliyunFieldParser {
+
+ protected static byte[] convertGeoType2WkbArray(byte[] mysqlGeomBytes) {
+ int sridLength = 4;
+ boolean hasSrid = false;
+ for (int i = 0; i < sridLength; ++i) {
+ if (mysqlGeomBytes[i] != 0) {
+ hasSrid = true;
+ break;
+ }
+ }
+ byte[] wkb;
+ if (hasSrid) {
+ wkb = new byte[mysqlGeomBytes.length];
+ // byteOrder + geometry
+ System.arraycopy(mysqlGeomBytes, 4, wkb, 0, 5);
+ // srid
+ System.arraycopy(mysqlGeomBytes, 0, wkb, 5, 4);
+ // geometry
+ System.arraycopy(mysqlGeomBytes, 9, wkb, 9, wkb.length - 9);
+
+ // set srid flag
+ if (wkb[0] == 0) {
+ // big endian
+ wkb[1] = (byte) (wkb[1] + 32);
+ } else {
+ wkb[4] = (byte) (wkb[4] + 32);
+ }
+ } else {
+ wkb = new byte[mysqlGeomBytes.length - 4];
+ System.arraycopy(mysqlGeomBytes, 4, wkb, 0, wkb.length);
+ }
+ return wkb;
+ }
+
+ protected static String convertSet(String value, String mysqlType) {
+ // mysql set type value can be filled with more than one, value is a
bit string conversion
+ // from the long
+ int indexes = Integer.parseInt(value);
+ return getSetValuesByIndex(mysqlType, indexes);
+ }
+
+ protected static String convertEnum(String value, String mysqlType) {
+ int elementIndex = Integer.parseInt(value);
+ // enum('a','b','c')
+ return getEnumValueByIndex(mysqlType, elementIndex);
+ }
+
+ protected static String getEnumValueByIndex(String mysqlType, int
elementIndex) {
+ String[] options = extractEnumValueByIndex(mysqlType);
+
+ return options[elementIndex - 1];
+ }
+
+ protected static String getSetValuesByIndex(String mysqlType, int indexes)
{
+ String[] options = extractSetValuesByIndex(mysqlType);
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ int index = 0;
+ boolean first = true;
+ int optionLen = options.length;
+
+ while (indexes != 0L) {
+ if (indexes % 2L != 0) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(',');
+ }
+ if (index < optionLen) {
+ sb.append(options[index]);
+ } else {
+ throw new RuntimeException(
+ String.format(
+ "extractSetValues from
mysqlType[%s],index:%d failed",
+ mysqlType, indexes));
+ }
+ }
+ ++index;
+ indexes = indexes >>> 1;
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+ private static String[] extractSetValuesByIndex(String mysqlType) {
+ // set('x','y')
+ return mysqlType.substring(5, mysqlType.length() -
2).split("'\\s*,\\s*'");
+ }
+
+ private static String[] extractEnumValueByIndex(String mysqlType) {
+ // enum('x','y')
+ return mysqlType.substring(6, mysqlType.length() -
2).split("'\\s*,\\s*'");
+ }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
new file mode 100644
index 000000000..e31b282a7
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.aliyun;
+
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
+import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.paimon.utils.JsonSerdeUtil.getNodeAs;
+import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
+
+/**
+ * The {@code AliyunRecordParser} class is responsible for parsing records from
the Aliyun-JSON
+ * format. Aliyun-JSON is the change-data-capture format produced by Alibaba
Cloud's data
+ * subscription service. This parser extracts relevant information from the
Aliyun-JSON format and
+ * transforms it into a list of {@link RichCdcMultiplexRecord} objects, which
represent the changes
+ * captured in the database.
+ *
+ * <p>The class handles different types of database operations such as INSERT,
UPDATE, and DELETE,
+ * and generates corresponding {@link RichCdcMultiplexRecord} objects for each
operation.
+ *
+ * <p>Additionally, the parser supports schema extraction, which can be used
to understand the
+ * structure of the incoming data and its corresponding field types.
+ */
+public class AliyunRecordParser extends AbstractJsonRecordParser {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AliyunRecordParser.class);
+
+ private static final String FIELD_IS_DDL = "isDdl";
+ private static final String FIELD_TYPE = "op";
+
+ private static final String OP_UPDATE_BEFORE = "UPDATE_BEFORE";
+ private static final String OP_UPDATE_AFTER = "UPDATE_AFTER";
+ private static final String OP_INSERT = "INSERT";
+ private static final String OP_DELETE = "DELETE";
+
+ private static final String FIELD_PAYLOAD = "payload";
+ private static final String FIELD_BEFORE = "before";
+ private static final String FIELD_AFTER = "after";
+ private static final String FIELD_COLUMN = "dataColumn";
+
+ private static final String FIELD_SCHEMA = "schema";
+ private static final String FIELD_PK = "primaryKey";
+
+ @Override
+ protected boolean isDDL() {
+ JsonNode node = root.get(FIELD_IS_DDL);
+ return !isNull(node) && node.asBoolean();
+ }
+
+ public AliyunRecordParser(TypeMapping typeMapping, List<ComputedColumn>
computedColumns) {
+ super(typeMapping, computedColumns);
+ }
+
+ @Override
+ protected String primaryField() {
+ return "schema.primaryKey";
+ }
+
+ @Override
+ protected String dataField() {
+ return "payload.dataColumn";
+ }
+
+ @Override
+ protected List<String> extractPrimaryKeys() {
+ JsonNode schemaNode = root.get(FIELD_SCHEMA);
+ checkNotNull(schemaNode, FIELD_SCHEMA);
+ ArrayNode pkNode = getNodeAs(schemaNode, FIELD_PK, ArrayNode.class);
+ List<String> pkFields = new ArrayList<>();
+ pkNode.forEach(
+ pk -> {
+ if (isNull(pk)) {
+ throw new IllegalArgumentException(
+ String.format("Primary key cannot be null:
%s", pk));
+ }
+
+ pkFields.add(pk.asText());
+ });
+ return pkFields;
+ }
+
+ @Override
+ public List<RichCdcMultiplexRecord> extractRecords() {
+ if (isDDL()) {
+ return Collections.emptyList();
+ }
+
+ List<RichCdcMultiplexRecord> records = new ArrayList<>();
+
+ JsonNode payload = root.get(FIELD_PAYLOAD);
+ checkNotNull(payload, FIELD_PAYLOAD);
+
+ String type = payload.get(FIELD_TYPE).asText();
+
+ RowKind rowKind = null;
+ String field = null;
+ switch (type) {
+ case OP_UPDATE_BEFORE:
+ rowKind = RowKind.UPDATE_BEFORE;
+ field = FIELD_BEFORE;
+ break;
+ case OP_UPDATE_AFTER:
+ rowKind = RowKind.UPDATE_AFTER;
+ field = FIELD_AFTER;
+ break;
+ case OP_INSERT:
+ rowKind = RowKind.INSERT;
+ field = FIELD_AFTER;
+ break;
+ case OP_DELETE:
+ rowKind = RowKind.DELETE;
+ field = FIELD_BEFORE;
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown record
operation: " + type);
+ }
+
+ JsonNode container = payload.get(field);
+ checkNotNull(container, String.format("%s.%s", FIELD_PAYLOAD, field));
+
+ JsonNode data = getNodeAs(container, FIELD_COLUMN, JsonNode.class);
+ checkNotNull(data, String.format("%s.%s.%s", FIELD_PAYLOAD, field,
FIELD_COLUMN));
+
+ processRecord(data, rowKind, records);
+
+ return records;
+ }
+
+ @Override
+ protected Map<String, String> extractRowData(JsonNode record,
RowType.Builder rowTypeBuilder) {
+
+ Map<String, Object> recordMap =
+ JsonSerdeUtil.convertValue(record, new
TypeReference<Map<String, Object>>() {});
+ Map<String, String> rowData = new HashMap<>();
+
+ fillDefaultTypes(record, rowTypeBuilder);
+ for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
+ rowData.put(entry.getKey(), Objects.toString(entry.getValue(),
null));
+ }
+
+ evalComputedColumns(rowData, rowTypeBuilder);
+ return rowData;
+ }
+
+ @Override
+ protected String format() {
+ return "aliyun-json";
+ }
+
+ @Nullable
+ @Override
+ protected String getTableName() {
+ JsonNode schemaNode = root.get(FIELD_SCHEMA);
+ if (isNull(schemaNode)) {
+ return null;
+ }
+ JsonNode sourceNode = schemaNode.get("source");
+ if (isNull(sourceNode)) {
+ return null;
+ }
+
+ JsonNode tableNode = sourceNode.get("tableName");
+ if (isNull(tableNode)) {
+ return null;
+ }
+ return tableNode.asText();
+ }
+
+ @Nullable
+ @Override
+ protected String getDatabaseName() {
+ JsonNode schemaNode = root.get(FIELD_SCHEMA);
+ if (isNull(schemaNode)) {
+ return null;
+ }
+ JsonNode sourceNode = schemaNode.get("source");
+ if (isNull(sourceNode)) {
+ return null;
+ }
+ JsonNode databaseNode = sourceNode.get("dbName");
+ if (isNull(databaseNode)) {
+ return null;
+ }
+ return databaseNode.asText();
+ }
+
+ private Map<JsonNode, JsonNode> matchOldRecords(ArrayNode newData,
ArrayNode oldData) {
+ return IntStream.range(0, newData.size())
+ .boxed()
+ .collect(Collectors.toMap(newData::get, oldData::get));
+ }
+
+ private String transformValue(@Nullable String oldValue, String shortType,
String mySqlType) {
+ if (oldValue == null) {
+ return null;
+ }
+
+ if (MySqlTypeUtils.isSetType(shortType)) {
+ return AliyunFieldParser.convertSet(oldValue, mySqlType);
+ }
+
+ if (MySqlTypeUtils.isEnumType(shortType)) {
+ return AliyunFieldParser.convertEnum(oldValue, mySqlType);
+ }
+
+ if (MySqlTypeUtils.isGeoType(shortType)) {
+ try {
+ byte[] wkb =
+ AliyunFieldParser.convertGeoType2WkbArray(
+
oldValue.getBytes(StandardCharsets.ISO_8859_1));
+ return MySqlTypeUtils.convertWkbArray(wkb);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format("Failed to convert %s to geometry
JSON.", oldValue), e);
+ }
+ }
+ return oldValue;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java
index 8a9a28453..5bf2fefc1 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java
@@ -54,6 +54,10 @@ public class MessageQueueCdcTimestampExtractor implements
CdcTimestampExtractor
} else if (JsonSerdeUtil.isNodeExists(record, "source", "connector")) {
// Dbz json
return JsonSerdeUtil.extractValue(record, Long.class, "ts_ms");
+ } else if (JsonSerdeUtil.isNodeExists(record, "payload", "timestamp"))
{
+ // Aliyun json
+ return JsonSerdeUtil.extractValue(
+ record, Long.class, "payload", "timestamp", "systemTime");
}
throw new RuntimeException(
String.format(
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 17b8b29a2..1b30c7ab6 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -27,6 +27,7 @@
org.apache.paimon.flink.action.cdc.mongodb.MongoDBSyncDatabaseActionFactory
org.apache.paimon.flink.action.cdc.postgres.PostgresSyncTableActionFactory
### message queue data format factories
+org.apache.paimon.flink.action.cdc.format.aliyun.AliyunDataFormatFactory
org.apache.paimon.flink.action.cdc.format.canal.CanalDataFormatFactory
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroDataFormatFactory
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumJsonDataFormatFactory
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java
new file mode 100644
index 000000000..f06268d70
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.aliyun;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
+import
org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.RowKind;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** Test for AliyunJsonRecordParser. */
+public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase {
+
+ private static final Logger log =
LoggerFactory.getLogger(AliyunJsonRecordParserTest.class);
+ private static List<String> insertList = new ArrayList<>();
+ private static List<String> updateList = new ArrayList<>();
+ private static List<String> deleteList = new ArrayList<>();
+
+ private static ObjectMapper objMapper = new ObjectMapper();
+
+ @Before
+ public void setup() {
+ String insertRes = "kafka/aliyun/table/event/event-insert.txt";
+ String updateRes = "kafka/aliyun/table/event/event-update-in-one.txt";
+ String deleteRes = "kafka/aliyun/table/event/event-delete.txt";
+ URL url;
+ try {
+ url =
AliyunJsonRecordParserTest.class.getClassLoader().getResource(insertRes);
+ Files.readAllLines(Paths.get(url.toURI())).stream()
+ .filter(this::isRecordLine)
+ .forEach(e -> insertList.add(e));
+
+ url =
AliyunJsonRecordParserTest.class.getClassLoader().getResource(updateRes);
+ Files.readAllLines(Paths.get(url.toURI())).stream()
+ .filter(this::isRecordLine)
+ .forEach(e -> updateList.add(e));
+
+ url =
AliyunJsonRecordParserTest.class.getClassLoader().getResource(deleteRes);
+ Files.readAllLines(Paths.get(url.toURI())).stream()
+ .filter(this::isRecordLine)
+ .forEach(e -> deleteList.add(e));
+
+ } catch (Exception e) {
+ log.error("Fail to init aliyun-json cases", e);
+ }
+ }
+
+ @Test
+ public void extractInsertRecord() throws Exception {
+ AliyunRecordParser parser =
+ new AliyunRecordParser(TypeMapping.defaultMapping(),
Collections.emptyList());
+ for (String json : insertList) {
+            // Parse the json string into a JsonNode object
+ JsonNode rootNode = objMapper.readValue(json, JsonNode.class);
+ CdcSourceRecord cdcRecord = new CdcSourceRecord(rootNode);
+ Schema schema = parser.buildSchema(cdcRecord);
+ Assert.assertEquals(schema.primaryKeys(), Arrays.asList("id"));
+
+ List<RichCdcMultiplexRecord> records = parser.extractRecords();
+ Assert.assertEquals(records.size(), 1);
+
+ CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord();
+ Assert.assertEquals(result.kind(), RowKind.INSERT);
+
+ String dbName = parser.getDatabaseName();
+ Assert.assertEquals(dbName, "bigdata_test");
+
+ String tableName = parser.getTableName();
+ Assert.assertEquals(tableName, "sync_test_table");
+
+ MessageQueueCdcTimestampExtractor extractor = new
MessageQueueCdcTimestampExtractor();
+ Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+ }
+ }
+
+ @Test
+ public void extractUpdateRecord() throws Exception {
+ AliyunRecordParser parser =
+ new AliyunRecordParser(TypeMapping.defaultMapping(),
Collections.emptyList());
+ for (String json : updateList) {
+            // Parse the json string into a JsonNode object
+ JsonNode jsonNode = objMapper.readValue(json, JsonNode.class);
+ CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode);
+ Schema schema = parser.buildSchema(cdcRecord);
+ Assert.assertEquals(schema.primaryKeys(), Arrays.asList("id"));
+
+ List<RichCdcMultiplexRecord> records = parser.extractRecords();
+ Assert.assertEquals(records.size(), 1);
+
+ CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord();
+ Assert.assertEquals(result.kind(), RowKind.UPDATE_AFTER);
+
+ String dbName = parser.getDatabaseName();
+ Assert.assertEquals(dbName, "bigdata_test");
+
+ String tableName = parser.getTableName();
+ Assert.assertEquals(tableName, "sync_test_table");
+
+ MessageQueueCdcTimestampExtractor extractor = new
MessageQueueCdcTimestampExtractor();
+ Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+ }
+ }
+
+ @Test
+ public void extractDeleteRecord() throws Exception {
+ AliyunRecordParser parser =
+ new AliyunRecordParser(TypeMapping.defaultMapping(),
Collections.emptyList());
+ for (String json : deleteList) {
+            // Parse the json string into a JsonNode object
+ JsonNode jsonNode = objMapper.readValue(json, JsonNode.class);
+ CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode);
+ Schema schema = parser.buildSchema(cdcRecord);
+ Assert.assertEquals(schema.primaryKeys(), Arrays.asList("id"));
+
+ List<RichCdcMultiplexRecord> records = parser.extractRecords();
+ Assert.assertEquals(records.size(), 1);
+
+ CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord();
+ Assert.assertEquals(result.kind(), RowKind.DELETE);
+
+ String dbName = parser.getDatabaseName();
+ Assert.assertEquals(dbName, "bigdata_test");
+
+ String tableName = parser.getTableName();
+ Assert.assertEquals(tableName, "sync_test_table");
+
+ MessageQueueCdcTimestampExtractor extractor = new
MessageQueueCdcTimestampExtractor();
+ Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-delete.txt
similarity index 59%
copy from
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
copy to
paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-delete.txt
index efe5e12b1..ebae6608a 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-delete.txt
@@ -16,16 +16,4 @@
* limitations under the License.
*/
-package org.apache.paimon.flink;
-
-import org.apache.paimon.flink.sink.StoreSinkWrite;
-
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
-
-/** Placeholder class for new feature introduced since flink 1.19. Should
never be used. */
-public class ProcessRecordAttributesUtil {
- public static void processWithWrite(RecordAttributes recordAttributes,
StoreSinkWrite write) {}
-
- public static void processWithOutput(RecordAttributes recordAttributes,
Output output) {}
-}
+{"schema":{"dataColumn":[{"name":"id","type":"LONG"},{"name":"val","type":"DOUBLE"},{"name":"name","type":"STRING"},{"name":"create_time","type":"DATE"}],"primaryKey":["id"],"source":{"dbType":"MySQL","dbName":"bigdata_test","tableName":"sync_test_table"}},"payload":{"before":{"dataColumn":{"id":1,"val":"1.100000","name":"a","create_time":1731661114000}},"after":null,"sequenceId":"1731663842292000000","timestamp":{"eventTime":1731662085000,"systemTime":1731663848953,"checkpointTime":1731
[...]
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-insert.txt
similarity index 59%
copy from
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
copy to
paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-insert.txt
index efe5e12b1..d1cd34e5e 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-insert.txt
@@ -16,16 +16,4 @@
* limitations under the License.
*/
-package org.apache.paimon.flink;
-
-import org.apache.paimon.flink.sink.StoreSinkWrite;
-
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
-
-/** Placeholder class for new feature introduced since flink 1.19. Should
never be used. */
-public class ProcessRecordAttributesUtil {
- public static void processWithWrite(RecordAttributes recordAttributes,
StoreSinkWrite write) {}
-
- public static void processWithOutput(RecordAttributes recordAttributes,
Output output) {}
-}
+{"payload":{"after":{"dataColumn":{"create_time":1731661114000,"id":2,"name":"a","val":"1.100000"}},"before":null,"ddl":null,"op":"INSERT","sequenceId":"-1","timestamp":{"checkpointTime":-1,"eventTime":-1,"systemTime":1731661820245}},"schema":{"dataColumn":[{"name":"id","type":"LONG"},{"name":"val","type":"DOUBLE"},{"name":"name","type":"STRING"},{"name":"create_time","type":"DATE"}],"primaryKey":["id"],"source":{"dbName":"bigdata_test","dbType":"MySQL","tableName":"sync_test_table"}},"v
[...]
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-update-in-one.txt
similarity index 56%
rename from
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
rename to
paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-update-in-one.txt
index efe5e12b1..9acf6309c 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-update-in-one.txt
@@ -16,16 +16,4 @@
* limitations under the License.
*/
-package org.apache.paimon.flink;
-
-import org.apache.paimon.flink.sink.StoreSinkWrite;
-
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
-
-/** Placeholder class for new feature introduced since flink 1.19. Should
never be used. */
-public class ProcessRecordAttributesUtil {
- public static void processWithWrite(RecordAttributes recordAttributes,
StoreSinkWrite write) {}
-
- public static void processWithOutput(RecordAttributes recordAttributes,
Output output) {}
-}
+
{"schema":{"dataColumn":[{"name":"id","type":"LONG"},{"name":"val","type":"DOUBLE"},{"name":"name","type":"STRING"},{"name":"create_time","type":"DATE"}],"primaryKey":["id"],"source":{"dbType":"MySQL","dbName":"bigdata_test","tableName":"sync_test_table"}},"payload":{"before":{"dataColumn":{"id":2,"val":"1.100000","name":"a","create_time":1731661114000}},"after":{"dataColumn":{"id":2,"val":"2.200000","name":"a","create_time":1731661114000}},"sequenceId":"1731663842292000001","timestamp"
[...]
\ No newline at end of file