Adamyuanyuan commented on code in PR #10208:
URL: https://github.com/apache/seatunnel/pull/10208#discussion_r2654876790
##########
seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java:
##########
@@ -91,21 +93,25 @@ public SourceReader<SeaTunnelRow, SplitWrapper<SplitT>> createReader(
@Override
public SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> createEnumerator(
SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) throws Exception {
+ Set<Integer> noMoreSplitsSignaledReaders = ConcurrentHashMap.newKeySet();
SourceSplitEnumerator.Context<SplitT> context =
- new FlinkSourceSplitEnumeratorContext<>(enumContext);
+ new FlinkSourceSplitEnumeratorContext<>(
+ enumContext, noMoreSplitsSignaledReaders::add);
SourceSplitEnumerator<SplitT, EnumStateT> enumerator =
source.createEnumerator(context);
- return new FlinkSourceEnumerator<>(enumerator, enumContext);
+ return new FlinkSourceEnumerator<>(enumerator, enumContext, noMoreSplitsSignaledReaders);
}
@Override
public SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> restoreEnumerator(
SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext,
EnumStateT checkpoint)
throws Exception {
+ Set<Integer> noMoreSplitsSignaledReaders = ConcurrentHashMap.newKeySet();
Review Comment:
noMoreSplitsSignaledReaders is a runtime helper set in the Flink translation
layer. It records which reader subtasks have already been signaled “no more
splits”, so that if a reader re-registers after a failover while the
coordinator/enumerator keeps running, we can re-emit NoMoreSplitsEvent for that
subtask. A set is sufficient because all we need is a per-subtask “ever
signaled” marker; it is a concurrent set mainly to guard against potential
races.
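To make the bookkeeping concrete, here is a minimal, Flink-free sketch of the pattern described above. It assumes a callback-style context wrapper similar to the one the diff builds with noMoreSplitsSignaledReaders::add. NoMoreSplitsSender, RecordingContext and ReSignalingEnumerator are hypothetical names, not SeaTunnel or Flink classes. In real Flink code, the corresponding hooks are SplitEnumeratorContext#signalNoMoreSplits(int) and SplitEnumerator#addReader(int).

```java
import java.util.Set;
import java.util.function.IntConsumer;

// Hypothetical stand-in for Flink's SplitEnumeratorContext#signalNoMoreSplits(int).
interface NoMoreSplitsSender {
    void signalNoMoreSplits(int subtaskId);
}

// Hypothetical context wrapper: reports every signaled subtask to a callback
// before forwarding the signal, in the spirit of passing
// noMoreSplitsSignaledReaders::add into FlinkSourceSplitEnumeratorContext.
final class RecordingContext implements NoMoreSplitsSender {
    private final NoMoreSplitsSender delegate;
    private final IntConsumer onSignaled;

    RecordingContext(NoMoreSplitsSender delegate, IntConsumer onSignaled) {
        this.delegate = delegate;
        this.onSignaled = onSignaled;
    }

    @Override
    public void signalNoMoreSplits(int subtaskId) {
        onSignaled.accept(subtaskId); // remember that this subtask was told "no more splits"
        delegate.signalNoMoreSplits(subtaskId);
    }
}

// Hypothetical enumerator side: when a reader (re-)registers, re-emit the
// signal for any subtask that was already signaled. In Flink the matching
// callback is SplitEnumerator#addReader(int).
final class ReSignalingEnumerator {
    private final NoMoreSplitsSender sender;
    private final Set<Integer> signaledReaders; // shared with the RecordingContext callback

    ReSignalingEnumerator(NoMoreSplitsSender sender, Set<Integer> signaledReaders) {
        this.sender = sender;
        this.signaledReaders = signaledReaders;
    }

    void addReader(int subtaskId) {
        if (signaledReaders.contains(subtaskId)) {
            sender.signalNoMoreSplits(subtaskId);
        }
    }
}
```

Because add on a set is idempotent, routing the re-emitted signal through either the raw sender or the recording wrapper leaves the set unchanged.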

In restoreEnumerator(...) we intentionally create a fresh, empty set. The
enumerator instance is being reconstructed for a new attempt, and this helper
state is not part of the connector’s checkpointed enumerator state
(EnumStateT). Persisting it would require changing or wrapping EnumStateT and
dealing with state-compatibility and rescaling concerns. In the new attempt the
set is repopulated as signalNoMoreSplits is called again, so later
re-registrations are still handled correctly.
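The restore path can be sketched as follows, under the assumption that only the checkpointed enumerator state carries over into the new attempt. EnumState, Enumerator and EnumeratorFactory are hypothetical stand-ins for EnumStateT and for the create/restore methods in the diff. This is an illustration, not the actual FlinkSource code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the connector's checkpointed EnumStateT.
final class EnumState {
    final List<String> pendingSplits;

    EnumState(List<String> pendingSplits) {
        this.pendingSplits = pendingSplits;
    }
}

// Hypothetical enumerator: `state` is what a checkpoint would capture,
// `signaledReaders` is runtime-only helper state that is never checkpointed.
final class Enumerator {
    final EnumState state;
    final Set<Integer> signaledReaders;
    // Records outgoing "no more splits" signals so the behavior can be inspected.
    final List<Integer> sentSignals = new ArrayList<>();

    Enumerator(EnumState state, Set<Integer> signaledReaders) {
        this.state = state;
        this.signaledReaders = signaledReaders;
    }

    void signalNoMoreSplits(int subtaskId) {
        signaledReaders.add(subtaskId);
        sentSignals.add(subtaskId);
    }

    // Reader (re-)registration: re-emit only for subtasks signaled in this attempt.
    void addReader(int subtaskId) {
        if (signaledReaders.contains(subtaskId)) {
            sentSignals.add(subtaskId);
        }
    }
}

final class EnumeratorFactory {
    static Enumerator create() {
        return new Enumerator(new EnumState(new ArrayList<>()), ConcurrentHashMap.newKeySet());
    }

    // Mirrors the restoreEnumerator(...) choice: the checkpointed state is
    // carried over, while the helper set starts empty for the new attempt.
    static Enumerator restore(EnumState checkpoint) {
        return new Enumerator(checkpoint, ConcurrentHashMap.newKeySet());
    }
}
```

A restored enumerator re-emits nothing for a registering reader until its own attempt has called signalNoMoreSplits for that subtask again.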

The core scenario this change fixes is a reader that restarts and re-registers
while the enumerator/coordinator is not rebuilt along with it (so run() is not
executed again). In that case no checkpoint restore takes place, and
noMoreSplitsSignaledReaders, being in-memory state, survives unchanged. It can
therefore be used to decide whether to re-issue signalNoMoreSplits for the
re-registered reader.
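The failure mode can be sketched as a small simulation. It assumes that a reader may only finish after it has received “no more splits”, and that the coordinator object outlives the reader restart. ReaderModel, Coordinator and the reSignalOnRegister flag are hypothetical. The flag exists only to contrast the old behavior (false) with the fix (true).

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical reader model: it may only finish once told "no more splits".
final class ReaderModel {
    private volatile boolean noMoreSplits;

    void onNoMoreSplits() {
        noMoreSplits = true;
    }

    boolean canFinish() {
        return noMoreSplits;
    }
}

// Hypothetical long-lived coordinator. It is NOT rebuilt when a reader
// restarts, so the one-time logic that signaled "no more splits" does not
// run again, and no checkpoint restore happens either.
final class Coordinator {
    private final Set<Integer> signaledReaders = ConcurrentHashMap.newKeySet();
    private final Map<Integer, ReaderModel> readers = new ConcurrentHashMap<>();
    private final boolean reSignalOnRegister; // true models the fix, false the old behavior

    Coordinator(boolean reSignalOnRegister) {
        this.reSignalOnRegister = reSignalOnRegister;
    }

    void signalNoMoreSplits(int subtaskId) {
        signaledReaders.add(subtaskId);
        ReaderModel reader = readers.get(subtaskId);
        if (reader != null) {
            reader.onNoMoreSplits();
        }
    }

    // Called on first registration and again after the reader restarts.
    void registerReader(int subtaskId, ReaderModel reader) {
        readers.put(subtaskId, reader);
        if (reSignalOnRegister && signaledReaders.contains(subtaskId)) {
            reader.onNoMoreSplits();
        }
    }
}
```

With reSignalOnRegister set to true, a fresh ReaderModel registered for subtask 0 after signalNoMoreSplits(0) can finish immediately. With it set to false, the restarted reader never receives the signal, because nothing in the long-lived coordinator sends it again.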
--
This is an automated message from the Apache Git Service.