This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 89f97c7ba [cdc] support aliyun-json when sinking data from kafka with paimon-flink-action (#4570)
89f97c7ba is described below

commit 89f97c7baf59b72a5bc747e6f5391973fac5d3f5
Author: JackeyLee007 <[email protected]>
AuthorDate: Thu Nov 21 22:22:56 2024 +0800

    [cdc] support aliyun-json when sinking data from kafka with paimon-flink-action (#4570)
---
 .../cdc/format/aliyun/AliyunDataFormat.java}       |  21 +-
 .../format/aliyun/AliyunDataFormatFactory.java}    |  23 +-
 .../cdc/format/aliyun/AliyunFieldParser.java       | 117 ++++++++++
 .../cdc/format/aliyun/AliyunRecordParser.java      | 260 +++++++++++++++++++++
 .../MessageQueueCdcTimestampExtractor.java         |   4 +
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../format/aliyun/AliyunJsonRecordParserTest.java  | 167 +++++++++++++
 .../kafka/aliyun/table/event/event-delete.txt}     |  14 +-
 .../kafka/aliyun/table/event/event-insert.txt}     |  14 +-
 .../aliyun/table/event/event-update-in-one.txt}    |  14 +-
 10 files changed, 579 insertions(+), 56 deletions(-)
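
With this change, aliyun-json can be selected like any other message-queue format. Below is a
minimal, hedged sketch of a kafka_sync_table invocation (warehouse path, topic, and table names
are placeholders; the kafka_conf keys follow the existing kafka sync action):

    <FLINK_HOME>/bin/flink run \
        /path/to/paimon-flink-action-<version>.jar \
        kafka_sync_table \
        --warehouse hdfs:///path/to/warehouse \
        --database test_db \
        --table test_table \
        --kafka_conf properties.bootstrap.servers=127.0.0.1:9092 \
        --kafka_conf topic=test_topic \
        --kafka_conf value.format=aliyun-json \
        --table_conf bucket=4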

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormat.java
similarity index 58%
copy from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
copy to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormat.java
index efe5e12b1..ccbacdc2a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormat.java
@@ -16,16 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink;
+package org.apache.paimon.flink.action.cdc.format.aliyun;
 
-import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.action.cdc.format.AbstractJsonDataFormat;
+import org.apache.paimon.flink.action.cdc.format.RecordParserFactory;
 
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
-
-/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
-public class ProcessRecordAttributesUtil {
-    public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}
+/**
+ * Supports the message queue's aliyun-json data format and provides definitions for the message
+ * queue's record json deserialization class and parsing class {@link AliyunRecordParser}.
+ */
+public class AliyunDataFormat extends AbstractJsonDataFormat {
 
-    public static void processWithOutput(RecordAttributes recordAttributes, Output output) {}
+    @Override
+    protected RecordParserFactory parser() {
+        return AliyunRecordParser::new;
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormatFactory.java
similarity index 59%
copy from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
copy to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormatFactory.java
index efe5e12b1..a07e2f205 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormatFactory.java
@@ -16,16 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink;
+package org.apache.paimon.flink.action.cdc.format.aliyun;
 
-import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.format.DataFormatFactory;
 
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+/** Factory to create {@link AliyunDataFormat}. */
+public class AliyunDataFormatFactory implements DataFormatFactory {
 
-/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
-public class ProcessRecordAttributesUtil {
-    public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}
+    public static final String IDENTIFIER = "aliyun-json";
 
-    public static void processWithOutput(RecordAttributes recordAttributes, Output output) {}
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public DataFormat create() {
+        return new AliyunDataFormat();
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunFieldParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunFieldParser.java
new file mode 100644
index 000000000..824ed9145
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunFieldParser.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.aliyun;
+
+/** Converts some special types such as enum, set and geometry. */
+public class AliyunFieldParser {
+
+    protected static byte[] convertGeoType2WkbArray(byte[] mysqlGeomBytes) {
+        int sridLength = 4;
+        boolean hasSrid = false;
+        for (int i = 0; i < sridLength; ++i) {
+            if (mysqlGeomBytes[i] != 0) {
+                hasSrid = true;
+                break;
+            }
+        }
+        byte[] wkb;
+        if (hasSrid) {
+            wkb = new byte[mysqlGeomBytes.length];
+            // byteOrder + geometry
+            System.arraycopy(mysqlGeomBytes, 4, wkb, 0, 5);
+            // srid
+            System.arraycopy(mysqlGeomBytes, 0, wkb, 5, 4);
+            // geometry
+            System.arraycopy(mysqlGeomBytes, 9, wkb, 9, wkb.length - 9);
+
+            // set srid flag
+            if (wkb[0] == 0) {
+                // big endian
+                wkb[1] = (byte) (wkb[1] + 32);
+            } else {
+                wkb[4] = (byte) (wkb[4] + 32);
+            }
+        } else {
+            wkb = new byte[mysqlGeomBytes.length - 4];
+            System.arraycopy(mysqlGeomBytes, 4, wkb, 0, wkb.length);
+        }
+        return wkb;
+    }
+
+    protected static String convertSet(String value, String mysqlType) {
+        // A MySQL SET value can hold more than one element; the incoming value is the decimal
+        // string form of the underlying bitmask (a long).
+        int indexes = Integer.parseInt(value);
+        return getSetValuesByIndex(mysqlType, indexes);
+    }
+
+    protected static String convertEnum(String value, String mysqlType) {
+        int elementIndex = Integer.parseInt(value);
+        // enum('a','b','c')
+        return getEnumValueByIndex(mysqlType, elementIndex);
+    }
+
+    protected static String getEnumValueByIndex(String mysqlType, int elementIndex) {
+        String[] options = extractEnumValueByIndex(mysqlType);
+
+        return options[elementIndex - 1];
+    }
+
+    protected static String getSetValuesByIndex(String mysqlType, int indexes) {
+        String[] options = extractSetValuesByIndex(mysqlType);
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        int index = 0;
+        boolean first = true;
+        int optionLen = options.length;
+
+        while (indexes != 0L) {
+            if (indexes % 2L != 0) {
+                if (first) {
+                    first = false;
+                } else {
+                    sb.append(',');
+                }
+                if (index < optionLen) {
+                    sb.append(options[index]);
+                } else {
+                    throw new RuntimeException(
+                            String.format(
+                                    "extractSetValues from mysqlType[%s],index:%d failed",
+                                    mysqlType, indexes));
+                }
+            }
+            ++index;
+            indexes = indexes >>> 1;
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+
+    private static String[] extractSetValuesByIndex(String mysqlType) {
+        // set('x','y')
+        return mysqlType.substring(5, mysqlType.length() - 2).split("'\\s*,\\s*'");
+    }
+
+    private static String[] extractEnumValueByIndex(String mysqlType) {
+        // enum('x','y')
+        return mysqlType.substring(6, mysqlType.length() - 2).split("'\\s*,\\s*'");
+    }
+}
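
For clarity, a short worked example of the SET/ENUM decoding above (the column types are
hypothetical, and the calls assume same-package access since the helpers are package-visible):

    // SET values arrive as the decimal form of a bitmask: "5" = 0b0101 selects options 1 and 3.
    String set = AliyunFieldParser.convertSet("5", "set('a','b','c','d')");
    // set -> "[a,c]"

    // ENUM values arrive as a 1-based index: "2" selects the second option.
    String en = AliyunFieldParser.convertEnum("2", "enum('x','y','z')");
    // en -> "y"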
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
new file mode 100644
index 000000000..e31b282a7
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.aliyun;
+
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
+import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.paimon.utils.JsonSerdeUtil.getNodeAs;
+import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
+
+/**
+ * The {@code AliyunRecordParser} class is responsible for parsing records from the aliyun-json
+ * format. This parser extracts relevant information from the aliyun-json payload and transforms
+ * it into a list of {@link RichCdcMultiplexRecord} objects, which represent the changes captured
+ * in the database.
+ *
+ * <p>The class handles different types of database operations such as INSERT, UPDATE, and DELETE,
+ * and generates corresponding {@link RichCdcMultiplexRecord} objects for each operation.
+ *
+ * <p>Additionally, the parser supports schema extraction, which can be used to understand the
+ * structure of the incoming data and its corresponding field types.
+ */
+public class AliyunRecordParser extends AbstractJsonRecordParser {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AliyunRecordParser.class);
+
+    private static final String FIELD_IS_DDL = "isDdl";
+    private static final String FIELD_TYPE = "op";
+
+    private static final String OP_UPDATE_BEFORE = "UPDATE_BEFORE";
+    private static final String OP_UPDATE_AFTER = "UPDATE_AFTER";
+    private static final String OP_INSERT = "INSERT";
+    private static final String OP_DELETE = "DELETE";
+
+    private static final String FIELD_PAYLOAD = "payload";
+    private static final String FIELD_BEFORE = "before";
+    private static final String FIELD_AFTER = "after";
+    private static final String FIELD_COLUMN = "dataColumn";
+
+    private static final String FIELD_SCHEMA = "schema";
+    private static final String FIELD_PK = "primaryKey";
+
+    @Override
+    protected boolean isDDL() {
+        JsonNode node = root.get(FIELD_IS_DDL);
+        return !isNull(node) && node.asBoolean();
+    }
+
+    public AliyunRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
+        super(typeMapping, computedColumns);
+    }
+
+    @Override
+    protected String primaryField() {
+        return "schema.primaryKey";
+    }
+
+    @Override
+    protected String dataField() {
+        return "payload.dataColumn";
+    }
+
+    @Override
+    protected List<String> extractPrimaryKeys() {
+        JsonNode schemaNode = root.get(FIELD_SCHEMA);
+        checkNotNull(schemaNode, FIELD_SCHEMA);
+        ArrayNode pkNode = getNodeAs(schemaNode, FIELD_PK, ArrayNode.class);
+        List<String> pkFields = new ArrayList<>();
+        pkNode.forEach(
+                pk -> {
+                    if (isNull(pk)) {
+                        throw new IllegalArgumentException(
+                                String.format("Primary key cannot be null: %s", pk));
+                    }
+
+                    pkFields.add(pk.asText());
+                });
+        return pkFields;
+    }
+
+    @Override
+    public List<RichCdcMultiplexRecord> extractRecords() {
+        if (isDDL()) {
+            return Collections.emptyList();
+        }
+
+        List<RichCdcMultiplexRecord> records = new ArrayList<>();
+
+        JsonNode payload = root.get(FIELD_PAYLOAD);
+        checkNotNull(payload, FIELD_PAYLOAD);
+
+        String type = payload.get(FIELD_TYPE).asText();
+
+        RowKind rowKind = null;
+        String field = null;
+        switch (type) {
+            case OP_UPDATE_BEFORE:
+                rowKind = RowKind.UPDATE_BEFORE;
+                field = FIELD_BEFORE;
+                break;
+            case OP_UPDATE_AFTER:
+                rowKind = RowKind.UPDATE_AFTER;
+                field = FIELD_AFTER;
+                break;
+            case OP_INSERT:
+                rowKind = RowKind.INSERT;
+                field = FIELD_AFTER;
+                break;
+            case OP_DELETE:
+                rowKind = RowKind.DELETE;
+                field = FIELD_BEFORE;
+                break;
+            default:
+                throw new UnsupportedOperationException("Unknown record operation: " + type);
+        }
+
+        JsonNode container = payload.get(field);
+        checkNotNull(container, String.format("%s.%s", FIELD_PAYLOAD, field));
+
+        JsonNode data = getNodeAs(container, FIELD_COLUMN, JsonNode.class);
+        checkNotNull(data, String.format("%s.%s.%s", FIELD_PAYLOAD, field, FIELD_COLUMN));
+
+        processRecord(data, rowKind, records);
+
+        return records;
+    }
+
+    @Override
+    protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
+
+        Map<String, Object> recordMap =
+                JsonSerdeUtil.convertValue(record, new TypeReference<Map<String, Object>>() {});
+        Map<String, String> rowData = new HashMap<>();
+
+        fillDefaultTypes(record, rowTypeBuilder);
+        for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
+            rowData.put(entry.getKey(), Objects.toString(entry.getValue(), null));
+        }
+
+        evalComputedColumns(rowData, rowTypeBuilder);
+        return rowData;
+    }
+
+    @Override
+    protected String format() {
+        return "aliyun-json";
+    }
+
+    @Nullable
+    @Override
+    protected String getTableName() {
+        JsonNode schemaNode = root.get(FIELD_SCHEMA);
+        if (isNull(schemaNode)) {
+            return null;
+        }
+        JsonNode sourceNode = schemaNode.get("source");
+        if (isNull(sourceNode)) {
+            return null;
+        }
+
+        JsonNode tableNode = sourceNode.get("tableName");
+        if (isNull(tableNode)) {
+            return null;
+        }
+        return tableNode.asText();
+    }
+
+    @Nullable
+    @Override
+    protected String getDatabaseName() {
+        JsonNode schemaNode = root.get(FIELD_SCHEMA);
+        if (isNull(schemaNode)) {
+            return null;
+        }
+        JsonNode sourceNode = schemaNode.get("source");
+        if (isNull(sourceNode)) {
+            return null;
+        }
+        JsonNode databaseNode = sourceNode.get("dbName");
+        if (isNull(databaseNode)) {
+            return null;
+        }
+        return databaseNode.asText();
+    }
+
+    private Map<JsonNode, JsonNode> matchOldRecords(ArrayNode newData, ArrayNode oldData) {
+        return IntStream.range(0, newData.size())
+                .boxed()
+                .collect(Collectors.toMap(newData::get, oldData::get));
+    }
+
+    private String transformValue(@Nullable String oldValue, String shortType, String mySqlType) {
+        if (oldValue == null) {
+            return null;
+        }
+
+        if (MySqlTypeUtils.isSetType(shortType)) {
+            return AliyunFieldParser.convertSet(oldValue, mySqlType);
+        }
+
+        if (MySqlTypeUtils.isEnumType(shortType)) {
+            return AliyunFieldParser.convertEnum(oldValue, mySqlType);
+        }
+
+        if (MySqlTypeUtils.isGeoType(shortType)) {
+            try {
+                byte[] wkb =
+                        AliyunFieldParser.convertGeoType2WkbArray(
+                                oldValue.getBytes(StandardCharsets.ISO_8859_1));
+                return MySqlTypeUtils.convertWkbArray(wkb);
+            } catch (Exception e) {
+                throw new IllegalArgumentException(
+                        String.format("Failed to convert %s to geometry JSON.", oldValue), e);
+            }
+        }
+        return oldValue;
+    }
+}
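
For reference, the record shape this parser consumes is sketched below; it is a pretty-printed,
abbreviated form of the insert fixture added later in this diff (long fields elided):

    {
      "schema": {
        "dataColumn": [{"name": "id", "type": "LONG"}, {"name": "val", "type": "DOUBLE"}],
        "primaryKey": ["id"],
        "source": {"dbType": "MySQL", "dbName": "bigdata_test", "tableName": "sync_test_table"}
      },
      "payload": {
        "op": "INSERT",
        "before": null,
        "after": {"dataColumn": {"id": 2, "val": "1.100000"}},
        "timestamp": {"eventTime": -1, "systemTime": 1731661820245}
      }
    }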
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java
index 8a9a28453..5bf2fefc1 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java
@@ -54,6 +54,10 @@ public class MessageQueueCdcTimestampExtractor implements CdcTimestampExtractor
         } else if (JsonSerdeUtil.isNodeExists(record, "source", "connector")) {
             // Dbz json
             return JsonSerdeUtil.extractValue(record, Long.class, "ts_ms");
+        } else if (JsonSerdeUtil.isNodeExists(record, "payload", "timestamp")) {
+            // Aliyun json
+            return JsonSerdeUtil.extractValue(
+                    record, Long.class, "payload", "timestamp", "systemTime");
         }
         throw new RuntimeException(
                 String.format(
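
A quick illustration of what the new branch returns, using values taken from the insert fixture
later in this diff:

    // given: "payload": {"timestamp": {"eventTime": -1, "systemTime": 1731661820245, ...}}
    Long ts = JsonSerdeUtil.extractValue(record, Long.class, "payload", "timestamp", "systemTime");
    // ts == 1731661820245L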
diff --git a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 17b8b29a2..1b30c7ab6 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -27,6 +27,7 @@ org.apache.paimon.flink.action.cdc.mongodb.MongoDBSyncDatabaseActionFactory
 org.apache.paimon.flink.action.cdc.postgres.PostgresSyncTableActionFactory
 
 ### message queue data format factories
+org.apache.paimon.flink.action.cdc.format.aliyun.AliyunDataFormatFactory
 org.apache.paimon.flink.action.cdc.format.canal.CanalDataFormatFactory
 org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroDataFormatFactory
 org.apache.paimon.flink.action.cdc.format.debezium.DebeziumJsonDataFormatFactory
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java
new file mode 100644
index 000000000..f06268d70
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.aliyun;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
+import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.RowKind;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** Test for AliyunJsonRecordParser. */
+public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase {
+
+    private static final Logger log = LoggerFactory.getLogger(AliyunJsonRecordParserTest.class);
+    private static List<String> insertList = new ArrayList<>();
+    private static List<String> updateList = new ArrayList<>();
+    private static List<String> deleteList = new ArrayList<>();
+
+    private static ObjectMapper objMapper = new ObjectMapper();
+
+    @Before
+    public void setup() {
+        String insertRes = "kafka/aliyun/table/event/event-insert.txt";
+        String updateRes = "kafka/aliyun/table/event/event-update-in-one.txt";
+        String deleteRes = "kafka/aliyun/table/event/event-delete.txt";
+        URL url;
+        try {
+            url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(insertRes);
+            Files.readAllLines(Paths.get(url.toURI())).stream()
+                    .filter(this::isRecordLine)
+                    .forEach(e -> insertList.add(e));
+
+            url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(updateRes);
+            Files.readAllLines(Paths.get(url.toURI())).stream()
+                    .filter(this::isRecordLine)
+                    .forEach(e -> updateList.add(e));
+
+            url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(deleteRes);
+            Files.readAllLines(Paths.get(url.toURI())).stream()
+                    .filter(this::isRecordLine)
+                    .forEach(e -> deleteList.add(e));
+
+        } catch (Exception e) {
+            log.error("Fail to init aliyun-json cases", e);
+        }
+    }
+
+    @Test
+    public void extractInsertRecord() throws Exception {
+        AliyunRecordParser parser =
+                new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+        for (String json : insertList) {
+            // Parse the JSON string into a JsonNode
+            JsonNode rootNode = objMapper.readValue(json, JsonNode.class);
+            CdcSourceRecord cdcRecord = new CdcSourceRecord(rootNode);
+            Schema schema = parser.buildSchema(cdcRecord);
+            Assert.assertEquals(schema.primaryKeys(), Arrays.asList("id"));
+
+            List<RichCdcMultiplexRecord> records = parser.extractRecords();
+            Assert.assertEquals(records.size(), 1);
+
+            CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord();
+            Assert.assertEquals(result.kind(), RowKind.INSERT);
+
+            String dbName = parser.getDatabaseName();
+            Assert.assertEquals(dbName, "bigdata_test");
+
+            String tableName = parser.getTableName();
+            Assert.assertEquals(tableName, "sync_test_table");
+
+            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
+            Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+        }
+    }
+
+    @Test
+    public void extractUpdateRecord() throws Exception {
+        AliyunRecordParser parser =
+                new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+        for (String json : updateList) {
+            // Parse the JSON string into a JsonNode
+            JsonNode jsonNode = objMapper.readValue(json, JsonNode.class);
+            CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode);
+            Schema schema = parser.buildSchema(cdcRecord);
+            Assert.assertEquals(schema.primaryKeys(), Arrays.asList("id"));
+
+            List<RichCdcMultiplexRecord> records = parser.extractRecords();
+            Assert.assertEquals(records.size(), 1);
+
+            CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord();
+            Assert.assertEquals(result.kind(), RowKind.UPDATE_AFTER);
+
+            String dbName = parser.getDatabaseName();
+            Assert.assertEquals(dbName, "bigdata_test");
+
+            String tableName = parser.getTableName();
+            Assert.assertEquals(tableName, "sync_test_table");
+
+            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
+            Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+        }
+    }
+
+    @Test
+    public void extractDeleteRecord() throws Exception {
+        AliyunRecordParser parser =
+                new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+        for (String json : deleteList) {
+            // Parse the JSON string into a JsonNode
+            JsonNode jsonNode = objMapper.readValue(json, JsonNode.class);
+            CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode);
+            Schema schema = parser.buildSchema(cdcRecord);
+            Assert.assertEquals(schema.primaryKeys(), Arrays.asList("id"));
+
+            List<RichCdcMultiplexRecord> records = parser.extractRecords();
+            Assert.assertEquals(records.size(), 1);
+
+            CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord();
+            Assert.assertEquals(result.kind(), RowKind.DELETE);
+
+            String dbName = parser.getDatabaseName();
+            Assert.assertEquals(dbName, "bigdata_test");
+
+            String tableName = parser.getTableName();
+            Assert.assertEquals(tableName, "sync_test_table");
+
+            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
+            Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-delete.txt
similarity index 59%
copy from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
copy to paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-delete.txt
index efe5e12b1..ebae6608a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-delete.txt
@@ -16,16 +16,4 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink;
-
-import org.apache.paimon.flink.sink.StoreSinkWrite;
-
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
-
-/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
-public class ProcessRecordAttributesUtil {
-    public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}
-
-    public static void processWithOutput(RecordAttributes recordAttributes, Output output) {}
-}
+{"schema":{"dataColumn":[{"name":"id","type":"LONG"},{"name":"val","type":"DOUBLE"},{"name":"name","type":"STRING"},{"name":"create_time","type":"DATE"}],"primaryKey":["id"],"source":{"dbType":"MySQL","dbName":"bigdata_test","tableName":"sync_test_table"}},"payload":{"before":{"dataColumn":{"id":1,"val":"1.100000","name":"a","create_time":1731661114000}},"after":null,"sequenceId":"1731663842292000000","timestamp":{"eventTime":1731662085000,"systemTime":1731663848953,"checkpointTime":1731
 [...]
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-insert.txt
similarity index 59%
copy from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
copy to paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-insert.txt
index efe5e12b1..d1cd34e5e 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-insert.txt
@@ -16,16 +16,4 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink;
-
-import org.apache.paimon.flink.sink.StoreSinkWrite;
-
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
-
-/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
-public class ProcessRecordAttributesUtil {
-    public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}
-
-    public static void processWithOutput(RecordAttributes recordAttributes, Output output) {}
-}
+{"payload":{"after":{"dataColumn":{"create_time":1731661114000,"id":2,"name":"a","val":"1.100000"}},"before":null,"ddl":null,"op":"INSERT","sequenceId":"-1","timestamp":{"checkpointTime":-1,"eventTime":-1,"systemTime":1731661820245}},"schema":{"dataColumn":[{"name":"id","type":"LONG"},{"name":"val","type":"DOUBLE"},{"name":"name","type":"STRING"},{"name":"create_time","type":"DATE"}],"primaryKey":["id"],"source":{"dbName":"bigdata_test","dbType":"MySQL","tableName":"sync_test_table"}},"v
 [...]
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-update-in-one.txt
similarity index 56%
rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
rename to paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-update-in-one.txt
index efe5e12b1..9acf6309c 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-update-in-one.txt
@@ -16,16 +16,4 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink;
-
-import org.apache.paimon.flink.sink.StoreSinkWrite;
-
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
-
-/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
-public class ProcessRecordAttributesUtil {
-    public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}
-
-    public static void processWithOutput(RecordAttributes recordAttributes, Output output) {}
-}
+{"schema":{"dataColumn":[{"name":"id","type":"LONG"},{"name":"val","type":"DOUBLE"},{"name":"name","type":"STRING"},{"name":"create_time","type":"DATE"}],"primaryKey":["id"],"source":{"dbType":"MySQL","dbName":"bigdata_test","tableName":"sync_test_table"}},"payload":{"before":{"dataColumn":{"id":2,"val":"1.100000","name":"a","create_time":1731661114000}},"after":{"dataColumn":{"id":2,"val":"2.200000","name":"a","create_time":1731661114000}},"sequenceId":"1731663842292000001","timestamp"
 [...]
\ No newline at end of file

