github-actions[bot] commented on code in PR #63514:
URL: https://github.com/apache/doris/pull/63514#discussion_r3288188985


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -563,12 +564,9 @@ private PollResult waitForAnyCompletion() throws Exception 
{
             anyOf.join(); // Wait for at least one to complete
 
             // Find and process completed futures
-            Iterator<CompletableFuture<PollResult>> iterator = 
activePollFutures.iterator();
-            while (iterator.hasNext()) {
-                CompletableFuture<PollResult> future = iterator.next();
-
+            for (CompletableFuture<PollResult> future : activePollFutures) {
                 if (future.isDone()) {

Review Comment:
   `pollRecords()` is now monitor-free, but this loop still dereferences the 
shared `activePollFutures` field while `close()`/`finishSplitRecords()` can run 
concurrently, cancel the futures, clear the list, and set the field to `null` 
under the reader monitor. A DROP/cancel during snapshot polling can therefore 
wake `anyOf.join()` and then hit this line after `activePollFutures` has become 
`null` (or after the list was cleared), turning cleanup into an NPE/failed poll 
instead of deterministic cancellation. Please keep a local snapshot of the list 
for the whole wait/process cycle, or protect all accesses and the clear/null 
transition with the same synchronization so close cannot invalidate the field 
while the poll thread is using it.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -582,12 +582,9 @@ private PollResult waitForAnyCompletion() throws Exception 
{
             anyOf.join(); // Wait for at least one to complete
 
             // Find and process completed futures
-            Iterator<CompletableFuture<PollResult>> iterator = 
activePollFutures.iterator();
-            while (iterator.hasNext()) {
-                CompletableFuture<PollResult> future = iterator.next();
-
+            for (CompletableFuture<PollResult> future : activePollFutures) {
                 if (future.isDone()) {

Review Comment:
   Same snapshot-poll race as in the JDBC/PG reader: `pollRecords()` no longer 
holds the reader monitor, while `close()`/`finishSplitRecords()` can 
concurrently clear `activePollFutures` and set it to `null`. If a DROP/cancel 
arrives while this thread is blocked in `anyOf.join()`, the cancellation can 
wake the join and the next access to `activePollFutures` here can observe 
`null` or an emptied list, causing an NPE/failed task during cleanup. Please 
make the future list lifetime stable for the whole wait loop (for example, use 
a local list reference and avoid nulling it out underneath the poller) or 
synchronize the mutation and polling accesses consistently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to