Hi Hong,

Great question! AFAIK, it depends on the implementation. Regarding the "loopback" event sent to the SplitEnumerator, I guess you mean the call here [1] (it would help if you could point out the exact position in the source code, so we understand the question better :)), which ends up calling the SplitEnumerator [2]. The only implementation of the method handleSourceEvent(int subtaskId, SourceEvent sourceEvent) I found is in HybridSourceSplitEnumerator [3].
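To make the loopback path easier to follow, here is a plain-Java toy model of it. It is only an illustrative sketch under stated assumptions: every type below (ToySourceEvent, ToyReaderContext, ToyCoordinator, ToyEnumerator, RecordingEnumerator) is hypothetical and merely mirrors the roles of Flink's SourceReaderContext#sendSourceEventToCoordinator, the SourceOperator/SourceCoordinator hop, and SplitEnumerator#handleSourceEvent(int, SourceEvent). The real path involves an RPC from TM to JM and the coordinator's event loop, which the sketch collapses into direct method calls.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the FLIP-27 "loopback" path (reader on TM -> enumerator on JM).
// All types are hypothetical stand-ins, NOT Flink classes; they only mirror the roles
// of SourceReaderContext, SourceOperator/SourceCoordinator and SplitEnumerator.
public class LoopbackSketch {

    /** Hypothetical stand-in for a SourceEvent carrying a payload. */
    static final class ToySourceEvent {
        final String payload;

        ToySourceEvent(String payload) {
            this.payload = payload;
        }
    }

    /** Mirrors the shape of SplitEnumerator#handleSourceEvent(int, SourceEvent). */
    interface ToyEnumerator {
        void handleSourceEvent(int subtaskId, ToySourceEvent event);
    }

    /** Hypothetical JM-side hop that hands the event to the enumerator. */
    static final class ToyCoordinator {
        private final ToyEnumerator enumerator;

        ToyCoordinator(ToyEnumerator enumerator) {
            this.enumerator = enumerator;
        }

        // In Flink the event crosses TM -> JM via RPC and is processed in the
        // coordinator's event loop; this sketch collapses that into a direct call.
        void forwardToEnumerator(int subtaskId, ToySourceEvent event) {
            enumerator.handleSourceEvent(subtaskId, event);
        }
    }

    /** Hypothetical reader-side context, analogous to sendSourceEventToCoordinator. */
    static final class ToyReaderContext {
        private final int subtaskIndex;
        private final ToyCoordinator coordinator;

        ToyReaderContext(int subtaskIndex, ToyCoordinator coordinator) {
            this.subtaskIndex = subtaskIndex;
            this.coordinator = coordinator;
        }

        void sendSourceEventToCoordinator(ToySourceEvent event) {
            // In this sketch the context attaches its own subtask index, so the
            // enumerator callback can tell which reader sent the event.
            coordinator.forwardToEnumerator(subtaskIndex, event);
        }
    }

    /** Enumerator that only records which subtask sent which event. */
    static final class RecordingEnumerator implements ToyEnumerator {
        final List<String> received = new ArrayList<>();

        @Override
        public void handleSourceEvent(int subtaskId, ToySourceEvent event) {
            received.add(subtaskId + ":" + event.payload);
        }
    }

    public static void main(String[] args) {
        RecordingEnumerator enumerator = new RecordingEnumerator();
        ToyReaderContext context = new ToyReaderContext(3, new ToyCoordinator(enumerator));
        // Analogous to a reader signalling from pollNext that its current source is finished.
        context.sendSourceEventToCoordinator(new ToySourceEvent("source-finished"));
        System.out.println(enumerator.received); // prints [3:source-finished]
    }
}
```

The sketch deliberately says nothing about checkpoint ordering; it only shows who calls whom, and that the enumerator learns the sending subtask's index.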
The only call I found in the current master branch that sends an operator event to the SplitEnumerator is in HybridSourceReader, when the reader reaches the end of the input of the current source [4]. Since the call is made in SourceReader#pollNext(ReaderOutput output), it should follow the exactly-once mechanism described in [5]. My understanding is that OperatorEvent 1 will belong to the epoch after the checkpoint in this case.

Best regards,
Jing

[1] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L284
[2] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java#L120
[3] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java#L195
[4] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java#LL95C13-L95C13
[5] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L66

On Thu, May 25, 2023 at 4:27 AM Hongshun Wang <loserwang1...@gmail.com> wrote:

> Hi Hong,
>
> The checkpoint is triggered by the timer executor of CheckpointCoordinator.
> It triggers the checkpoint in SourceCoordinator (which is passed to
> SplitEnumerator) and then in SourceOperator. The checkpoint event is put in
> SplitEnumerator's event loop to be executed. You can see the details here.
>
> Yours
> Hongshun
>
> On Wed, May 17, 2023 at 11:39 PM Teoh, Hong <lian...@amazon.co.uk.invalid>
> wrote:
>
> > Hi all,
> >
> > I’m writing a new source based on the FLIP-27 Source API, and I had some
> > questions on the checkpointing mechanisms and associated guarantees. I
> > would appreciate it if someone more familiar with the API could provide
> > insights here!
> >
> > In FLIP-27 Source, we now have a SplitEnumerator (running on JM) and a
> > SourceReader (running on TM). However, the SourceReader can send events to
> > the SplitEnumerator. Given this, we have introduced a “loopback”
> > communication mechanism from TM to JM, and I wonder if/how we handle this
> > during checkpoints.
> >
> > Example of how data might be lost:
> > 1. Checkpoint 123 triggered
> > 2. SplitEnumerator takes checkpoint of state for checkpoint 123
> > 3. SourceReader sends OperatorEvent 1 and mutates state to reflect this
> > 4. SourceReader takes checkpoint of state for checkpoint 123
> > …
> > 5. Checkpoint 123 completes
> >
> > Let’s assume OperatorEvent 1 would mutate SplitEnumerator state once
> > processed. There is now inconsistent state between SourceReader state and
> > SplitEnumerator state (SourceReader assumes OperatorEvent 1 is processed,
> > whereas SplitEnumerator has not processed OperatorEvent 1).
> >
> > Do we have any mechanisms for mitigating this issue? For example, does the
> > SplitEnumerator re-take the snapshot of state for a checkpoint if an
> > OperatorEvent is sent before the checkpoint is complete?
> >
> > Regards,
> > Hong