This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 300a624c5b [Improve][CDC] Disable memory buffering when `exactly_once` is turned off (#6017)
300a624c5b is described below

commit 300a624c5b89d1e2adb5a67a82a5274f55c1f30c
Author: hailin0 <[email protected]>
AuthorDate: Mon Dec 18 18:21:16 2023 +0800

    [Improve][CDC] Disable memory buffering when `exactly_once` is turned off (#6017)
---
 .../source/reader/IncrementalSourceReader.java     |   3 +-
 .../external/IncrementalSourceScanFetcher.java     | 132 ++++++++++++---------
 .../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java |  38 ++++++
 ...ysqlcdc_to_mysql_with_disable_exactly_once.conf |  60 ++++++++++
 4 files changed, 175 insertions(+), 58 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
index ceb6215f41..9f1c5029d5 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
@@ -130,7 +130,8 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
         for (SourceSplitStateBase splitState : finishedSplitIds.values()) {
             SourceSplitBase sourceSplit = splitState.toSourceSplit();
             checkState(
-                    sourceSplit.isSnapshotSplit(),
+                    sourceSplit.isSnapshotSplit()
+                            && 
sourceSplit.asSnapshotSplit().isSnapshotReadFinished(),
                     String.format(
                             "Only snapshot split could finish, but the actual 
split is incremental split %s",
                             sourceSplit));
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
index 97c0c523e6..127b09406b 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -112,71 +112,89 @@ public class IncrementalSourceScanFetcher implements 
Fetcher<SourceRecords, Sour
         checkReadException();
 
         if (hasNextElement.get()) {
-            // eg:
-            // data input: [low watermark event][snapshot events][high 
watermark event][change
-            // events][end watermark event]
-            // data output: [low watermark event][normalized events][high 
watermark event]
-            boolean reachChangeLogStart = false;
-            boolean reachChangeLogEnd = false;
-            SourceRecord lowWatermark = null;
-            SourceRecord highWatermark = null;
-            Map<Struct, SourceRecord> outputBuffer = new LinkedHashMap<>();
-            while (!reachChangeLogEnd) {
-                checkReadException();
-                List<DataChangeEvent> batch = queue.poll();
-                for (DataChangeEvent event : batch) {
-                    SourceRecord record = event.getRecord();
-                    if (lowWatermark == null) {
-                        lowWatermark = record;
-                        assertLowWatermark(lowWatermark);
-                        continue;
-                    }
+            if (taskContext.isExactlyOnce()) {
+                return pollSplitRecordsIfExactlyOnce();
+            }
+            return pollSplitRecordsIfNotExactlyOnce();
+        }
+        // the data has been polled, no more data
+        reachEnd.compareAndSet(false, true);
+        return null;
+    }
 
-                    if (highWatermark == null && isHighWatermarkEvent(record)) 
{
-                        highWatermark = record;
-                        // snapshot events capture end
-                        if (taskContext.isExactlyOnce()) {
-                            // begin to capture binlog events
-                            reachChangeLogStart = true;
-                            continue;
-                        } else {
-                            // not support exactly-once, stop the loop
-                            reachChangeLogEnd = true;
-                            break;
-                        }
-                    }
+    public Iterator<SourceRecords> pollSplitRecordsIfNotExactlyOnce() throws 
InterruptedException {
+        // eg:
+        // data input: [low watermark event][snapshot events][high watermark event]
+        List<SourceRecord> sendRecords = new ArrayList<>();
+        List<DataChangeEvent> batch = queue.poll();
+        for (DataChangeEvent event : batch) {
+            SourceRecord record = event.getRecord();
+            sendRecords.add(record);
+            if (isHighWatermarkEvent(record)) {
+                hasNextElement.set(false);
+            }
+        }
+        // snapshot split return its data once
+        final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
+        sourceRecordsSet.add(new SourceRecords(sendRecords));
+        return sourceRecordsSet.iterator();
+    }
 
-                    if (reachChangeLogStart && isEndWatermarkEvent(record)) {
-                        // capture to end watermark events, stop the loop
-                        reachChangeLogEnd = true;
-                        break;
-                    }
+    public Iterator<SourceRecords> pollSplitRecordsIfExactlyOnce() throws 
InterruptedException {
+        // eg:
+        // data input: [low watermark event][snapshot events][high watermark event][change
+        // events][end watermark event]
+        // data output: [low watermark event][normalized events][high watermark event]
+        boolean reachChangeLogStart = false;
+        boolean reachChangeLogEnd = false;
+        SourceRecord lowWatermark = null;
+        SourceRecord highWatermark = null;
+        Map<Struct, SourceRecord> outputBuffer = new LinkedHashMap<>();
+        while (!reachChangeLogEnd) {
+            checkReadException();
+            List<DataChangeEvent> batch = queue.poll();
+            for (DataChangeEvent event : batch) {
+                SourceRecord record = event.getRecord();
+                if (lowWatermark == null) {
+                    lowWatermark = record;
+                    assertLowWatermark(lowWatermark);
+                    continue;
+                }
+
+                if (highWatermark == null && isHighWatermarkEvent(record)) {
+                    highWatermark = record;
+                    // begin to capture binlog events
+                    reachChangeLogStart = true;
+                    continue;
+                }
+
+                if (reachChangeLogStart && isEndWatermarkEvent(record)) {
+                    // capture to end watermark events, stop the loop
+                    reachChangeLogEnd = true;
+                    break;
+                }
 
-                    if (!reachChangeLogStart) {
-                        outputBuffer.put((Struct) record.key(), record);
-                    } else {
-                        if (isChangeRecordInChunkRange(record)) {
-                            // rewrite overlapping snapshot records through 
the record key
-                            taskContext.rewriteOutputBuffer(outputBuffer, 
record);
-                        }
+                if (!reachChangeLogStart) {
+                    outputBuffer.put((Struct) record.key(), record);
+                } else {
+                    if (isChangeRecordInChunkRange(record)) {
+                        // rewrite overlapping snapshot records through the record key
+                        taskContext.rewriteOutputBuffer(outputBuffer, record);
                     }
                 }
             }
-            // snapshot split return its data once
-            hasNextElement.set(false);
+        }
+        // snapshot split return its data once
+        hasNextElement.set(false);
 
-            final List<SourceRecord> normalizedRecords = new ArrayList<>();
-            normalizedRecords.add(lowWatermark);
-            
normalizedRecords.addAll(taskContext.formatMessageTimestamp(outputBuffer.values()));
-            normalizedRecords.add(highWatermark);
+        final List<SourceRecord> normalizedRecords = new ArrayList<>();
+        normalizedRecords.add(lowWatermark);
+        
normalizedRecords.addAll(taskContext.formatMessageTimestamp(outputBuffer.values()));
+        normalizedRecords.add(highWatermark);
 
-            final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
-            sourceRecordsSet.add(new SourceRecords(normalizedRecords));
-            return sourceRecordsSet.iterator();
-        }
-        // the data has been polled, no more data
-        reachEnd.compareAndSet(false, true);
-        return null;
+        final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
+        sourceRecordsSet.add(new SourceRecords(normalizedRecords));
+        return sourceRecordsSet.iterator();
     }
 
     private void assertLowWatermark(SourceRecord lowWatermark) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index 6b3519f536..647ec9c140 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -173,6 +173,44 @@ public class MysqlCDCIT extends TestSuiteBase implements 
TestResource {
                         });
     }
 
+    @TestTemplate
+    public void testMysqlCdcCheckDataWithDisableExactlyonce(TestContainer 
container) {
+        // Clear related content to ensure that multiple operations are not affected
+        clearTable(MYSQL_DATABASE, SINK_TABLE);
+
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        
container.executeJob("/mysqlcdc_to_mysql_with_disable_exactly_once.conf");
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                    return null;
+                });
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            log.info(query(getSinkQuerySQL(MYSQL_DATABASE, 
SINK_TABLE)).toString());
+                            Assertions.assertIterableEquals(
+                                    query(getSourceQuerySQL(MYSQL_DATABASE, 
SOURCE_TABLE_1)),
+                                    query(getSinkQuerySQL(MYSQL_DATABASE, 
SINK_TABLE)));
+                        });
+
+        // insert update delete
+        executeSql("DELETE FROM " + MYSQL_DATABASE + "." + SOURCE_TABLE_1);
+        upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1);
+
+        // stream stage
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(
+                                    query(getSourceQuerySQL(MYSQL_DATABASE, 
SOURCE_TABLE_1)),
+                                    query(getSinkQuerySQL(MYSQL_DATABASE, 
SINK_TABLE)));
+                        });
+    }
+
     @TestTemplate
     @DisabledOnContainer(
             value = {},
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_disable_exactly_once.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_disable_exactly_once.conf
new file mode 100644
index 0000000000..4b91a877d4
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_disable_exactly_once.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  MySQL-CDC {
+    result_table_name = "customers_mysql_cdc"
+    server-id = 5652
+    username = "st_user"
+    password = "seatunnel"
+    table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
+    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+
+    exactly_once = false
+    snapshot.split.size = 1
+    snapshot.fetch.size = 1
+  }
+}
+
+sink {
+  jdbc {
+    source_table_name = "customers_mysql_cdc"
+    url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+    driver = "com.mysql.cj.jdbc.Driver"
+    user = "st_user"
+    password = "seatunnel"
+
+    generate_sink_sql = true
+    # You need to configure both database and table
+    database = mysql_cdc
+    table = mysql_cdc_e2e_sink_table
+    primary_keys = ["id"]
+  }
+}
\ No newline at end of file

Reply via email to