Hi Hong,

Great question! As far as I know, it depends on the implementation. Regarding
the "loopback" event sent to the SplitEnumerator, I guess you mean the call
here [1] (it would help if you could point to the exact position in the source
code, so that we can understand the question better :)), which ends up
calling the SplitEnumerator [2]. There is only one implementation of the
method handleSourceEvent(int subtaskId, SourceEvent sourceEvent), in
HybridSourceSplitEnumerator [3].

The only call I found in the current master branch that sends an operator
event to the SplitEnumerator is in HybridSourceReader, when the reader
reaches the end of the input of the current source [4]. Since the call is
made in SourceReader#pollNext(ReaderOutput output), it should follow the
exactly-once mechanism described in [5]. My understanding is that
OperatorEvent 1 will belong to the epoch after the checkpoint in this
case.
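
To make the epoch point concrete, here is a toy sketch in plain Java. It is
not Flink code: ToyEnumerator, ToyReader and all method names are made up for
illustration, and it only models the ordering of snapshots and events, not how
the runtime enforces that ordering. It compares the scenario from your mail
(event sent between the two snapshots) with the case where the event falls
after both snapshots, i.e. into the next epoch.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, not Flink code: every class and method name here is hypothetical. */
public class EpochSketch {

    /** Stand-in for an enumerator: its state is the list of events it has handled. */
    static final class ToyEnumerator {
        final List<String> handled = new ArrayList<>();

        void handleSourceEvent(String event) {
            handled.add(event);
        }

        /** Copy of the state at the moment the checkpoint is taken. */
        List<String> snapshotState() {
            return new ArrayList<>(handled);
        }
    }

    /** Stand-in for a reader: its state is the list of events it has sent. */
    static final class ToyReader {
        final List<String> sent = new ArrayList<>();

        void send(String event, ToyEnumerator enumerator) {
            sent.add(event);
            enumerator.handleSourceEvent(event);
        }

        List<String> snapshotState() {
            return new ArrayList<>(sent);
        }
    }

    /** Scenario from the thread: the event is sent between the two snapshots. */
    public static boolean snapshotsAgreeWhenEventSentBetween() {
        ToyEnumerator enumerator = new ToyEnumerator();
        ToyReader reader = new ToyReader();
        List<String> enumeratorSnapshot = enumerator.snapshotState(); // step 2
        reader.send("OperatorEvent 1", enumerator);                   // step 3
        List<String> readerSnapshot = reader.snapshotState();         // step 4
        return enumeratorSnapshot.equals(readerSnapshot);
    }

    /** The event is sent only after both snapshots, so it is in neither of them. */
    public static boolean snapshotsAgreeWhenEventSentAfter() {
        ToyEnumerator enumerator = new ToyEnumerator();
        ToyReader reader = new ToyReader();
        List<String> enumeratorSnapshot = enumerator.snapshotState();
        List<String> readerSnapshot = reader.snapshotState();
        reader.send("OperatorEvent 1", enumerator); // next epoch
        return enumeratorSnapshot.equals(readerSnapshot);
    }

    public static void main(String[] args) {
        System.out.println("event between snapshots, consistent: "
                + snapshotsAgreeWhenEventSentBetween());
        System.out.println("event after both snapshots, consistent: "
                + snapshotsAgreeWhenEventSentAfter());
    }
}
```

In the first ordering the reader snapshot contains the event while the
enumerator snapshot does not, which is the inconsistency you describe. In the
second ordering neither snapshot contains it, which is what I mean by the
event belonging to the epoch after the checkpoint.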

Best regards,
Jing

[1]
https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L284
[2]
https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java#L120
[3]
https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java#L195
[4]
https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java#LL95C13-L95C13
[5]
https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L66

On Thu, May 25, 2023 at 4:27 AM Hongshun Wang <loserwang1...@gmail.com>
wrote:

> Hi Hong,
>
> The checkpoint is triggered by the timer executor of CheckpointCoordinator.
> It triggers the checkpoint in SourceCoordinator (which is passed to
> SplitEnumerator) and then in SourceOperator. The checkpoint event is put in
> SplitEnumerator's event loop to be executed. You can see the details here.
>
> Yours
> Hongshun
>
> On Wed, May 17, 2023 at 11:39 PM Teoh, Hong <lian...@amazon.co.uk.invalid>
> wrote:
>
> > Hi all,
> >
> > I’m writing a new source based on the FLIP-27 Source API, and I had some
> > questions on the checkpointing mechanisms and associated guarantees. Would
> > appreciate if someone more familiar with the API would be able to provide
> > insights here!
> >
> > In FLIP-27 Source, we now have a SplitEnumerator (running on JM) and a
> > SourceReader (running on TM). However, the SourceReader can send events to
> > the SplitEnumerator. Given this, we have introduced a “loopback”
> > communication mechanism from TM to JM, and I wonder if/how we handle this
> > during checkpoints.
> >
> >
> > Example of how data might be lost:
> > 1. Checkpoint 123 triggered
> > 2. SplitEnumerator takes checkpoint of state for checkpoint 123
> > 3. SourceReader sends OperatorEvent 1 and mutates state to reflect this
> > 4. SourceReader takes checkpoint of state for checkpoint 123
> > …
> > 5. Checkpoint 123 completes
> >
> > Let’s assume OperatorEvent 1 would mutate SplitEnumerator state once
> > processed. There is now inconsistent state between SourceReader state and
> > SplitEnumerator state. (SourceReader assumes OperatorEvent 1 is processed,
> > whereas SplitEnumerator has not processed OperatorEvent 1.)
> >
> > Do we have any mechanisms for mitigating this issue? For example, does the
> > SplitEnumerator re-take the snapshot of state for a checkpoint if an
> > OperatorEvent is sent before the checkpoint is complete?
> >
> > Regards,
> > Hong
>
