This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4281b867ac [Feature][connector][kafka] Support read Maxwell format 
message from kafka #4415 (#4428)
4281b867ac is described below

commit 4281b867ac04fbc74e6f8ce14d6c4254884db532
Author: ZhiLin Li <[email protected]>
AuthorDate: Wed Jul 3 17:29:25 2024 +0800

    [Feature][connector][kafka] Support read Maxwell format message from kafka 
#4415 (#4428)
---
 docs/en/connector-v2/formats/canal-json.md         |   2 +-
 docs/en/connector-v2/formats/maxwell-json.md       |  91 ++++++++
 docs/en/connector-v2/formats/ogg-json.md           |   4 +-
 .../seatunnel/kafka/config/MessageFormat.java      |   3 +-
 .../serialize/DefaultSeaTunnelRowSerializer.java   |   3 +
 .../seatunnel/kafka/source/KafkaSourceConfig.java  |   6 +
 .../e2e/connector/kafka/KafkaFormatIT.java         |  83 ++++++++
 .../src/test/resources/maxwell/maxwell_data.txt    |  12 ++
 .../kafkasource_maxwell_cdc_to_pgsql.conf          |  64 ++++++
 .../kafkasource_maxwell_to_kafka.conf              |  58 ++++++
 .../maxwell/MaxWellJsonDeserializationSchema.java  | 228 +++++++++++++++++++++
 .../json/maxwell/MaxWellJsonFormatOptions.java     |  60 ++++++
 .../maxwell/MaxWellJsonSerializationSchema.java    |  86 ++++++++
 .../json/maxwell/MaxWellJsonSerDeSchemaTest.java   | 187 +++++++++++++++++
 .../test/resources/maxwell-data-filter-table.txt   |  20 ++
 15 files changed, 903 insertions(+), 4 deletions(-)

diff --git a/docs/en/connector-v2/formats/canal-json.md 
b/docs/en/connector-v2/formats/canal-json.md
index 02e185bee5..6e133a9a82 100644
--- a/docs/en/connector-v2/formats/canal-json.md
+++ b/docs/en/connector-v2/formats/canal-json.md
@@ -24,7 +24,7 @@ SeaTunnel also supports to encode the INSERT/UPDATE/DELETE 
messages in SeaTunnel
 
 # How to use
 
-## Kafka uses example
+## Kafka Uses Example
 
 Canal provides a unified format for changelog, here is a simple example for an 
update operation captured from a MySQL products table:
 
diff --git a/docs/en/connector-v2/formats/maxwell-json.md 
b/docs/en/connector-v2/formats/maxwell-json.md
new file mode 100644
index 0000000000..5e1c851d9e
--- /dev/null
+++ b/docs/en/connector-v2/formats/maxwell-json.md
@@ -0,0 +1,91 @@
+# MaxWell Format
+
+[Maxwell](https://maxwells-daemon.io/) is a CDC (Changelog Data Capture) tool 
that can stream changes in real-time from MySQL into Kafka, Kinesis and other 
streaming connectors. Maxwell provides a unified format schema for changelog 
and supports to serialize messages using JSON.
+
+Seatunnel supports to interpret MaxWell JSON messages as INSERT/UPDATE/DELETE 
messages into seatunnel system. This is useful in many cases to leverage this 
feature, such as
+
+        synchronizing incremental data from databases to other systems
+        auditing logs
+        real-time materialized views on databases
+        temporal join changing history of a database table and so on.
+
+Seatunnel also supports to encode the INSERT/UPDATE/DELETE messages in 
Seatunnel as MaxWell JSON messages, and emit to storage like Kafka. However, 
currently Seatunnel can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single 
UPDATE message. Therefore, Seatunnel encodes UPDATE_BEFORE and UPDATE_AFTER as 
DELETE and INSERT MaxWell messages.
+
+# Format Options
+
+|              Option              | Default | Required |                      
                                                                           
Description                                                                     
                             |
+|----------------------------------|---------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| format                           | (none)  | yes      | Specify what format 
to use, here should be 'maxwell_json'.                                          
                                                                                
                         |
+| maxwell_json.ignore-parse-errors | false   | no       | Skip fields and rows 
with parse errors instead of failing. Fields are set to null in case of errors. 
                                                                                
                        |
+| maxwell_json.database.include    | (none)  | no       | An optional regular 
expression to only read the specific databases changelog rows by regular 
matching the "database" meta field in the MaxWell record. The pattern string is 
compatible with Java's Pattern. |
+| maxwell_json.table.include       | (none)  | no       | An optional regular 
expression to only read the specific tables changelog rows by regular matching 
the "table" meta field in the MaxWell record. The pattern string is compatible 
with Java's Pattern.       |
+
+# How To Use MaxWell format
+
+## Kafka Uses Example
+
+MaxWell provides a unified format for changelog, here is a simple example for 
an update operation captured from a MySQL products table:
+
+```bash
+{
+    "database":"test",
+    "table":"product",
+    "type":"insert",
+    "ts":1596684904,
+    "xid":7201,
+    "commit":true,
+    "data":{
+        "id":111,
+        "name":"scooter",
+        "description":"Big 2-wheel scooter ",
+        "weight":5.18
+    },
+    "primary_key_columns":[
+        "id"
+    ]
+}
+```
+
+Note: please refer to MaxWell documentation about the meaning of each fields.
+
+The MySQL products table has 4 columns (id, name, description and weight).
+The above JSON message is an update change event on the products table where 
the weight value of the row with id = 111 is changed from 5.18 to 5.15.
+Assuming the messages have been synchronized to Kafka topic products_binlog, 
then we can use the following Seatunnel to consume this topic and interpret the 
change events.
+
+```bash
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "products_binlog"
+    result_table_name = "kafka_name"
+    start_mode = earliest
+    schema = {
+      fields {
+           id = "int"
+           name = "string"
+           description = "string"
+           weight = "string"
+      }
+    },
+    format = maxwell_json
+  }
+
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "localhost:9092"
+    topic = "consume-binlog"
+    format = maxwell_json
+  }
+}
+```
+
diff --git a/docs/en/connector-v2/formats/ogg-json.md 
b/docs/en/connector-v2/formats/ogg-json.md
index 4c363c779a..3faeb33c4f 100644
--- a/docs/en/connector-v2/formats/ogg-json.md
+++ b/docs/en/connector-v2/formats/ogg-json.md
@@ -20,9 +20,9 @@ Seatunnel also supports to encode the INSERT/UPDATE/DELETE 
messages in Seatunnel
 | ogg_json.database.include    | (none)  | no       | An optional regular 
expression to only read the specific databases changelog rows by regular 
matching the "database" meta field in the Canal record. The pattern string is 
compatible with Java's Pattern. |
 | ogg_json.table.include       | (none)  | no       | An optional regular 
expression to only read the specific tables changelog rows by regular matching 
the "table" meta field in the Canal record. The pattern string is compatible 
with Java's Pattern.       |
 
-# How to use Ogg format
+# How to Use Ogg format
 
-## Kafka uses example
+## Kafka Uses Example
 
 Ogg provides a unified format for changelog, here is a simple example for an 
update operation captured from a Oracle products table:
 
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
index 18e466e41c..f02cebcbe3 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
@@ -25,5 +25,6 @@ public enum MessageFormat {
     COMPATIBLE_DEBEZIUM_JSON,
     COMPATIBLE_KAFKA_CONNECT_JSON,
     OGG_JSON,
-    AVRO
+    AVRO,
+    MAXWELL_JSON
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 51d491002c..d4a77e74b9 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -31,6 +31,7 @@ import 
org.apache.seatunnel.format.json.JsonSerializationSchema;
 import org.apache.seatunnel.format.json.canal.CanalJsonSerializationSchema;
 import 
org.apache.seatunnel.format.json.debezium.DebeziumJsonSerializationSchema;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.json.maxwell.MaxWellJsonSerializationSchema;
 import org.apache.seatunnel.format.json.ogg.OggJsonSerializationSchema;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
 
@@ -226,6 +227,8 @@ public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer {
                 return new OggJsonSerializationSchema(rowType);
             case DEBEZIUM_JSON:
                 return new DebeziumJsonSerializationSchema(rowType);
+            case MAXWELL_JSON:
+                return new MaxWellJsonSerializationSchema(rowType);
             case COMPATIBLE_DEBEZIUM_JSON:
                 return new CompatibleDebeziumJsonSerializationSchema(rowType, 
isKey);
             case AVRO:
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 232ee2f0e0..1c782ca6ab 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -42,6 +42,7 @@ import 
org.apache.seatunnel.format.json.JsonDeserializationSchema;
 import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
 import 
org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import 
org.apache.seatunnel.format.json.maxwell.MaxWellJsonDeserializationSchema;
 import org.apache.seatunnel.format.json.ogg.OggJsonDeserializationSchema;
 import org.apache.seatunnel.format.text.TextDeserializationSchema;
 import org.apache.seatunnel.format.text.constant.TextFormatConstant;
@@ -250,6 +251,11 @@ public class KafkaSourceConfig implements Serializable {
                 return OggJsonDeserializationSchema.builder(catalogTable)
                         .setIgnoreParseErrors(true)
                         .build();
+            case MAXWELL_JSON:
+                return MaxWellJsonDeserializationSchema.builder(catalogTable)
+                        .setIgnoreParseErrors(true)
+                        .build();
+
             case COMPATIBLE_KAFKA_CONNECT_JSON:
                 Boolean keySchemaEnable =
                         readonlyConfig.get(
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
index 25fa9b4aab..ec7b0173ff 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
@@ -95,6 +95,12 @@ import static org.awaitility.Awaitility.given;
 public class KafkaFormatIT extends TestSuiteBase implements TestResource {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaFormatIT.class);
+
+    // ---------------------------MaxWell Format 
Parameter---------------------------------------
+    private static final String MAXWELL_DATA_PATH = 
"/maxwell/maxwell_data.txt";
+    private static final String MAXWELL_KAFKA_SOURCE_TOPIC = 
"maxwell-test-cdc_mds";
+    private static final String MAXWELL_KAFKA_SINK_TOPIC = "test-maxwell-sink";
+
     // ---------------------------Ogg Format 
Parameter---------------------------------------
     private static final String OGG_DATA_PATH = "/ogg/ogg_data.txt";
     private static final String OGG_KAFKA_SOURCE_TOPIC = "test-ogg-source";
@@ -239,6 +245,7 @@ public class KafkaFormatIT extends TestSuiteBase implements 
TestResource {
                     {
                         put(CANAL_DATA_PATH, CANAL_KAFKA_SOURCE_TOPIC);
                         put(OGG_DATA_PATH, OGG_KAFKA_SOURCE_TOPIC);
+                        put(MAXWELL_DATA_PATH, MAXWELL_KAFKA_SOURCE_TOPIC);
                         put(COMPATIBLE_DATA_PATH, 
COMPATIBLE_KAFKA_SOURCE_TOPIC);
                         put(DEBEZIUM_DATA_PATH, DEBEZIUM_KAFKA_SOURCE_TOPIC);
                     }
@@ -418,6 +425,25 @@ public class KafkaFormatIT extends TestSuiteBase 
implements TestResource {
         checkCompatibleFormat();
     }
 
+    @TestTemplate
+    public void testFormatMaxWellCheck(TestContainer container)
+            throws IOException, InterruptedException {
+
+        LOG.info("====================== Check MaxWell======================");
+        // check MaxWell to Postgresql
+        Container.ExecResult checkMaxWellResultToKafka =
+                
container.executeJob("/maxwellFormatIT/kafkasource_maxwell_to_kafka.conf");
+        Assertions.assertEquals(
+                0, checkMaxWellResultToKafka.getExitCode(), 
checkMaxWellResultToKafka.getStderr());
+
+        Container.ExecResult checkDataResult =
+                
container.executeJob("/maxwellFormatIT/kafkasource_maxwell_cdc_to_pgsql.conf");
+        Assertions.assertEquals(0, checkDataResult.getExitCode(), 
checkDataResult.getStderr());
+
+        // Check MaxWell
+        checkMaxWellFormat();
+    }
+
     private void checkFormatCanalAndOgg() {
         List<List<Object>> postgreSinkTableList = 
getPostgreSinkTableList(PG_SINK_TABLE1);
         List<List<Object>> checkArraysResult =
@@ -533,6 +559,63 @@ public class KafkaFormatIT extends TestSuiteBase 
implements TestResource {
         Assertions.assertIterableEquals(expected, postgreSinkTableList);
     }
 
+    private void checkMaxWellFormat() {
+        List<String> expectedResult =
+                Arrays.asList(
+                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"3.14\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":\"8.1\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":\"0.8\"},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's 
hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's 
hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":\"1.0\"},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.3\"},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent 
black wind breaker\",\"weight\":\"0.1\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":\"22.2\"},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"3.14\"},\"type\":\"DELETE\"}",
+                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"4.56\"},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.3\"},\"type\":\"DELETE\"}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"7.88\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":\"22.2\"},\"type\":\"DELETE\"}");
+
+        ArrayList<String> result = new ArrayList<>();
+        ArrayList<String> topics = new ArrayList<>();
+        topics.add(MAXWELL_KAFKA_SINK_TOPIC);
+        kafkaConsumer.subscribe(topics);
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            ConsumerRecords<String, String> consumerRecords =
+                                    
kafkaConsumer.poll(Duration.ofMillis(1000));
+                            for (ConsumerRecord<String, String> record : 
consumerRecords) {
+                                result.add(record.value());
+                            }
+                            Assertions.assertEquals(expectedResult, result);
+                        });
+
+        LOG.info(
+                "==================== start kafka MaxWell format to pg check 
====================");
+
+        List<List<Object>> postgreSinkTableList = 
getPostgreSinkTableList(PG_SINK_TABLE1);
+
+        List<List<Object>> expected =
+                Stream.<List<Object>>of(
+                                Arrays.asList(101, "scooter", "Small 2-wheel 
scooter", "4.56"),
+                                Arrays.asList(102, "car battery", "12V car 
battery", "8.1"),
+                                Arrays.asList(
+                                        103,
+                                        "12-pack drill bits",
+                                        "12-pack of drill bits with sizes 
ranging from #40 to #3",
+                                        "0.8"),
+                                Arrays.asList(104, "hammer", "12oz carpenter's 
hammer", "0.75"),
+                                Arrays.asList(105, "hammer", "14oz carpenter's 
hammer", "0.875"),
+                                Arrays.asList(106, "hammer", "16oz carpenter's 
hammer", "1.0"),
+                                Arrays.asList(107, "rocks", "box of assorted 
rocks", "7.88"),
+                                Arrays.asList(
+                                        108, "jacket", "water resistent black 
wind breaker", "0.1"))
+                        .collect(Collectors.toList());
+        Assertions.assertIterableEquals(expected, postgreSinkTableList);
+    }
+
     private void checkOggFormat() {
         List<String> kafkaExpectedResult =
                 Arrays.asList(
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwell/maxwell_data.txt
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwell/maxwell_data.txt
new file mode 100644
index 0000000000..615c5b4458
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwell/maxwell_data.txt
@@ -0,0 +1,12 @@
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small
 2-wheel scooter","weight":"3.14"}}
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"xoffset":1,"data":{"id":102,"name":"car
 battery","description":"12V car battery","weight":"8.1"}}
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"xoffset":2,"data":{"id":103,"name":"12-pack
 drill bits","description":"12-pack of drill bits with sizes ranging from #40 
to #3","weight":"0.8"}}
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz
 carpenter's hammer","weight":"0.75"}}
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"xoffset":4,"data":{"id":105,"name":"hammer","description":"14oz
 carpenter's hammer","weight":"0.875"}}
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"xoffset":5,"data":{"id":106,"name":"hammer","description":"16oz
 carpenter's hammer","weight":"1.0"}}
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"xoffset":6,"data":{"id":107,"name":"rocks","description":"box
 of assorted rocks","weight":"5.3"}}
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"xoffset":7,"data":{"id":108,"name":"jacket","description":"water
 resistent black wind breaker","weight":"0.1"}}
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"commit":true,"data":{"id":109,"name":"spare
 tire","description":"24 inch spare tire","weight":"22.2"}}
+{"database":"maxwell_eal7e6","table":"products","type":"update","ts":1699253290,"xid":248,"commit":true,"data":{"id":101,"name":"scooter","description":"Small
 2-wheel scooter","weight":"4.56"},"old":{"weight":"3.14"}}
+{"database":"maxwell_eal7e6","table":"products","type":"update","ts":1699253290,"xid":250,"commit":true,"data":{"id":107,"name":"rocks","description":"box
 of assorted rocks","weight":"7.88"},"old":{"weight":"5.3"}}
+{"database":"maxwell_eal7e6","table":"products","type":"delete","ts":1699253290,"xid":252,"commit":true,"data":{"id":109,"name":"spare
 tire","description":"24 inch spare tire","weight":"22.2"}}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwellFormatIT/kafkasource_maxwell_cdc_to_pgsql.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwellFormatIT/kafkasource_maxwell_cdc_to_pgsql.conf
new file mode 100644
index 0000000000..ada7122730
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwellFormatIT/kafkasource_maxwell_cdc_to_pgsql.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafka_e2e:9092"
+    topic = "maxwell-test-cdc_mds"
+    consumer.group = "maxwell_format_to_pg"
+    start_mode = earliest
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        description = "string"
+        weight = "string"
+      }
+    },
+    format = maxwell_json
+  }
+}
+
+
+sink {
+  Jdbc {
+
+    driver = org.postgresql.Driver
+    url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+    user = test
+    password = test
+    generate_sink_sql = true
+    database = test
+    table = public.sink
+    primary_keys = ["id"]
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwellFormatIT/kafkasource_maxwell_to_kafka.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwellFormatIT/kafkasource_maxwell_to_kafka.conf
new file mode 100644
index 0000000000..b7d25eb9a0
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwellFormatIT/kafkasource_maxwell_to_kafka.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafka_e2e:9092"
+    topic = "maxwell-test-cdc_mds"
+    consumer.group = "maxwell_format_to_kafka"
+    start_mode = earliest
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        description = "string"
+        weight = "string"
+      }
+    },
+    format = maxwell_json
+  }
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafka_e2e:9092"
+    topic = "test-maxwell-sink"
+    format = maxwell_json
+    partition = 0
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonDeserializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonDeserializationSchema.java
new file mode 100644
index 0000000000..a87784353e
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonDeserializationSchema.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.maxwell;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+import static java.lang.String.format;
+
+public class MaxWellJsonDeserializationSchema implements 
DeserializationSchema<SeaTunnelRow> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String FIELD_OLD = "old";
+
+    private static final String FIELD_DATA = "data";
+
+    private static final String FIELD_TYPE = "type";
+
+    private static final String OP_INSERT = "insert";
+
+    private static final String OP_UPDATE = "update";
+
+    private static final String OP_DELETE = "delete";
+
+    private static final String FIELD_DATABASE = "database";
+
+    private static final String FIELD_TABLE = "table";
+
+    private final String database;
+
+    private final String table;
+
+    /** Names of fields. */
+    private final String[] fieldNames;
+
+    /** Number of fields. */
+    private final int fieldCount;
+
+    private final boolean ignoreParseErrors;
+
+    /** Pattern of the specific database. */
+    private final Pattern databasePattern;
+
+    /** Pattern of the specific table. */
+    private final Pattern tablePattern;
+
+    private final JsonDeserializationSchema jsonDeserializer;
+
+    private final CatalogTable catalogTable;
+    private final SeaTunnelRowType seaTunnelRowType;
+
+    public MaxWellJsonDeserializationSchema(
+            CatalogTable catalogTable, String database, String table, boolean 
ignoreParseErrors) {
+        this.catalogTable = catalogTable;
+        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+        this.jsonDeserializer =
+                new JsonDeserializationSchema(false, ignoreParseErrors, 
seaTunnelRowType);
+        this.database = database;
+        this.table = table;
+        this.fieldNames = seaTunnelRowType.getFieldNames();
+        this.fieldCount = seaTunnelRowType.getTotalFields();
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.databasePattern = database == null ? null : 
Pattern.compile(database);
+        this.tablePattern = table == null ? null : Pattern.compile(table);
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return this.seaTunnelRowType;
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) {
+        if (message == null) {
+            return;
+        }
+        ObjectNode jsonNode = (ObjectNode) convertBytes(message);
+        if (database != null
+                && 
!databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
+            return;
+        }
+        if (table != null && 
!tablePattern.matcher(jsonNode.get(FIELD_TABLE).asText()).matches()) {
+            return;
+        }
+        JsonNode dataNode = jsonNode.get(FIELD_DATA);
+        String type = jsonNode.get(FIELD_TYPE).asText();
+        if (OP_INSERT.equals(type)) {
+            SeaTunnelRow rowInsert = convertJsonNode(dataNode);
+            rowInsert.setRowKind(RowKind.INSERT);
+            out.collect(rowInsert);
+        } else if (OP_UPDATE.equals(type)) {
+            SeaTunnelRow rowAfter = convertJsonNode(dataNode);
+            JsonNode oldNode = jsonNode.get(FIELD_OLD);
+            SeaTunnelRow rowBefore = convertJsonNode(oldNode);
+            for (int f = 0; f < fieldCount; f++) {
+                assert rowBefore != null;
+                if (rowBefore.isNullAt(f) && oldNode.findValue(fieldNames[f]) 
== null) {
+                    // fields in "old" (before) means the fields are changed
+                    // fields not in "old" (before) means the fields are not 
changed
+                    // so we just copy the not changed fields into before
+                    assert rowAfter != null;
+                    rowBefore.setField(f, rowAfter.getField(f));
+                }
+            }
+            assert rowBefore != null;
+            rowBefore.setRowKind(RowKind.UPDATE_BEFORE);
+            assert rowAfter != null;
+            rowAfter.setRowKind(RowKind.UPDATE_AFTER);
+            out.collect(rowBefore);
+            out.collect(rowAfter);
+        } else if (OP_DELETE.equals(type)) {
+            SeaTunnelRow rowDelete = convertJsonNode(dataNode);
+            rowDelete.setRowKind(RowKind.DELETE);
+            out.collect(rowDelete);
+        } else {
+            if (!ignoreParseErrors) {
+                throw new SeaTunnelJsonFormatException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        format(
+                                "Unknown \"type\" value \"%s\". The MaxWell 
JSON message is '%s'",
+                                type, new String(message)));
+            }
+        }
+    }
+
+    private JsonNode convertBytes(byte[] message) {
+        try {
+            return jsonDeserializer.deserializeToJsonNode(message);
+        } catch (Exception t) {
+            if (ignoreParseErrors) {
+                return null;
+            }
+            throw new SeaTunnelJsonFormatException(
+                    CommonErrorCode.CONVERT_TO_CONNECTOR_TYPE_ERROR_SIMPLE,
+                    String.format("Failed to deserialize JSON '%s'.", new 
String(message)),
+                    t);
+        }
+    }
+
+    private SeaTunnelRow convertJsonNode(JsonNode root) {
+        return jsonDeserializer.convertToRowData(root);
+    }
+
+    private static SeaTunnelRowType createJsonRowType(SeaTunnelRowType 
physicalDataType) {
+        // MaxWell JSON contains other information, e.g. "ts", "sql", but we 
don't need them
+        return physicalDataType;
+    }
+
+    // 
------------------------------------------------------------------------------------------
+    // Builder
+    // 
------------------------------------------------------------------------------------------
+
+    /** Creates A builder for building a {@link 
MaxWellJsonDeserializationSchema}. */
+    public static Builder builder(CatalogTable catalogTable) {
+        return new Builder(catalogTable);
+    }
+
+    public static class Builder {
+
+        private boolean ignoreParseErrors = false;
+
+        private String database = null;
+
+        private String table = null;
+
+        private final CatalogTable catalogTable;
+
+        public Builder(CatalogTable catalogTable) {
+            this.catalogTable = catalogTable;
+        }
+
+        public Builder setDatabase(String database) {
+            this.database = database;
+            return this;
+        }
+
+        public Builder setTable(String table) {
+            this.table = table;
+            return this;
+        }
+
+        public Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
+            this.ignoreParseErrors = ignoreParseErrors;
+            return this;
+        }
+
+        public MaxWellJsonDeserializationSchema build() {
+            return new MaxWellJsonDeserializationSchema(
+                    catalogTable, database, table, ignoreParseErrors);
+        }
+    }
+}
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonFormatOptions.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonFormatOptions.java
new file mode 100644
index 0000000000..0b6a5cd287
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonFormatOptions.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.maxwell;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.format.json.JsonFormatOptions;
+
+import java.util.Map;
+
+/** Option utils for MaxWell_json format. */
+public class MaxWellJsonFormatOptions {
+
+    public static final Option<Boolean> IGNORE_PARSE_ERRORS = 
JsonFormatOptions.IGNORE_PARSE_ERRORS;
+
+    public static final Option<String> DATABASE_INCLUDE =
+            Options.key("database.include")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "An optional regular expression to only read the 
specific databases changelog rows by regular matching the \"database\" meta 
field in the MaxWell record."
+                                    + "The pattern string is compatible with 
Java's Pattern.");
+
+    public static final Option<String> TABLE_INCLUDE =
+            Options.key("table.include")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "An optional regular expression to only read the 
specific tables changelog rows by regular matching the \"table\" meta field in 
the MaxWell record."
+                                    + "The pattern string is compatible with 
Java's Pattern.");
+
+    public static String getTableInclude(Map<String, String> options) {
+        return options.getOrDefault(TABLE_INCLUDE.key(), null);
+    }
+
+    public static String getDatabaseInclude(Map<String, String> options) {
+        return options.getOrDefault(DATABASE_INCLUDE.key(), null);
+    }
+
+    public static boolean getIgnoreParseErrors(Map<String, String> options) {
+        return Boolean.parseBoolean(
+                options.getOrDefault(IGNORE_PARSE_ERRORS.key(), 
IGNORE_PARSE_ERRORS.toString()));
+    }
+}
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerializationSchema.java
new file mode 100644
index 0000000000..931d6c453b
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerializationSchema.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.maxwell;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+
+public class MaxWellJsonSerializationSchema implements SerializationSchema {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String OP_INSERT = "INSERT";
+    private static final String OP_DELETE = "DELETE";
+
+    public static final String FORMAT = "MAXWELL";
+
+    private transient SeaTunnelRow reuse;
+
+    private final JsonSerializationSchema jsonSerializer;
+
+    public MaxWellJsonSerializationSchema(SeaTunnelRowType rowType) {
+        this.jsonSerializer = new 
JsonSerializationSchema(createJsonRowType(rowType));
+        this.reuse = new SeaTunnelRow(2);
+    }
+
+    @Override
+    public byte[] serialize(SeaTunnelRow row) {
+        try {
+            String opType = rowKind2String(row.getRowKind());
+            reuse.setField(0, row);
+            reuse.setField(1, opType);
+            return jsonSerializer.serialize(reuse);
+        } catch (Throwable t) {
+            throw CommonError.jsonOperationError(FORMAT, row.toString(), t);
+        }
+    }
+
+    private String rowKind2String(RowKind rowKind) {
+        switch (rowKind) {
+            case INSERT:
+            case UPDATE_AFTER:
+                return OP_INSERT;
+            case UPDATE_BEFORE:
+            case DELETE:
+                return OP_DELETE;
+            default:
+                throw new SeaTunnelJsonFormatException(
+                        CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                        String.format("Unsupported operation %s for row 
kind.", rowKind));
+        }
+    }
+
+    private static SeaTunnelRowType createJsonRowType(SeaTunnelRowType 
databaseSchema) {
+        // MaxWell JSON contains other information, e.g. "database", "ts"
+        // but we don't need them
+        // and we don't need "old" , because can not support 
UPDATE_BEFORE,UPDATE_AFTER
+        return new SeaTunnelRowType(
+                new String[] {"data", "type"},
+                new SeaTunnelDataType[] {databaseSchema, STRING_TYPE});
+    }
+}
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java
 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java
new file mode 100644
index 0000000000..a4e06ac2b1
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.maxwell;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class MaxWellJsonSerDeSchemaTest {
+
+    private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE =
+            new SeaTunnelRowType(
+                    new String[] {"id", "name", "description", "weight"},
+                    new SeaTunnelDataType[] {INT_TYPE, STRING_TYPE, 
STRING_TYPE, FLOAT_TYPE});
+    private static final CatalogTable catalogTables =
+            CatalogTableUtil.getCatalogTable("", "", "", "", 
SEATUNNEL_ROW_TYPE);
+
+    @Test
+    public void testFilteringTables() throws Exception {
+        List<String> lines = readLines("maxwell-data-filter-table.txt");
+        MaxWellJsonDeserializationSchema deserializationSchema =
+                new MaxWellJsonDeserializationSchema.Builder(catalogTables)
+                        .setDatabase("^test.*")
+                        .setTable("^prod.*")
+                        .build();
+        runTest(lines, deserializationSchema);
+    }
+
+    @Test
+    public void testDeserializeNullRow() throws Exception {
+        final MaxWellJsonDeserializationSchema deserializationSchema =
+                createMaxWellJsonDeserializationSchema(null, null);
+        final SimpleCollector collector = new SimpleCollector();
+
+        deserializationSchema.deserialize(null, collector);
+        assertEquals(0, collector.list.size());
+    }
+
+    public void runTest(List<String> lines, MaxWellJsonDeserializationSchema 
deserializationSchema)
+            throws IOException {
+        SimpleCollector collector = new SimpleCollector();
+        for (String line : lines) {
+            
deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), 
collector);
+        }
+        List<String> expected =
+                Arrays.asList(
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[101, scooter, 
Small 2-wheel scooter, 3.14]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[102, car 
battery, 12V car battery, 8.1]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[103, 12-pack 
drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[104, hammer, 
12oz carpenter's hammer, 0.75]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[105, hammer, 
14oz carpenter's hammer, 0.875]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[106, hammer, 
16oz carpenter's hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[107, rocks, 
box of assorted rocks, 5.3]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[108, jacket, 
water resistent black wind breaker, 0.1]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[109, spare 
tire, 24 inch spare tire, 22.2]}",
+                        "SeaTunnelRow{tableId=, kind=-U, fields=[106, hammer, 
16oz carpenter's hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=, kind=+U, fields=[106, hammer, 
18oz carpenter hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=, kind=-U, fields=[107, rocks, 
box of assorted rocks, 5.3]}",
+                        "SeaTunnelRow{tableId=, kind=+U, fields=[107, rocks, 
box of assorted rocks, 5.1]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[110, jacket, 
water resistent white wind breaker, 0.2]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[111, scooter, 
Big 2-wheel scooter , 5.18]}",
+                        "SeaTunnelRow{tableId=, kind=-U, fields=[110, jacket, 
water resistent white wind breaker, 0.2]}",
+                        "SeaTunnelRow{tableId=, kind=+U, fields=[110, jacket, 
new water resistent white wind breaker, 0.5]}",
+                        "SeaTunnelRow{tableId=, kind=-U, fields=[111, scooter, 
Big 2-wheel scooter , 5.18]}",
+                        "SeaTunnelRow{tableId=, kind=+U, fields=[111, scooter, 
Big 2-wheel scooter , 5.17]}",
+                        "SeaTunnelRow{tableId=, kind=-D, fields=[111, scooter, 
Big 2-wheel scooter , 5.17]}",
+                        "SeaTunnelRow{tableId=, kind=-U, fields=[101, scooter, 
Small 2-wheel scooter, 3.14]}",
+                        "SeaTunnelRow{tableId=, kind=+U, fields=[101, scooter, 
Small 2-wheel scooter, 5.17]}",
+                        "SeaTunnelRow{tableId=, kind=-U, fields=[102, car 
battery, 12V car battery, 8.1]}",
+                        "SeaTunnelRow{tableId=, kind=+U, fields=[102, car 
battery, 12V car battery, 5.17]}",
+                        "SeaTunnelRow{tableId=, kind=-D, fields=[102, car 
battery, 12V car battery, 5.17]}",
+                        "SeaTunnelRow{tableId=, kind=-D, fields=[103, 12-pack 
drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]}");
+        List<String> actual =
+                
collector.list.stream().map(Object::toString).collect(Collectors.toList());
+        assertEquals(expected, actual);
+
+        // test Serialization
+        MaxWellJsonSerializationSchema serializationSchema =
+                new 
MaxWellJsonSerializationSchema(catalogTables.getSeaTunnelRowType());
+        List<String> result = new ArrayList<>();
+        for (SeaTunnelRow rowData : collector.list) {
+            result.add(new String(serializationSchema.serialize(rowData), 
StandardCharsets.UTF_8));
+        }
+
+        List<String> expectedResult =
+                Arrays.asList(
+                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":3.14},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":8.1},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":0.8},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's 
hammer\",\"weight\":0.75},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's 
hammer\",\"weight\":0.875},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":1.0},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.3},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent 
black wind breaker\",\"weight\":0.1},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":22.2},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":1.0},\"type\":\"DELETE\"}",
+                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter 
hammer\",\"weight\":1.0},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.3},\"type\":\"DELETE\"}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.1},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind breaker\",\"weight\":0.2},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.18},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind breaker\",\"weight\":0.2},\"type\":\"DELETE\"}",
+                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water 
resistent white wind breaker\",\"weight\":0.5},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.18},\"type\":\"DELETE\"}",
+                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.17},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.17},\"type\":\"DELETE\"}",
+                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":3.14},\"type\":\"DELETE\"}",
+                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":5.17},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":8.1},\"type\":\"DELETE\"}",
+                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":5.17},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":5.17},\"type\":\"DELETE\"}",
+                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":0.8},\"type\":\"DELETE\"}");
+        assertEquals(expectedResult, result);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Utilities
+    // 
--------------------------------------------------------------------------------------------
+
+    private MaxWellJsonDeserializationSchema 
createMaxWellJsonDeserializationSchema(
+            String database, String table) {
+        return MaxWellJsonDeserializationSchema.builder(catalogTables)
+                .setDatabase(database)
+                .setTable(table)
+                .setIgnoreParseErrors(false)
+                .build();
+    }
+
+    private static List<String> readLines(String resource) throws IOException {
+        final URL url = 
MaxWellJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
+        assert url != null;
+        Path path = new File(url.getFile()).toPath();
+        return Files.readAllLines(path);
+    }
+
+    private static class SimpleCollector implements Collector<SeaTunnelRow> {
+
+        private List<SeaTunnelRow> list = new ArrayList<>();
+
+        @Override
+        public void collect(SeaTunnelRow record) {
+            list.add(record);
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return null;
+        }
+    }
+}
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/test/resources/maxwell-data-filter-table.txt
 
b/seatunnel-formats/seatunnel-format-json/src/test/resources/maxwell-data-filter-table.txt
new file mode 100644
index 0000000000..7494391f89
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-json/src/test/resources/maxwell-data-filter-table.txt
@@ -0,0 +1,20 @@
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small
 2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car
 battery","description":"12V car battery","weight":8.1},"primary_key_columns": 
["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack
 drill bits","description":"12-pack of drill bits with sizes ranging from #40 
to #3","weight":0.8},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz
 carpenter's hammer","weight":0.75},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":4,"data":{"id":105,"name":"hammer","description":"14oz
 carpenter's hammer","weight":0.875},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":5,"data":{"id":106,"name":"hammer","description":"16oz
 carpenter's hammer","weight":1.0},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":6,"data":{"id":107,"name":"rocks","description":"box
 of assorted rocks","weight":5.3},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":7,"data":{"id":108,"name":"jacket","description":"water
 resistent black wind breaker","weight":0.1},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"commit":true,"data":{"id":109,"name":"spare
 tire","description":"24 inch spare tire","weight":22.2},"primary_key_columns": 
["id"]}
+{"database":"test","table":"product","type":"update","ts":1596684893,"xid":7152,"commit":true,"data":{"id":106,"name":"hammer","description":"18oz
 carpenter hammer","weight":1.0},"old":{"description":"16oz carpenter's 
hammer"},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"update","ts":1596684897,"xid":7169,"commit":true,"data":{"id":107,"name":"rocks","description":"box
 of assorted rocks","weight":5.1},"old":{"weight":5.3},"primary_key_columns": 
["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684900,"xid":7186,"commit":true,"data":{"id":110,"name":"jacket","description":"water
 resistent white wind breaker","weight":0.2},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684904,"xid":7201,"commit":true,"data":{"id":111,"name":"scooter","description":"Big
 2-wheel scooter ","weight":5.18},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"update","ts":1596684906,"xid":7216,"commit":true,"data":{"id":110,"name":"jacket","description":"new
 water resistent white wind breaker","weight":0.5},"old":{"description":"water 
resistent white wind breaker","weight":0.2},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"update","ts":1596684912,"xid":7235,"commit":true,"data":{"id":111,"name":"scooter","description":"Big
 2-wheel scooter ","weight":5.17},"old":{"weight":5.18},"primary_key_columns": 
["id"]}
+{"database":"test","table":"product","type":"delete","ts":1596684914,"xid":7250,"commit":true,"data":{"id":111,"name":"scooter","description":"Big
 2-wheel scooter ","weight":5.17},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"update","ts":1596684928,"xid":7291,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small
 2-wheel scooter","weight":5.17},"old":{"weight":3.14},"primary_key_columns": 
["id"]}
+{"database":"test","table":"product","type":"update","ts":1596684928,"xid":7291,"commit":true,"data":{"id":102,"name":"car
 battery","description":"12V car 
battery","weight":5.17},"old":{"weight":8.1},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"delete","ts":1596684938,"xid":7322,"xoffset":0,"data":{"id":102,"name":"car
 battery","description":"12V car battery","weight":5.17},"primary_key_columns": 
["id"]}
+{"database":"test","table":"product","type":"delete","ts":1596684938,"xid":7322,"commit":true,"data":{"id":103,"name":"12-pack
 drill bits","description":"12-pack of drill bits with sizes ranging from #40 
to #3","weight":0.8},"primary_key_columns": ["id"]}
\ No newline at end of file

Reply via email to