yuxiqian commented on code in PR #4101:
URL: https://github.com/apache/flink-cdc/pull/4101#discussion_r2338390820
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java:
##########
@@ -86,18 +88,21 @@ public IncrementalSourceScanFetcher(FetchTask.Context
taskContext, int subtaskId
}
@Override
- public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
+ public Future<?> submitTask(FetchTask<SourceSplitBase> fetchTask) {
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+
this.snapshotSplitReadTask = fetchTask;
this.currentSnapshotSplit = fetchTask.getSplit().asSnapshotSplit();
taskContext.configure(currentSnapshotSplit);
this.queue = taskContext.getQueue();
this.hasNextElement.set(true);
this.reachEnd.set(false);
- executorService.execute(
+ return executorService.submit(
() -> {
try {
snapshotSplitReadTask.execute(taskContext);
+ completableFuture.complete(null);
} catch (Exception e) {
setReadException(e);
Review Comment:
Should we call `Future#completeExceptionally` if an exception occurs?
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/Fetcher.java:
##########
@@ -33,7 +34,7 @@
public interface Fetcher<T, Split> {
/** Add to task to fetch, this should call only when the reader is idle. */
- void submitTask(FetchTask<Split> fetchTask);
+ Future<?> submitTask(FetchTask<Split> fetchTask);
Review Comment:
IIUC this `Future` works like a trigger and returns nothing. Use
`Future<Void>` instead?
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTableIdCaseInsensitveITCase.java:
##########
@@ -196,7 +196,7 @@ private List<Event> executeAlterAndProvideExpected(TableId
tableId, Statement st
statement.execute(
String.format(
- "ALTER TABLE `%s`.`products` ADD COLUMN `DESC1`
VARCHAR(45) NULL AFTER `weight`;",
+ "ALTER TABLE `%s`.`products` ADD COLUMN `DESC1`
VARCHAR(45) NULL AFTER `WEIGHT`;",
Review Comment:
Is this change relevant?
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java:
##########
@@ -1119,6 +1121,37 @@ public void
testBinlogOffsetCompareWithSnapshotAndBinlogPhase() throws Exception
Assertions.assertThat(sourceRecords).isEmpty();
}
+ @Test
+ void testReadBinlogWithException() throws Exception {
+ customerDatabase.createAndInitialize();
+ MySqlSourceConfig sourceConfig =
+ getConfig(StartupOptions.latest(), new String[] {"customers"});
+ binaryLogClient =
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
+ mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
+
+ // Create reader and submit splits
+ StatefulTaskContext statefulTaskContext =
+ new StatefulTaskContext(sourceConfig, binaryLogClient,
mySqlConnection);
+ MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
+ BinlogSplitReader reader = new BinlogSplitReader(statefulTaskContext,
0);
+
+ // Mock an exception occurring during stream split reading by setting
the error handler
+ // and stopping the change event source to test exception handling
+ Future<?> future = reader.submitSplit(split);
+ statefulTaskContext
+ .getErrorHandler()
+ .setProducerThrowable(new RuntimeException("Test read with
exception"));
+ reader.getChangeEventSourceContext().stopChangeEventSource();
+ // wait until executor is finished.
+ future.get();
Review Comment:
Better provide a timeout here, or the test case might hang if something went
wrong.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]