yuxiqian commented on code in PR #4101:
URL: https://github.com/apache/flink-cdc/pull/4101#discussion_r2338390820


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java:
##########
@@ -86,18 +88,21 @@ public IncrementalSourceScanFetcher(FetchTask.Context taskContext, int subtaskId
     }
 
     @Override
-    public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
+    public Future<?> submitTask(FetchTask<SourceSplitBase> fetchTask) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+
         this.snapshotSplitReadTask = fetchTask;
         this.currentSnapshotSplit = fetchTask.getSplit().asSnapshotSplit();
         taskContext.configure(currentSnapshotSplit);
         this.queue = taskContext.getQueue();
         this.hasNextElement.set(true);
         this.reachEnd.set(false);
 
-        executorService.execute(
+        return executorService.submit(
                 () -> {
                     try {
                         snapshotSplitReadTask.execute(taskContext);
+                        completableFuture.complete(null);
                     } catch (Exception e) {
                         setReadException(e);

Review Comment:
   Should we call `CompletableFuture#completeExceptionally` if an exception occurs? Otherwise the future never completes on the failure path.
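
   A minimal sketch of the idea, assuming the method hands the `CompletableFuture` back to the caller. The class `ExceptionPropagatingSubmitSketch` and its `submit` helper are hypothetical stand-ins, not the actual fetcher code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the fetcher; not the actual Flink CDC class. */
class ExceptionPropagatingSubmitSketch {

    /** Runs the task on the executor and mirrors its outcome into the returned future. */
    static CompletableFuture<Void> submit(ExecutorService executor, Runnable task) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        executor.execute(
                () -> {
                    try {
                        task.run();
                        done.complete(null);
                    } catch (Exception e) {
                        // Without this call, anyone waiting on the future would
                        // block indefinitely when the task fails.
                        done.completeExceptionally(e);
                    }
                });
        return done;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> failed =
                    submit(
                            executor,
                            () -> {
                                throw new IllegalStateException("read failed");
                            });
            try {
                failed.get(5, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                // The original exception is available as the cause.
                System.out.println("cause: " + e.getCause().getMessage());
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

   With this shape, a caller blocked in `get()` sees an `ExecutionException` wrapping the original error instead of waiting forever.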



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/Fetcher.java:
##########
@@ -33,7 +34,7 @@
 public interface Fetcher<T, Split> {
 
     /** Add to task to fetch, this should call only when the reader is idle. */
-    void submitTask(FetchTask<Split> fetchTask);
+    Future<?> submitTask(FetchTask<Split> fetchTask);

Review Comment:
   IIUC, this `Future` only signals completion and carries no result. Should we use `Future<Void>` instead?
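
   For illustration only: `ExecutorService#submit(Runnable)` is typed `Future<?>`, so one way to get a `Future<Void>` without a cast is `CompletableFuture.runAsync`. The interface and class names below are hypothetical simplifications of `Fetcher`, which has more methods and type parameters:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified interface; not the real Fetcher. */
interface VoidFutureFetcherSketch {
    /** The returned future carries no value; it only signals that the task ended. */
    Future<Void> submitTask(Runnable fetchTask);
}

/** Hypothetical implementation backed by an executor. */
class ExecutorBackedFetcherSketch implements VoidFutureFetcherSketch {
    private final ExecutorService executor;

    ExecutorBackedFetcherSketch(ExecutorService executor) {
        this.executor = executor;
    }

    @Override
    public Future<Void> submitTask(Runnable fetchTask) {
        // runAsync returns CompletableFuture<Void> and completes it
        // exceptionally if the runnable throws.
        return CompletableFuture.runAsync(fetchTask, executor);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            VoidFutureFetcherSketch fetcher = new ExecutorBackedFetcherSketch(executor);
            Future<Void> done = fetcher.submitTask(() -> System.out.println("task ran"));
            done.get(5, TimeUnit.SECONDS);
            System.out.println("done: " + done.isDone());
        } finally {
            executor.shutdownNow();
        }
    }
}
```

   The `Void` type parameter documents at the interface level that callers should not expect a result.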



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTableIdCaseInsensitveITCase.java:
##########
@@ -196,7 +196,7 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
 
         statement.execute(
                 String.format(
-                        "ALTER TABLE `%s`.`products` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `weight`;",
+                        "ALTER TABLE `%s`.`products` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `WEIGHT`;",

Review Comment:
   Is this change relevant to this PR?



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java:
##########
@@ -1119,6 +1121,37 @@ public void testBinlogOffsetCompareWithSnapshotAndBinlogPhase() throws Exception
         Assertions.assertThat(sourceRecords).isEmpty();
     }
 
+    @Test
+    void testReadBinlogWithException() throws Exception {
+        customerDatabase.createAndInitialize();
+        MySqlSourceConfig sourceConfig =
+                getConfig(StartupOptions.latest(), new String[] {"customers"});
+        binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
+        mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
+
+        // Create reader and submit splits
+        StatefulTaskContext statefulTaskContext =
+                new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
+        MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
+        BinlogSplitReader reader = new BinlogSplitReader(statefulTaskContext, 0);
+
+        // Mock an exception occurring during stream split reading by setting the error handler
+        // and stopping the change event source to test exception handling
+        Future<?> future = reader.submitSplit(split);
+        statefulTaskContext
+                .getErrorHandler()
+                .setProducerThrowable(new RuntimeException("Test read with exception"));
+        reader.getChangeEventSourceContext().stopChangeEventSource();
+        // wait until executor is finished.
+        future.get();

Review Comment:
   It would be better to provide a timeout here; otherwise the test case might hang if something goes wrong.
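
   Something along these lines, as a rough sketch. The helper `finishesWithin` and the timeout values are assumptions for illustration; in the test itself a plain `future.get(timeout, unit)` may be enough:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper showing a bounded wait on a Future. */
class BoundedFutureWaitSketch {

    /** Returns true if the future finished in time, false if the wait timed out. */
    static boolean finishesWithin(Future<?> future, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException {
        try {
            future.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            // Interrupt the stuck task so it does not leak past the test.
            future.cancel(true);
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // This latch is never counted down, simulating a task that hangs.
            CountDownLatch never = new CountDownLatch(1);
            Future<?> stuck =
                    executor.submit(
                            () -> {
                                never.await();
                                return null;
                            });
            boolean finished = finishesWithin(stuck, 200, TimeUnit.MILLISECONDS);
            System.out.println("finished in time: " + finished);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

   An unbounded `get()` in the same situation would block until the build itself times out, which makes the failure much harder to diagnose.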



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.