Adamyuanyuan commented on code in PR #10208:
URL: https://github.com/apache/seatunnel/pull/10208#discussion_r2654876790


##########
seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java:
##########
@@ -91,21 +93,25 @@ public SourceReader<SeaTunnelRow, SplitWrapper<SplitT>> createReader(
     @Override
     public SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> createEnumerator(
             SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) throws Exception {
+        Set<Integer> noMoreSplitsSignaledReaders = ConcurrentHashMap.newKeySet();
         SourceSplitEnumerator.Context<SplitT> context =
-                new FlinkSourceSplitEnumeratorContext<>(enumContext);
+                new FlinkSourceSplitEnumeratorContext<>(
+                        enumContext, noMoreSplitsSignaledReaders::add);
         SourceSplitEnumerator<SplitT, EnumStateT> enumerator = source.createEnumerator(context);
-        return new FlinkSourceEnumerator<>(enumerator, enumContext);
+        return new FlinkSourceEnumerator<>(enumerator, enumContext, noMoreSplitsSignaledReaders);
     }
 
     @Override
     public SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> restoreEnumerator(
             SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext, EnumStateT checkpoint)
             throws Exception {
+        Set<Integer> noMoreSplitsSignaledReaders = ConcurrentHashMap.newKeySet();

Review Comment:
   noMoreSplitsSignaledReaders is a runtime helper set in the Flink translation
layer. It records which reader subtasks have already been signaled “no more
splits”, so that if a reader re-registers after a failover while the
coordinator/enumerator keeps running, we can re-emit NoMoreSplitsEvent to that
subtask. A set is sufficient because we only need a per-subtask “ever signaled”
marker (the concurrent set is mainly there to avoid potential races).
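
   A minimal sketch of such a per-subtask marker, using only the JDK (no Flink or SeaTunnel classes); the class and method names are hypothetical. `ConcurrentHashMap.newKeySet()` returns a thread-safe `Set` backed by a `ConcurrentHashMap`, and `Set.add` is idempotent, so several threads recording the same subtask still leave a single entry.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical holder for the per-subtask "ever signaled no more splits" marker. */
final class SignaledReaders {
    // Thread-safe set view backed by a ConcurrentHashMap; no external locking needed.
    private final Set<Integer> subtasks = ConcurrentHashMap.newKeySet();

    /** Records the subtask; returns true only the first time it is recorded. */
    boolean markSignaled(int subtaskId) {
        return subtasks.add(subtaskId);
    }

    boolean wasSignaled(int subtaskId) {
        return subtasks.contains(subtaskId);
    }

    int size() {
        return subtasks.size();
    }

    /** Starts `threads` workers that each record every subtask in [0, parallelism). */
    static SignaledReaders recordConcurrently(int parallelism, int threads) {
        SignaledReaders readers = new SignaledReaders();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int s = 0; s < parallelism; s++) {
                    readers.markSignaled(s);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return readers;
    }
}
```

   The return value of `markSignaled` distinguishes the first signal from repeats, which is all an "ever signaled" marker needs; duplicate concurrent adds collapse to one entry per subtask.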
   
   In restoreEnumerator(...) we intentionally create a fresh, empty set because
the enumerator instance is being reconstructed for a new attempt, and this
helper state is not part of the connector’s checkpointed enumerator state
(EnumStateT). Persisting it would require changing or wrapping EnumStateT and
handling state-compatibility and rescaling concerns. The set is repopulated
in the new attempt when signalNoMoreSplits is called again, so subsequent
re-registrations are still handled correctly.
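
   The sketch below illustrates why restoring with a fresh set is safe, under simplifying assumptions: the connector's enumerator state is modeled as a plain `List<String>` of pending split IDs standing in for EnumStateT, and `create`, `restore` and `snapshotState` are hypothetical stand-ins for the Flink enumerator lifecycle hooks. Only the connector state goes into the snapshot; the signaled-readers set starts empty in the restored attempt and refills as readers are signaled again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical enumerator wrapper: connector state is checkpointed, the helper set is not. */
final class EnumeratorAttempt {
    // Stand-in for the connector's EnumStateT: split IDs not yet assigned.
    private final List<String> pendingSplits;
    // Runtime-only helper state, never written into a snapshot.
    private final Set<Integer> noMoreSplitsSignaledReaders = ConcurrentHashMap.newKeySet();

    private EnumeratorAttempt(List<String> pendingSplits) {
        this.pendingSplits = new ArrayList<>(pendingSplits);
    }

    /** Fresh start, analogous to createEnumerator(...). */
    static EnumeratorAttempt create(List<String> initialSplits) {
        return new EnumeratorAttempt(initialSplits);
    }

    /** New attempt from a snapshot, analogous to restoreEnumerator(...); the helper set starts empty. */
    static EnumeratorAttempt restore(List<String> checkpoint) {
        return new EnumeratorAttempt(checkpoint);
    }

    /** Returns a copy of the connector state only. */
    List<String> snapshotState() {
        return new ArrayList<>(pendingSplits);
    }

    /** Records that the reader was told there are no more splits (event sending omitted). */
    void signalNoMoreSplits(int subtaskId) {
        noMoreSplitsSignaledReaders.add(subtaskId);
    }

    boolean wasSignaled(int subtaskId) {
        return noMoreSplitsSignaledReaders.contains(subtaskId);
    }
}
```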
   
   The core scenario this PR fixes is "the reader restarts/re-registers, but
the enumerator/coordinator is not rebuilt along with it" (so run() is not
executed again). In this case no checkpoint restoration is performed, and
noMoreSplitsSignaledReaders, being in-memory state, survives intact, so it can
be used to decide whether to reissue signalNoMoreSplits for the re-registered
reader.
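
   A sketch of this scenario, assuming a simplified coordinator in which run() drives the connector enumerator at most once per enumerator instance and addReader(int) is called every time a reader subtask (re-)registers. The `IntConsumer` standing in for "send NoMoreSplitsEvent to this subtask" and all class and method names are hypothetical, not the SeaTunnel or Flink API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;

/** Hypothetical coordinator-side wrapper that outlives reader failovers. */
final class ReissuingEnumerator {
    private final Set<Integer> noMoreSplitsSignaledReaders = ConcurrentHashMap.newKeySet();
    // Stand-in for sending NoMoreSplitsEvent to a reader subtask.
    private final IntConsumer sendNoMoreSplits;
    private final AtomicBoolean started = new AtomicBoolean();

    ReissuingEnumerator(IntConsumer sendNoMoreSplits) {
        this.sendNoMoreSplits = sendNoMoreSplits;
    }

    /** Runs the simulated connector enumerator at most once; here it just signals the given readers. */
    void run(int... finishedReaders) {
        if (!started.compareAndSet(false, true)) {
            return; // a reader restart does not re-run the enumerator
        }
        for (int subtask : finishedReaders) {
            signalNoMoreSplits(subtask);
        }
    }

    void signalNoMoreSplits(int subtaskId) {
        noMoreSplitsSignaledReaders.add(subtaskId);
        sendNoMoreSplits.accept(subtaskId);
    }

    /** Called on every (re-)registration; re-emits only for subtasks already signaled. */
    void addReader(int subtaskId) {
        if (noMoreSplitsSignaledReaders.contains(subtaskId)) {
            sendNoMoreSplits.accept(subtaskId);
        }
    }
}
```

   Without the set, a reader that fails over after its signal would wait forever, because the one-shot run() never signals it again.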



-- 
This is an automated message from the Apache Git Service.