This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4281b867ac [Feature][connector][kafka] Support read Maxwell format
message from kafka #4415 (#4428)
4281b867ac is described below
commit 4281b867ac04fbc74e6f8ce14d6c4254884db532
Author: ZhiLin Li <[email protected]>
AuthorDate: Wed Jul 3 17:29:25 2024 +0800
[Feature][connector][kafka] Support read Maxwell format message from kafka
#4415 (#4428)
---
docs/en/connector-v2/formats/canal-json.md | 2 +-
docs/en/connector-v2/formats/maxwell-json.md | 91 ++++++++
docs/en/connector-v2/formats/ogg-json.md | 4 +-
.../seatunnel/kafka/config/MessageFormat.java | 3 +-
.../serialize/DefaultSeaTunnelRowSerializer.java | 3 +
.../seatunnel/kafka/source/KafkaSourceConfig.java | 6 +
.../e2e/connector/kafka/KafkaFormatIT.java | 83 ++++++++
.../src/test/resources/maxwell/maxwell_data.txt | 12 ++
.../kafkasource_maxwell_cdc_to_pgsql.conf | 64 ++++++
.../kafkasource_maxwell_to_kafka.conf | 58 ++++++
.../maxwell/MaxWellJsonDeserializationSchema.java | 228 +++++++++++++++++++++
.../json/maxwell/MaxWellJsonFormatOptions.java | 60 ++++++
.../maxwell/MaxWellJsonSerializationSchema.java | 86 ++++++++
.../json/maxwell/MaxWellJsonSerDeSchemaTest.java | 187 +++++++++++++++++
.../test/resources/maxwell-data-filter-table.txt | 20 ++
15 files changed, 903 insertions(+), 4 deletions(-)
diff --git a/docs/en/connector-v2/formats/canal-json.md b/docs/en/connector-v2/formats/canal-json.md
index 02e185bee5..6e133a9a82 100644
--- a/docs/en/connector-v2/formats/canal-json.md
+++ b/docs/en/connector-v2/formats/canal-json.md
@@ -24,7 +24,7 @@ SeaTunnel also supports to encode the INSERT/UPDATE/DELETE messages in SeaTunnel
# How to use
-## Kafka uses example
+## Kafka Uses Example
Canal provides a unified format for changelog, here is a simple example for an update operation captured from a MySQL products table:
diff --git a/docs/en/connector-v2/formats/maxwell-json.md b/docs/en/connector-v2/formats/maxwell-json.md
new file mode 100644
index 0000000000..5e1c851d9e
--- /dev/null
+++ b/docs/en/connector-v2/formats/maxwell-json.md
@@ -0,0 +1,91 @@
+# MaxWell Format
+
+[Maxwell](https://maxwells-daemon.io/) is a CDC (Change Data Capture) tool that can stream changes in real time from MySQL into Kafka, Kinesis and other streaming connectors. Maxwell provides a unified format schema for changelogs and supports serializing messages as JSON.
+
+SeaTunnel supports interpreting Maxwell JSON messages as INSERT/UPDATE/DELETE messages in the SeaTunnel system. This feature is useful in many cases, such as:
+
+- synchronizing incremental data from databases to other systems
+- auditing logs
+- real-time materialized views on databases
+- temporal joins against the change history of a database table, and so on.
+
+SeaTunnel also supports encoding the INSERT/UPDATE/DELETE messages in SeaTunnel as Maxwell JSON messages and emitting them to storage such as Kafka. However, SeaTunnel currently cannot combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, SeaTunnel encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Maxwell messages, respectively.
+
+# Format Options
+
+| Option | Default | Required | Description |
+|----------------------------------|---------|----------|-------------|
+| format | (none) | yes | Specify the format to use; here it should be 'maxwell_json'. |
+| maxwell_json.ignore-parse-errors | false | no | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
+| maxwell_json.database.include | (none) | no | An optional regular expression that limits reading to changelog rows whose "database" meta field in the Maxwell record matches it in full. The pattern string is compatible with Java's Pattern. |
+| maxwell_json.table.include | (none) | no | An optional regular expression that limits reading to changelog rows whose "table" meta field in the Maxwell record matches it in full. The pattern string is compatible with Java's Pattern. |
+
+# How To Use MaxWell format
+
+## Kafka Uses Example
+
+Maxwell provides a unified format for changelogs. Here is a simple example of an insert operation captured from a MySQL products table:
+
+```json
+{
+ "database":"test",
+ "table":"product",
+ "type":"insert",
+ "ts":1596684904,
+ "xid":7201,
+ "commit":true,
+ "data":{
+ "id":111,
+ "name":"scooter",
+ "description":"Big 2-wheel scooter ",
+ "weight":5.18
+ },
+ "primary_key_columns":[
+ "id"
+ ]
+}
+```
+
+Note: please refer to the Maxwell documentation for the meaning of each field.
+
+The MySQL products table has 4 columns (id, name, description and weight).
+The above JSON message is an insert change event on the products table that adds the row with id = 111.
+Assuming the messages have been synchronized to the Kafka topic products_binlog, we can use the following SeaTunnel configuration to consume this topic and interpret the change events.
+
+```bash
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "products_binlog"
+ result_table_name = "kafka_name"
+ start_mode = earliest
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ },
+ format = maxwell_json
+ }
+
+}
+
+transform {
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "localhost:9092"
+ topic = "consume-binlog"
+ format = maxwell_json
+ }
+}
+```
+
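The new maxwell-json.md describes two directions of conversion. When reading, a Maxwell `insert`, `update` or `delete` event becomes SeaTunnel rows. When writing, UPDATE_BEFORE and UPDATE_AFTER are emitted as DELETE and INSERT. The sketch below summarizes that mapping. It assumes a local stand-in enum for SeaTunnel's `RowKind` and uses the upper-case `"INSERT"`/`"DELETE"` type strings that KafkaFormatIT expects on the sink topic. It is an illustration, not the connector's code.

```java
import java.util.List;

/** Sketch of the Maxwell "type" <-> SeaTunnel row-kind mapping described in maxwell-json.md. */
public class MaxwellTypeMapping {

    // Local stand-in for org.apache.seatunnel.api.table.type.RowKind (assumption: same four kinds).
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Decoding: one Maxwell event yields one or two SeaTunnel row kinds. */
    static List<RowKind> decode(String maxwellType) {
        switch (maxwellType) {
            case "insert":
                return List.of(RowKind.INSERT);
            case "update":
                // An update is split into the before image and the after image.
                return List.of(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER);
            case "delete":
                return List.of(RowKind.DELETE);
            default:
                throw new IllegalArgumentException("Unknown \"type\" value: " + maxwellType);
        }
    }

    /** Encoding: UPDATE_BEFORE/UPDATE_AFTER cannot be merged, so they are written as DELETE/INSERT. */
    static String encode(RowKind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return "INSERT";
            case UPDATE_BEFORE:
            case DELETE:
                return "DELETE";
            default:
                throw new IllegalArgumentException(kind.toString());
        }
    }

    public static void main(String[] args) {
        System.out.println(decode("update")); // [UPDATE_BEFORE, UPDATE_AFTER]
        System.out.println(encode(RowKind.UPDATE_AFTER)); // INSERT
    }
}
```

Because the encoder flattens an update into a DELETE followed by an INSERT, a downstream consumer that applies the messages in order still converges to the same final row.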
diff --git a/docs/en/connector-v2/formats/ogg-json.md b/docs/en/connector-v2/formats/ogg-json.md
index 4c363c779a..3faeb33c4f 100644
--- a/docs/en/connector-v2/formats/ogg-json.md
+++ b/docs/en/connector-v2/formats/ogg-json.md
@@ -20,9 +20,9 @@ Seatunnel also supports to encode the INSERT/UPDATE/DELETE messages in Seatunnel
| ogg_json.database.include | (none) | no | An optional regular expression to only read the specific databases changelog rows by regular matching the "database" meta field in the Canal record. The pattern string is compatible with Java's Pattern. |
| ogg_json.table.include | (none) | no | An optional regular expression to only read the specific tables changelog rows by regular matching the "table" meta field in the Canal record. The pattern string is compatible with Java's Pattern. |
-# How to use Ogg format
+# How to Use Ogg format
-## Kafka uses example
+## Kafka Uses Example
Ogg provides a unified format for changelog, here is a simple example for an update operation captured from a Oracle products table:
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
index 18e466e41c..f02cebcbe3 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
@@ -25,5 +25,6 @@ public enum MessageFormat {
COMPATIBLE_DEBEZIUM_JSON,
COMPATIBLE_KAFKA_CONNECT_JSON,
OGG_JSON,
- AVRO
+ AVRO,
+ MAXWELL_JSON
}
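Adding the `MAXWELL_JSON` constant is what lets a job configuration say `format = maxwell_json`. The sketch below is hypothetical. It assumes the option string is resolved to an enum constant by a case-insensitive name lookup, and SeaTunnel's real option parsing may work differently. The local enum mirrors only the constants visible in this diff; the real `MessageFormat` has more.

```java
import java.util.Locale;

/** Hypothetical sketch: resolving the `format` option string to a MessageFormat constant. */
public class MessageFormatLookup {

    // Subset of the constants visible in this commit (assumption: the real enum contains more).
    enum MessageFormat {
        DEBEZIUM_JSON,
        COMPATIBLE_DEBEZIUM_JSON,
        COMPATIBLE_KAFKA_CONNECT_JSON,
        OGG_JSON,
        AVRO,
        MAXWELL_JSON
    }

    static MessageFormat parse(String configured) {
        // Assumed rule: trim, upper-case, then look the constant up by name.
        // Enum.valueOf throws IllegalArgumentException for an unknown name.
        return MessageFormat.valueOf(configured.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(parse("maxwell_json")); // MAXWELL_JSON
    }
}
```

Under this assumption, a typo such as `maxwell` fails fast with an exception instead of silently falling back to another format.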
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 51d491002c..d4a77e74b9 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonSerializationSchema;
import org.apache.seatunnel.format.json.debezium.DebeziumJsonSerializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.json.maxwell.MaxWellJsonSerializationSchema;
import org.apache.seatunnel.format.json.ogg.OggJsonSerializationSchema;
import org.apache.seatunnel.format.text.TextSerializationSchema;
@@ -226,6 +227,8 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
return new OggJsonSerializationSchema(rowType);
case DEBEZIUM_JSON:
return new DebeziumJsonSerializationSchema(rowType);
+ case MAXWELL_JSON:
+ return new MaxWellJsonSerializationSchema(rowType);
case COMPATIBLE_DEBEZIUM_JSON:
return new CompatibleDebeziumJsonSerializationSchema(rowType, isKey);
case AVRO:
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 232ee2f0e0..1c782ca6ab 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -42,6 +42,7 @@ import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.json.maxwell.MaxWellJsonDeserializationSchema;
import org.apache.seatunnel.format.json.ogg.OggJsonDeserializationSchema;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
@@ -250,6 +251,11 @@ public class KafkaSourceConfig implements Serializable {
return OggJsonDeserializationSchema.builder(catalogTable)
.setIgnoreParseErrors(true)
.build();
+ case MAXWELL_JSON:
+ return MaxWellJsonDeserializationSchema.builder(catalogTable)
+ .setIgnoreParseErrors(true)
+ .build();
+
case COMPATIBLE_KAFKA_CONNECT_JSON:
Boolean keySchemaEnable =
readonlyConfig.get(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
index 25fa9b4aab..ec7b0173ff 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
@@ -95,6 +95,12 @@ import static org.awaitility.Awaitility.given;
public class KafkaFormatIT extends TestSuiteBase implements TestResource {
private static final Logger LOG = LoggerFactory.getLogger(KafkaFormatIT.class);
+
+ // ---------------------------MaxWell Format Parameter---------------------------------------
+ private static final String MAXWELL_DATA_PATH = "/maxwell/maxwell_data.txt";
+ private static final String MAXWELL_KAFKA_SOURCE_TOPIC = "maxwell-test-cdc_mds";
+ private static final String MAXWELL_KAFKA_SINK_TOPIC = "test-maxwell-sink";
+
// ---------------------------Ogg Format Parameter---------------------------------------
private static final String OGG_DATA_PATH = "/ogg/ogg_data.txt";
private static final String OGG_KAFKA_SOURCE_TOPIC = "test-ogg-source";
@@ -239,6 +245,7 @@ public class KafkaFormatIT extends TestSuiteBase implements TestResource {
{
put(CANAL_DATA_PATH, CANAL_KAFKA_SOURCE_TOPIC);
put(OGG_DATA_PATH, OGG_KAFKA_SOURCE_TOPIC);
+ put(MAXWELL_DATA_PATH, MAXWELL_KAFKA_SOURCE_TOPIC);
put(COMPATIBLE_DATA_PATH, COMPATIBLE_KAFKA_SOURCE_TOPIC);
put(DEBEZIUM_DATA_PATH, DEBEZIUM_KAFKA_SOURCE_TOPIC);
}
@@ -418,6 +425,25 @@ public class KafkaFormatIT extends TestSuiteBase implements TestResource {
checkCompatibleFormat();
}
+ @TestTemplate
+ public void testFormatMaxWellCheck(TestContainer container)
+ throws IOException, InterruptedException {
+
+ LOG.info("====================== Check MaxWell======================");
+ // check MaxWell to Kafka first, then MaxWell CDC to PostgreSQL
+ Container.ExecResult checkMaxWellResultToKafka =
+ container.executeJob("/maxwellFormatIT/kafkasource_maxwell_to_kafka.conf");
+ Assertions.assertEquals(
+ 0, checkMaxWellResultToKafka.getExitCode(), checkMaxWellResultToKafka.getStderr());
+
+ Container.ExecResult checkDataResult =
+ container.executeJob("/maxwellFormatIT/kafkasource_maxwell_cdc_to_pgsql.conf");
+ Assertions.assertEquals(0, checkDataResult.getExitCode(), checkDataResult.getStderr());
+
+ // Check MaxWell
+ checkMaxWellFormat();
+ }
+
+
private void checkFormatCanalAndOgg() {
List<List<Object>> postgreSinkTableList = getPostgreSinkTableList(PG_SINK_TABLE1);
List<List<Object>> checkArraysResult =
@@ -533,6 +559,63 @@ public class KafkaFormatIT extends TestSuiteBase implements TestResource {
Assertions.assertIterableEquals(expected, postgreSinkTableList);
}
+ private void checkMaxWellFormat() {
+ List<String> expectedResult =
+ Arrays.asList(
+ "{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":\"3.14\"},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":\"8.1\"},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":\"0.8\"},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":\"1.0\"},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":\"5.3\"},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":\"0.1\"},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":\"22.2\"},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":\"3.14\"},\"type\":\"DELETE\"}",
+ "{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":\"4.56\"},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":\"5.3\"},\"type\":\"DELETE\"}",
+ "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":\"7.88\"},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":\"22.2\"},\"type\":\"DELETE\"}");
+
+ ArrayList<String> result = new ArrayList<>();
+ ArrayList<String> topics = new ArrayList<>();
+ topics.add(MAXWELL_KAFKA_SINK_TOPIC);
+ kafkaConsumer.subscribe(topics);
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ ConsumerRecords<String, String> consumerRecords =
+ kafkaConsumer.poll(Duration.ofMillis(1000));
+ for (ConsumerRecord<String, String> record : consumerRecords) {
+ result.add(record.value());
+ }
+ Assertions.assertEquals(expectedResult, result);
+ });
+
+ LOG.info(
+ "==================== start kafka MaxWell format to pg check ====================");
+
+ List<List<Object>> postgreSinkTableList = getPostgreSinkTableList(PG_SINK_TABLE1);
+
+ List<List<Object>> expected =
+ Stream.<List<Object>>of(
+ Arrays.asList(101, "scooter", "Small 2-wheel scooter", "4.56"),
+ Arrays.asList(102, "car battery", "12V car battery", "8.1"),
+ Arrays.asList(
+ 103,
+ "12-pack drill bits",
+ "12-pack of drill bits with sizes ranging from #40 to #3",
+ "0.8"),
+ Arrays.asList(104, "hammer", "12oz carpenter's hammer", "0.75"),
+ Arrays.asList(105, "hammer", "14oz carpenter's hammer", "0.875"),
+ Arrays.asList(106, "hammer", "16oz carpenter's hammer", "1.0"),
+ Arrays.asList(107, "rocks", "box of assorted rocks", "7.88"),
+ Arrays.asList(
+ 108, "jacket", "water resistent black wind breaker", "0.1"))
+ .collect(Collectors.toList());
+ Assertions.assertIterableEquals(expected, postgreSinkTableList);
+ }
+
private void checkOggFormat() {
List<String> kafkaExpectedResult =
Arrays.asList(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwell/maxwell_data.txt b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwell/maxwell_data.txt
new file mode 100644
index 0000000000..615c5b4458
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwell/maxwell_data.txt
@@ -0,0 +1,12 @@
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":"3.14"}}
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"xoffset":1,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":"8.1"}}
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"xoffset":2,"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"}}
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":"0.75"}}
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"xoffset":4,"data":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":"0.875"}}
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"xoffset":5,"data":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":"1.0"}}
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"xoffset":6,"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":"5.3"}}
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"xoffset":7,"data":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":"0.1"}}
+{"database":"maxwell_eal7e6","table":"products","type":"insert","ts":1699253290,"xid":246,"commit":true,"data":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":"22.2"}}
+{"database":"maxwell_eal7e6","table":"products","type":"update","ts":1699253290,"xid":248,"commit":true,"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":"4.56"},"old":{"weight":"3.14"}}
+{"database":"maxwell_eal7e6","table":"products","type":"update","ts":1699253290,"xid":250,"commit":true,"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":"7.88"},"old":{"weight":"5.3"}}
+{"database":"maxwell_eal7e6","table":"products","type":"delete","ts":1699253290,"xid":252,"commit":true,"data":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":"22.2"}}
\ No newline at end of file
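In the two `update` lines of maxwell_data.txt, `"old"` carries only the changed column (`weight`), while `"data"` holds the full after image. `MaxWellJsonDeserializationSchema.deserialize` therefore rebuilds the UPDATE_BEFORE row by copying every column that is absent from `"old"` out of the after image. The sketch below shows the same idea with plain `Map`s instead of `JsonNode`/`SeaTunnelRow`. The class and method names are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: rebuild the before image of a Maxwell "update" event from "data" and "old". */
public class MaxwellBeforeImage {

    static Map<String, Object> beforeImage(Map<String, Object> data, Map<String, Object> old) {
        Map<String, Object> before = new LinkedHashMap<>();
        for (Map.Entry<String, Object> column : data.entrySet()) {
            String name = column.getKey();
            // A key present in "old" holds the previous value of a changed column;
            // a key absent from "old" means the column did not change, so reuse the after value.
            before.put(name, old.containsKey(name) ? old.get(name) : column.getValue());
        }
        return before;
    }

    public static void main(String[] args) {
        // Values taken from the update of id 101 in maxwell_data.txt.
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("id", 101);
        data.put("name", "scooter");
        data.put("description", "Small 2-wheel scooter");
        data.put("weight", "4.56");
        Map<String, Object> old = Map.of("weight", "3.14");

        System.out.println(beforeImage(data, old));
        // {id=101, name=scooter, description=Small 2-wheel scooter, weight=3.14}
    }
}
```

This before image is the DELETE row with weight 3.14 that KafkaFormatIT expects ahead of the INSERT row with weight 4.56.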
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwellFormatIT/kafkasource_maxwell_cdc_to_pgsql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwellFormatIT/kafkasource_maxwell_cdc_to_pgsql.conf
new file mode 100644
index 0000000000..ada7122730
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwellFormatIT/kafkasource_maxwell_cdc_to_pgsql.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafka_e2e:9092"
+ topic = "maxwell-test-cdc_mds"
+ consumer.group = "maxwell_format_to_pg"
+ start_mode = earliest
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ },
+ format = maxwell_json
+ }
+}
+
+
+sink {
+ Jdbc {
+
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+ user = test
+ password = test
+ generate_sink_sql = true
+ database = test
+ table = public.sink
+ primary_keys = ["id"]
+ }
+}
\ No newline at end of file
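The JDBC sink above declares `primary_keys = ["id"]`. That is why replaying the Maxwell changelog leaves exactly one row per id in `public.sink`: id 101 ends at weight 4.56, id 107 at 7.88, and id 109 is gone. The sketch below uses a deliberately simplified model of that behavior, not the JDBC sink's code. INSERT and UPDATE_AFTER upsert the key, DELETE removes it, and UPDATE_BEFORE is skipped on the assumption that the following UPDATE_AFTER overwrites the same key. Only the `weight` column is tracked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of applying a changelog to a table keyed by "id" (assumption, not sink code). */
public class ChangelogReplay {

    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Change {
        final RowKind kind;
        final int id;
        final String weight;

        Change(RowKind kind, int id, String weight) {
            this.kind = kind;
            this.id = id;
            this.weight = weight;
        }
    }

    static Map<Integer, String> replay(List<Change> changes) {
        Map<Integer, String> table = new TreeMap<>(); // ordered by primary key
        for (Change c : changes) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    table.put(c.id, c.weight); // upsert on the primary key
                    break;
                case UPDATE_BEFORE:
                    // Skipped: the UPDATE_AFTER that follows upserts the same key.
                    break;
                case DELETE:
                    table.remove(c.id);
                    break;
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> log = new ArrayList<>();
        // Abbreviated from maxwell_data.txt: ids 101, 107, 109 and their weight column only.
        log.add(new Change(RowKind.INSERT, 101, "3.14"));
        log.add(new Change(RowKind.INSERT, 107, "5.3"));
        log.add(new Change(RowKind.INSERT, 109, "22.2"));
        log.add(new Change(RowKind.UPDATE_BEFORE, 101, "3.14"));
        log.add(new Change(RowKind.UPDATE_AFTER, 101, "4.56"));
        log.add(new Change(RowKind.UPDATE_BEFORE, 107, "5.3"));
        log.add(new Change(RowKind.UPDATE_AFTER, 107, "7.88"));
        log.add(new Change(RowKind.DELETE, 109, "22.2"));
        System.out.println(replay(log)); // {101=4.56, 107=7.88}
    }
}
```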
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwellFormatIT/kafkasource_maxwell_to_kafka.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwellFormatIT/kafkasource_maxwell_to_kafka.conf
new file mode 100644
index 0000000000..b7d25eb9a0
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/maxwellFormatIT/kafkasource_maxwell_to_kafka.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafka_e2e:9092"
+ topic = "maxwell-test-cdc_mds"
+ consumer.group = "maxwell_format_to_kafka"
+ start_mode = earliest
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ },
+ format = maxwell_json
+ }
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "kafka_e2e:9092"
+ topic = "test-maxwell-sink"
+ format = maxwell_json
+ partition = 0
+ }
+}
\ No newline at end of file
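With `format = maxwell_json` on the Kafka sink, KafkaFormatIT expects each record on `test-maxwell-sink` to have the flat shape `{"data":{...},"type":"INSERT"|"DELETE"}`. The hand-rolled sketch below reproduces only that asserted shape and is not the serializer's implementation. It assumes flat rows whose values are strings, numbers or null, and it escapes just `"` and `\`. It keeps column order by using a `LinkedHashMap`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of the sink message shape asserted in KafkaFormatIT (illustrative, simplified JSON). */
public class MaxwellSinkMessage {

    static String toJson(Map<String, Object> row, String type) {
        StringBuilder sb = new StringBuilder("{\"data\":{");
        boolean first = true;
        for (Map.Entry<String, Object> e : row.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            Object v = e.getValue();
            sb.append(quote(e.getKey())).append(':');
            // Numbers are written bare, everything else as a quoted string.
            sb.append(v == null ? "null" : v instanceof Number ? v.toString() : quote(v.toString()));
        }
        return sb.append("},\"type\":").append(quote(type)).append('}').toString();
    }

    // Minimal escaping only: backslash and double quote.
    private static String quote(String s) {
        return '"' + s.replace("\\", "\\\\").replace("\"", "\\\"") + '"';
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 101);
        row.put("name", "scooter");
        row.put("description", "Small 2-wheel scooter");
        row.put("weight", "3.14");
        System.out.println(toJson(row, "INSERT"));
        // {"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":"3.14"},"type":"INSERT"}
    }
}
```

The printed line matches the first entry of `expectedResult` in `checkMaxWellFormat()`.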
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonDeserializationSchema.java
new file mode 100644
index 0000000000..a87784353e
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonDeserializationSchema.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.maxwell;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+import static java.lang.String.format;
+
+public class MaxWellJsonDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String FIELD_OLD = "old";
+
+ private static final String FIELD_DATA = "data";
+
+ private static final String FIELD_TYPE = "type";
+
+ private static final String OP_INSERT = "insert";
+
+ private static final String OP_UPDATE = "update";
+
+ private static final String OP_DELETE = "delete";
+
+ private static final String FIELD_DATABASE = "database";
+
+ private static final String FIELD_TABLE = "table";
+
+ private final String database;
+
+ private final String table;
+
+ /** Names of fields. */
+ private final String[] fieldNames;
+
+ /** Number of fields. */
+ private final int fieldCount;
+
+ private final boolean ignoreParseErrors;
+
+ /** Pattern of the specific database. */
+ private final Pattern databasePattern;
+
+ /** Pattern of the specific table. */
+ private final Pattern tablePattern;
+
+ private final JsonDeserializationSchema jsonDeserializer;
+
+ private final CatalogTable catalogTable;
+ private final SeaTunnelRowType seaTunnelRowType;
+
+ public MaxWellJsonDeserializationSchema(
+ CatalogTable catalogTable, String database, String table, boolean ignoreParseErrors) {
+ this.catalogTable = catalogTable;
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+ this.jsonDeserializer =
+ new JsonDeserializationSchema(false, ignoreParseErrors, seaTunnelRowType);
+ this.database = database;
+ this.table = table;
+ this.fieldNames = seaTunnelRowType.getFieldNames();
+ this.fieldCount = seaTunnelRowType.getTotalFields();
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.databasePattern = database == null ? null : Pattern.compile(database);
+ this.tablePattern = table == null ? null : Pattern.compile(table);
+ }
+
+ @Override
+ public SeaTunnelRow deserialize(byte[] message) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+ return this.seaTunnelRowType;
+ }
+
+ @Override
+ public void deserialize(byte[] message, Collector<SeaTunnelRow> out) {
+ if (message == null) {
+ return;
+ }
+ ObjectNode jsonNode = (ObjectNode) convertBytes(message);
+ if (database != null
+ && !databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
+ return;
+ }
+ if (table != null && !tablePattern.matcher(jsonNode.get(FIELD_TABLE).asText()).matches()) {
+ return;
+ }
+ JsonNode dataNode = jsonNode.get(FIELD_DATA);
+ String type = jsonNode.get(FIELD_TYPE).asText();
+ if (OP_INSERT.equals(type)) {
+ SeaTunnelRow rowInsert = convertJsonNode(dataNode);
+ rowInsert.setRowKind(RowKind.INSERT);
+ out.collect(rowInsert);
+ } else if (OP_UPDATE.equals(type)) {
+ SeaTunnelRow rowAfter = convertJsonNode(dataNode);
+ JsonNode oldNode = jsonNode.get(FIELD_OLD);
+ SeaTunnelRow rowBefore = convertJsonNode(oldNode);
+ for (int f = 0; f < fieldCount; f++) {
+ assert rowBefore != null;
+ if (rowBefore.isNullAt(f) && oldNode.findValue(fieldNames[f]) == null) {
+ // fields present in "old" (before) are the ones that changed;
+ // fields absent from "old" (before) did not change,
+ // so copy those unchanged fields from the after image into before
+ assert rowAfter != null;
+ rowBefore.setField(f, rowAfter.getField(f));
+ }
+ }
+ assert rowBefore != null;
+ rowBefore.setRowKind(RowKind.UPDATE_BEFORE);
+ assert rowAfter != null;
+ rowAfter.setRowKind(RowKind.UPDATE_AFTER);
+ out.collect(rowBefore);
+ out.collect(rowAfter);
+ } else if (OP_DELETE.equals(type)) {
+ SeaTunnelRow rowDelete = convertJsonNode(dataNode);
+ rowDelete.setRowKind(RowKind.DELETE);
+ out.collect(rowDelete);
+ } else {
+ if (!ignoreParseErrors) {
+ throw new SeaTunnelJsonFormatException(
+ CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ format(
+ "Unknown \"type\" value \"%s\". The MaxWell JSON message is '%s'",
+ type, new String(message)));
+ }
+ }
+ }
+
+ private JsonNode convertBytes(byte[] message) {
+ try {
+ return jsonDeserializer.deserializeToJsonNode(message);
+ } catch (Exception t) {
+ if (ignoreParseErrors) {
+ return null;
+ }
+ throw new SeaTunnelJsonFormatException(
+ CommonErrorCode.CONVERT_TO_CONNECTOR_TYPE_ERROR_SIMPLE,
+ String.format("Failed to deserialize JSON '%s'.", new String(message)),
+ t);
+ }
+ }
+
+ private SeaTunnelRow convertJsonNode(JsonNode root) {
+ return jsonDeserializer.convertToRowData(root);
+ }
+
+ private static SeaTunnelRowType createJsonRowType(SeaTunnelRowType physicalDataType) {
+ // MaxWell JSON contains other information, e.g. "ts", "sql", but we don't need them
+ return physicalDataType;
+ }
+
+ // ------------------------------------------------------------------------------------------
+ // Builder
+ // ------------------------------------------------------------------------------------------
+
+ /** Creates a builder for building a {@link MaxWellJsonDeserializationSchema}. */
+ public static Builder builder(CatalogTable catalogTable) {
+ return new Builder(catalogTable);
+ }
+
+ public static class Builder {
+
+ private boolean ignoreParseErrors = false;
+
+ private String database = null;
+
+ private String table = null;
+
+ private final CatalogTable catalogTable;
+
+ public Builder(CatalogTable catalogTable) {
+ this.catalogTable = catalogTable;
+ }
+
+ public Builder setDatabase(String database) {
+ this.database = database;
+ return this;
+ }
+
+ public Builder setTable(String table) {
+ this.table = table;
+ return this;
+ }
+
+ public Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
+ this.ignoreParseErrors = ignoreParseErrors;
+ return this;
+ }
+
+ public MaxWellJsonDeserializationSchema build() {
+ return new MaxWellJsonDeserializationSchema(
+ catalogTable, database, table, ignoreParseErrors);
+ }
+ }
+}
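The update branch of `MaxWellJsonDeserializationSchema` emits an UPDATE_BEFORE row followed by an UPDATE_AFTER row. Judging from the test data in this commit (the id 106 update carries only the previous `description` in `old`, and yields a `-U` row with `16oz carpenter's hammer` and a `+U` row with `18oz carpenter hammer`), the before image is the `data` row with the columns listed in `old` restored. A minimal plain-Java sketch of that merge, assuming rows are modelled as ordered `Map`s instead of `SeaTunnelRow`; `MaxwellUpdateImages` is a hypothetical name and not part of the commit:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of SeaTunnel: rebuilds the "before" image of a
// Maxwell update event. Maxwell's "old" object holds only the columns that
// changed, so the previous row is "data" with those columns put back.
final class MaxwellUpdateImages {

    private MaxwellUpdateImages() {}

    static Map<String, Object> beforeImage(Map<String, Object> data, Map<String, Object> old) {
        // Copy "data" so the after image is left untouched; LinkedHashMap keeps column order.
        Map<String, Object> before = new LinkedHashMap<>(data);
        if (old != null) {
            for (Map.Entry<String, Object> entry : old.entrySet()) {
                // Assumption: columns absent from the physical row are ignored.
                if (before.containsKey(entry.getKey())) {
                    before.put(entry.getKey(), entry.getValue());
                }
            }
        }
        return before;
    }
}
```

For the id 106 record, `beforeImage(data, old)` returns `{id=106, name=hammer, description=16oz carpenter's hammer, weight=1.0}`, which lines up with the `-U` row the test expects.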
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonFormatOptions.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonFormatOptions.java
new file mode 100644
index 0000000000..0b6a5cd287
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonFormatOptions.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.maxwell;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.format.json.JsonFormatOptions;
+
+import java.util.Map;
+
+/** Option utilities for the MaxWell JSON format. */
+public class MaxWellJsonFormatOptions {
+
+ public static final Option<Boolean> IGNORE_PARSE_ERRORS = JsonFormatOptions.IGNORE_PARSE_ERRORS;
+
+ public static final Option<String> DATABASE_INCLUDE =
+ Options.key("database.include")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "An optional regular expression that restricts reading to changelog rows whose \"database\" meta field in the MaxWell record matches it. "
+ + "The pattern string is compatible with Java's Pattern.");
+
+ public static final Option<String> TABLE_INCLUDE =
+ Options.key("table.include")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "An optional regular expression that restricts reading to changelog rows whose \"table\" meta field in the MaxWell record matches it. "
+ + "The pattern string is compatible with Java's Pattern.");
+
+ public static String getTableInclude(Map<String, String> options) {
+ return options.getOrDefault(TABLE_INCLUDE.key(), null);
+ }
+
+ public static String getDatabaseInclude(Map<String, String> options) {
+ return options.getOrDefault(DATABASE_INCLUDE.key(), null);
+ }
+
+ public static boolean getIgnoreParseErrors(Map<String, String> options) {
+ return Boolean.parseBoolean(
+ options.getOrDefault(IGNORE_PARSE_ERRORS.key(), String.valueOf(IGNORE_PARSE_ERRORS.defaultValue())));
+ }
+}
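`database.include` and `table.include` keep only the changelog rows whose `database` and `table` meta fields match the configured patterns; the test builds the schema with `setDatabase("^test.*")` and `setTable("^prod.*")`. A minimal sketch of that filter with `java.util.regex.Pattern`, where an unset pattern accepts everything. `MaxwellMetaFilter` is a hypothetical name, and the choice of a whole-string `matches()` (rather than `find()`) is an assumption about the connector's semantics:

```java
import java.util.regex.Pattern;

// Hypothetical sketch, not the connector's code: decides whether a Maxwell
// record should be kept based on its "database" and "table" meta fields.
final class MaxwellMetaFilter {

    private final Pattern databasePattern;
    private final Pattern tablePattern;

    MaxwellMetaFilter(String databaseInclude, String tableInclude) {
        // Compile once; a null option means "no filtering" for that field.
        this.databasePattern = databaseInclude == null ? null : Pattern.compile(databaseInclude);
        this.tablePattern = tableInclude == null ? null : Pattern.compile(tableInclude);
    }

    boolean accepts(String database, String table) {
        return matches(databasePattern, database) && matches(tablePattern, table);
    }

    private static boolean matches(Pattern pattern, String value) {
        if (pattern == null) {
            return true;
        }
        // Assumption: the whole meta field must match the pattern.
        return value != null && pattern.matcher(value).matches();
    }
}
```

With the patterns from the test, a record from `test.product` is accepted while one from `test.orders` or `inventory.product` is dropped.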
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerializationSchema.java
new file mode 100644
index 0000000000..931d6c453b
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerializationSchema.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.maxwell;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+
+public class MaxWellJsonSerializationSchema implements SerializationSchema {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String OP_INSERT = "INSERT";
+ private static final String OP_DELETE = "DELETE";
+
+ public static final String FORMAT = "MAXWELL";
+
+ private transient SeaTunnelRow reuse;
+
+ private final JsonSerializationSchema jsonSerializer;
+
+ public MaxWellJsonSerializationSchema(SeaTunnelRowType rowType) {
+ this.jsonSerializer = new JsonSerializationSchema(createJsonRowType(rowType));
+ this.reuse = new SeaTunnelRow(2);
+ }
+
+ @Override
+ public byte[] serialize(SeaTunnelRow row) {
+ try {
+ String opType = rowKind2String(row.getRowKind());
+ reuse.setField(0, row);
+ reuse.setField(1, opType);
+ return jsonSerializer.serialize(reuse);
+ } catch (Throwable t) {
+ throw CommonError.jsonOperationError(FORMAT, row.toString(), t);
+ }
+ }
+
+ private String rowKind2String(RowKind rowKind) {
+ switch (rowKind) {
+ case INSERT:
+ case UPDATE_AFTER:
+ return OP_INSERT;
+ case UPDATE_BEFORE:
+ case DELETE:
+ return OP_DELETE;
+ default:
+ throw new SeaTunnelJsonFormatException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+ String.format("Unsupported row kind %s for the MaxWell JSON format.", rowKind));
+ }
+ }
+
+ private static SeaTunnelRowType createJsonRowType(SeaTunnelRowType databaseSchema) {
+ // MaxWell JSON contains other information, e.g. "database", "ts",
+ // which is not needed here. "old" is omitted as well, because UPDATE_BEFORE
+ // and UPDATE_AFTER rows are written as separate DELETE and INSERT records.
+ return new SeaTunnelRowType(
+ new String[] {"data", "type"},
+ new SeaTunnelDataType[] {databaseSchema, STRING_TYPE});
+ }
+}
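`rowKind2String` collapses the four changelog kinds into the two `type` values the serializer writes, which is why the serialization half of the test turns every `-U`/`+U` pair into a `DELETE` record followed by an `INSERT` record. A dependency-free sketch of the same mapping and of the `{"data":...,"type":...}` envelope, using a local `Kind` enum as a stand-in for SeaTunnel's `RowKind` (both `MaxwellTypeMapping` and `Kind` are hypothetical names). Note that the serializer writes upper-case values, while the sample Maxwell input added in this commit uses lower-case ones such as `"insert"`:

```java
// Hypothetical stand-alone mirror of MaxWellJsonSerializationSchema#rowKind2String.
final class MaxwellTypeMapping {

    // Local stand-in for org.apache.seatunnel.api.table.type.RowKind.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    private MaxwellTypeMapping() {}

    static String toMaxwellType(Kind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                // The new image of an update is written as a plain insert.
                return "INSERT";
            case UPDATE_BEFORE:
            case DELETE:
                // The old image of an update is written as a delete.
                return "DELETE";
            default:
                throw new IllegalArgumentException("Unsupported row kind: " + kind);
        }
    }

    // Wraps already-serialized row JSON in the two-field envelope the schema emits.
    static String envelope(String dataJson, Kind kind) {
        return "{\"data\":" + dataJson + ",\"type\":\"" + toMaxwellType(kind) + "\"}";
    }
}
```

A consequence of this design is that a downstream reader cannot tell an update from an independent delete and insert, since the `old` field is never written.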
diff --git a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java
new file mode 100644
index 0000000000..a4e06ac2b1
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.maxwell;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class MaxWellJsonSerDeSchemaTest {
+
+ private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE =
+ new SeaTunnelRowType(
+ new String[] {"id", "name", "description", "weight"},
+ new SeaTunnelDataType[] {INT_TYPE, STRING_TYPE, STRING_TYPE, FLOAT_TYPE});
+ private static final CatalogTable catalogTables =
+ CatalogTableUtil.getCatalogTable("", "", "", "", SEATUNNEL_ROW_TYPE);
+
+ @Test
+ public void testFilteringTables() throws Exception {
+ List<String> lines = readLines("maxwell-data-filter-table.txt");
+ MaxWellJsonDeserializationSchema deserializationSchema =
+ new MaxWellJsonDeserializationSchema.Builder(catalogTables)
+ .setDatabase("^test.*")
+ .setTable("^prod.*")
+ .build();
+ runTest(lines, deserializationSchema);
+ }
+
+ @Test
+ public void testDeserializeNullRow() throws Exception {
+ final MaxWellJsonDeserializationSchema deserializationSchema =
+ createMaxWellJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+
+ deserializationSchema.deserialize(null, collector);
+ assertEquals(0, collector.list.size());
+ }
+
+ public void runTest(List<String> lines, MaxWellJsonDeserializationSchema deserializationSchema)
+ throws IOException {
+ SimpleCollector collector = new SimpleCollector();
+ for (String line : lines) {
+ deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector);
+ }
+ List<String> expected =
+ Arrays.asList(
+ "SeaTunnelRow{tableId=, kind=+I, fields=[101, scooter, Small 2-wheel scooter, 3.14]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[102, car battery, 12V car battery, 8.1]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[104, hammer, 12oz carpenter's hammer, 0.75]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[105, hammer, 14oz carpenter's hammer, 0.875]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[106, hammer, 16oz carpenter's hammer, 1.0]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[107, rocks, box of assorted rocks, 5.3]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[108, jacket, water resistent black wind breaker, 0.1]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[109, spare tire, 24 inch spare tire, 22.2]}",
+ "SeaTunnelRow{tableId=, kind=-U, fields=[106, hammer, 16oz carpenter's hammer, 1.0]}",
+ "SeaTunnelRow{tableId=, kind=+U, fields=[106, hammer, 18oz carpenter hammer, 1.0]}",
+ "SeaTunnelRow{tableId=, kind=-U, fields=[107, rocks, box of assorted rocks, 5.3]}",
+ "SeaTunnelRow{tableId=, kind=+U, fields=[107, rocks, box of assorted rocks, 5.1]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[110, jacket, water resistent white wind breaker, 0.2]}",
+ "SeaTunnelRow{tableId=, kind=+I, fields=[111, scooter, Big 2-wheel scooter , 5.18]}",
+ "SeaTunnelRow{tableId=, kind=-U, fields=[110, jacket, water resistent white wind breaker, 0.2]}",
+ "SeaTunnelRow{tableId=, kind=+U, fields=[110, jacket, new water resistent white wind breaker, 0.5]}",
+ "SeaTunnelRow{tableId=, kind=-U, fields=[111, scooter, Big 2-wheel scooter , 5.18]}",
+ "SeaTunnelRow{tableId=, kind=+U, fields=[111, scooter, Big 2-wheel scooter , 5.17]}",
+ "SeaTunnelRow{tableId=, kind=-D, fields=[111, scooter, Big 2-wheel scooter , 5.17]}",
+ "SeaTunnelRow{tableId=, kind=-U, fields=[101, scooter, Small 2-wheel scooter, 3.14]}",
+ "SeaTunnelRow{tableId=, kind=+U, fields=[101, scooter, Small 2-wheel scooter, 5.17]}",
+ "SeaTunnelRow{tableId=, kind=-U, fields=[102, car battery, 12V car battery, 8.1]}",
+ "SeaTunnelRow{tableId=, kind=+U, fields=[102, car battery, 12V car battery, 5.17]}",
+ "SeaTunnelRow{tableId=, kind=-D, fields=[102, car battery, 12V car battery, 5.17]}",
+ "SeaTunnelRow{tableId=, kind=-D, fields=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]}");
+ List<String> actual =
+ collector.list.stream().map(Object::toString).collect(Collectors.toList());
+ assertEquals(expected, actual);
+
+ // test Serialization
+ MaxWellJsonSerializationSchema serializationSchema =
+ new MaxWellJsonSerializationSchema(catalogTables.getSeaTunnelRowType());
+ List<String> result = new ArrayList<>();
+ for (SeaTunnelRow rowData : collector.list) {
+ result.add(new String(serializationSchema.serialize(rowData), StandardCharsets.UTF_8));
+ }
+
+ List<String> expectedResult =
+ Arrays.asList(
+ "{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":8.1},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":0.75},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":0.875},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":0.1},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"type\":\"DELETE\"}",
+ "{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter hammer\",\"weight\":1.0},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"type\":\"DELETE\"}",
+ "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.1},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"type\":\"DELETE\"}",
+ "{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water resistent white wind breaker\",\"weight\":0.5},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"type\":\"DELETE\"}",
+ "{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"type\":\"DELETE\"}",
+ "{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14},\"type\":\"DELETE\"}",
+ "{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":5.17},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":8.1},\"type\":\"DELETE\"}",
+ "{\"data\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":5.17},\"type\":\"INSERT\"}",
+ "{\"data\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":5.17},\"type\":\"DELETE\"}",
+ "{\"data\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8},\"type\":\"DELETE\"}");
+ assertEquals(expectedResult, result);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------------------------
+
+ private MaxWellJsonDeserializationSchema createMaxWellJsonDeserializationSchema(
+ String database, String table) {
+ return MaxWellJsonDeserializationSchema.builder(catalogTables)
+ .setDatabase(database)
+ .setTable(table)
+ .setIgnoreParseErrors(false)
+ .build();
+ }
+
+ private static List<String> readLines(String resource) throws IOException {
+ final URL url = MaxWellJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
+ assert url != null;
+ Path path = new File(url.getFile()).toPath();
+ return Files.readAllLines(path);
+ }
+
+ private static class SimpleCollector implements Collector<SeaTunnelRow> {
+
+ private List<SeaTunnelRow> list = new ArrayList<>();
+
+ @Override
+ public void collect(SeaTunnelRow record) {
+ list.add(record);
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return null;
+ }
+ }
+}
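The test's `readLines` helper turns the resource URL into a path with `new File(url.getFile())`. `URL.getFile()` returns the path still percent-encoded, so this can fail to find the file when the checkout directory contains spaces or other escaped characters. A minimal stream-based alternative that never touches the file system; `ResourceLines` is a hypothetical name and is not part of the commit:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical test utility: reads a classpath resource line by line as UTF-8.
final class ResourceLines {

    private ResourceLines() {}

    static List<String> readLines(Class<?> anchor, String resource) throws IOException {
        ClassLoader loader = anchor.getClassLoader();
        // getResourceAsStream works for resources inside directories and JARs alike.
        try (InputStream in = loader.getResourceAsStream(resource)) {
            if (in == null) {
                // Fail loudly instead of relying on a Java assert, which is often disabled.
                throw new IOException("Resource not found on the classpath: " + resource);
            }
            BufferedReader reader =
                    new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
            // Collect eagerly so the stream is fully read before it is closed.
            return reader.lines().collect(Collectors.toList());
        }
    }
}
```

In the test, the call would become `ResourceLines.readLines(MaxWellJsonSerDeSchemaTest.class, "maxwell-data-filter-table.txt")`.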
diff --git a/seatunnel-formats/seatunnel-format-json/src/test/resources/maxwell-data-filter-table.txt b/seatunnel-formats/seatunnel-format-json/src/test/resources/maxwell-data-filter-table.txt
new file mode 100644
index 0000000000..7494391f89
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/src/test/resources/maxwell-data-filter-table.txt
@@ -0,0 +1,20 @@
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":4,"data":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":5,"data":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":6,"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":7,"data":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.1},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"commit":true,"data":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.2},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"update","ts":1596684893,"xid":7152,"commit":true,"data":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0},"old":{"description":"16oz carpenter's hammer"},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"update","ts":1596684897,"xid":7169,"commit":true,"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.1},"old":{"weight":5.3},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684900,"xid":7186,"commit":true,"data":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684904,"xid":7201,"commit":true,"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"update","ts":1596684906,"xid":7216,"commit":true,"data":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"old":{"description":"water resistent white wind breaker","weight":0.2},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"update","ts":1596684912,"xid":7235,"commit":true,"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17},"old":{"weight":5.18},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"delete","ts":1596684914,"xid":7250,"commit":true,"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"update","ts":1596684928,"xid":7291,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":5.17},"old":{"weight":3.14},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"update","ts":1596684928,"xid":7291,"commit":true,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":5.17},"old":{"weight":8.1},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"delete","ts":1596684938,"xid":7322,"xoffset":0,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":5.17},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"delete","ts":1596684938,"xid":7322,"commit":true,"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"primary_key_columns": ["id"]}
\ No newline at end of file