This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 300a624c5b [Improve][CDC] Disable memory buffering when `exactly_once`
is turned off (#6017)
300a624c5b is described below
commit 300a624c5b89d1e2adb5a67a82a5274f55c1f30c
Author: hailin0 <[email protected]>
AuthorDate: Mon Dec 18 18:21:16 2023 +0800
[Improve][CDC] Disable memory buffering when `exactly_once` is turned off
(#6017)
---
.../source/reader/IncrementalSourceReader.java | 3 +-
.../external/IncrementalSourceScanFetcher.java | 132 ++++++++++++---------
.../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java | 38 ++++++
...ysqlcdc_to_mysql_with_disable_exactly_once.conf | 60 ++++++++++
4 files changed, 175 insertions(+), 58 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
index ceb6215f41..9f1c5029d5 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
@@ -130,7 +130,8 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
for (SourceSplitStateBase splitState : finishedSplitIds.values()) {
SourceSplitBase sourceSplit = splitState.toSourceSplit();
checkState(
- sourceSplit.isSnapshotSplit(),
+ sourceSplit.isSnapshotSplit()
+ &&
sourceSplit.asSnapshotSplit().isSnapshotReadFinished(),
String.format(
"Only snapshot split could finish, but the actual
split is incremental split %s",
sourceSplit));
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
index 97c0c523e6..127b09406b 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -112,71 +112,89 @@ public class IncrementalSourceScanFetcher implements
Fetcher<SourceRecords, Sour
checkReadException();
if (hasNextElement.get()) {
- // eg:
- // data input: [low watermark event][snapshot events][high
watermark event][change
- // events][end watermark event]
- // data output: [low watermark event][normalized events][high
watermark event]
- boolean reachChangeLogStart = false;
- boolean reachChangeLogEnd = false;
- SourceRecord lowWatermark = null;
- SourceRecord highWatermark = null;
- Map<Struct, SourceRecord> outputBuffer = new LinkedHashMap<>();
- while (!reachChangeLogEnd) {
- checkReadException();
- List<DataChangeEvent> batch = queue.poll();
- for (DataChangeEvent event : batch) {
- SourceRecord record = event.getRecord();
- if (lowWatermark == null) {
- lowWatermark = record;
- assertLowWatermark(lowWatermark);
- continue;
- }
+ if (taskContext.isExactlyOnce()) {
+ return pollSplitRecordsIfExactlyOnce();
+ }
+ return pollSplitRecordsIfNotExactlyOnce();
+ }
+ // the data has been polled, no more data
+ reachEnd.compareAndSet(false, true);
+ return null;
+ }
- if (highWatermark == null && isHighWatermarkEvent(record))
{
- highWatermark = record;
- // snapshot events capture end
- if (taskContext.isExactlyOnce()) {
- // begin to capture binlog events
- reachChangeLogStart = true;
- continue;
- } else {
- // not support exactly-once, stop the loop
- reachChangeLogEnd = true;
- break;
- }
- }
+ public Iterator<SourceRecords> pollSplitRecordsIfNotExactlyOnce() throws
InterruptedException {
+ // eg:
+ // data input: [low watermark event][snapshot events][high watermark
event]
+ List<SourceRecord> sendRecords = new ArrayList<>();
+ List<DataChangeEvent> batch = queue.poll();
+ for (DataChangeEvent event : batch) {
+ SourceRecord record = event.getRecord();
+ sendRecords.add(record);
+ if (isHighWatermarkEvent(record)) {
+ hasNextElement.set(false);
+ }
+ }
+ // emit the polled batch as-is; hasNextElement is cleared at the high watermark
+ final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
+ sourceRecordsSet.add(new SourceRecords(sendRecords));
+ return sourceRecordsSet.iterator();
+ }
- if (reachChangeLogStart && isEndWatermarkEvent(record)) {
- // capture to end watermark events, stop the loop
- reachChangeLogEnd = true;
- break;
- }
+ public Iterator<SourceRecords> pollSplitRecordsIfExactlyOnce() throws
InterruptedException {
+ // eg:
+ // data input: [low watermark event][snapshot events][high watermark
event][change
+ // events][end watermark event]
+ // data output: [low watermark event][normalized events][high
watermark event]
+ boolean reachChangeLogStart = false;
+ boolean reachChangeLogEnd = false;
+ SourceRecord lowWatermark = null;
+ SourceRecord highWatermark = null;
+ Map<Struct, SourceRecord> outputBuffer = new LinkedHashMap<>();
+ while (!reachChangeLogEnd) {
+ checkReadException();
+ List<DataChangeEvent> batch = queue.poll();
+ for (DataChangeEvent event : batch) {
+ SourceRecord record = event.getRecord();
+ if (lowWatermark == null) {
+ lowWatermark = record;
+ assertLowWatermark(lowWatermark);
+ continue;
+ }
+
+ if (highWatermark == null && isHighWatermarkEvent(record)) {
+ highWatermark = record;
+ // begin to capture binlog events
+ reachChangeLogStart = true;
+ continue;
+ }
+
+ if (reachChangeLogStart && isEndWatermarkEvent(record)) {
+ // capture to end watermark events, stop the loop
+ reachChangeLogEnd = true;
+ break;
+ }
- if (!reachChangeLogStart) {
- outputBuffer.put((Struct) record.key(), record);
- } else {
- if (isChangeRecordInChunkRange(record)) {
- // rewrite overlapping snapshot records through
the record key
- taskContext.rewriteOutputBuffer(outputBuffer,
record);
- }
+ if (!reachChangeLogStart) {
+ outputBuffer.put((Struct) record.key(), record);
+ } else {
+ if (isChangeRecordInChunkRange(record)) {
+ // rewrite overlapping snapshot records through the
record key
+ taskContext.rewriteOutputBuffer(outputBuffer, record);
}
}
}
- // snapshot split return its data once
- hasNextElement.set(false);
+ }
+ // snapshot split return its data once
+ hasNextElement.set(false);
- final List<SourceRecord> normalizedRecords = new ArrayList<>();
- normalizedRecords.add(lowWatermark);
-
normalizedRecords.addAll(taskContext.formatMessageTimestamp(outputBuffer.values()));
- normalizedRecords.add(highWatermark);
+ final List<SourceRecord> normalizedRecords = new ArrayList<>();
+ normalizedRecords.add(lowWatermark);
+
normalizedRecords.addAll(taskContext.formatMessageTimestamp(outputBuffer.values()));
+ normalizedRecords.add(highWatermark);
- final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
- sourceRecordsSet.add(new SourceRecords(normalizedRecords));
- return sourceRecordsSet.iterator();
- }
- // the data has been polled, no more data
- reachEnd.compareAndSet(false, true);
- return null;
+ final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
+ sourceRecordsSet.add(new SourceRecords(normalizedRecords));
+ return sourceRecordsSet.iterator();
}
private void assertLowWatermark(SourceRecord lowWatermark) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index 6b3519f536..647ec9c140 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -173,6 +173,44 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
});
}
+ @TestTemplate
+ public void testMysqlCdcCheckDataWithDisableExactlyonce(TestContainer
container) {
+ // Clear the sink table so leftover rows do not affect the assertions below
+ clearTable(MYSQL_DATABASE, SINK_TABLE);
+
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+
container.executeJob("/mysqlcdc_to_mysql_with_disable_exactly_once.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ log.info(query(getSinkQuerySQL(MYSQL_DATABASE,
SINK_TABLE)).toString());
+ Assertions.assertIterableEquals(
+ query(getSourceQuerySQL(MYSQL_DATABASE,
SOURCE_TABLE_1)),
+ query(getSinkQuerySQL(MYSQL_DATABASE,
SINK_TABLE)));
+ });
+
+ // insert update delete
+ executeSql("DELETE FROM " + MYSQL_DATABASE + "." + SOURCE_TABLE_1);
+ upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1);
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ query(getSourceQuerySQL(MYSQL_DATABASE,
SOURCE_TABLE_1)),
+ query(getSinkQuerySQL(MYSQL_DATABASE,
SINK_TABLE)));
+ });
+ }
+
@TestTemplate
@DisabledOnContainer(
value = {},
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_disable_exactly_once.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_disable_exactly_once.conf
new file mode 100644
index 0000000000..4b91a877d4
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_disable_exactly_once.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates streaming processing in a SeaTunnel job
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ MySQL-CDC {
+ result_table_name = "customers_mysql_cdc"
+ server-id = 5652
+ username = "st_user"
+ password = "seatunnel"
+ table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+
+ exactly_once = false
+ snapshot.split.size = 1
+ snapshot.fetch.size = 1
+ }
+}
+
+sink {
+ jdbc {
+ source_table_name = "customers_mysql_cdc"
+ url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "st_user"
+ password = "seatunnel"
+
+ generate_sink_sql = true
+ # You need to configure both database and table
+ database = mysql_cdc
+ table = mysql_cdc_e2e_sink_table
+ primary_keys = ["id"]
+ }
+}
\ No newline at end of file