Hi all,
Thanks for the detailed responses!

I’ve taken a closer look, and as far as I can tell, this is an existing problem 
with the checkpointing mechanism. This is also tracked in this JIRA [1], but 
not fixed. The PR attached to the JIRA temporarily blocks the events being sent 
from Coordinator -> Subtask during checkpointing, but not the other way round, 
Subtask -> Coordinator. Thanks QingSheng for pointing this JIRA out!

TL;DR: currently, there is no guarantee that OperatorEvents sent from Subtask to 
Coordinator will be processed before their rightful checkpoint. This is not a 
problem if the OperatorEvents do not cause state mutation (e.g. reader 
registration, split requests). It does cause problems if a state mutation is 
expected.

I’ll quickly reiterate the problem, then address the comments on the thread.

PROBLEM SUMMARY
Consider the situation below:

Checkpoints and epochs:
1. EPOCH 1
2. Checkpoint 1
3. EPOCH 2

We have SourceCoordinator on JM and SourceSubtask on TM. Below illustrates the 
existing behaviour and the problem.
1. SourceCoordinator sends event A to SourceSubtask. This event belongs to 
EPOCH 1.
2. SourceSubtask processes records and sends event B to SourceCoordinator. This 
event belongs to EPOCH 1.
3. SourceCoordinator takes checkpoint 1.
  - We know that since RPC guarantees FIFO, event A will reach SourceSubtask 
before checkpoint barriers for checkpoint 1. This is good, since the event 
belongs to EPOCH 1.
  - During the checkpoint, further events sent after checkpoint is triggered 
are blocked from being sent to SourceSubtask, since they belong to EPOCH 2. 
This is correct.
4. SourceCoordinator receives event B. It is received and processed AFTER 
checkpoint 1. This is wrong, since the event belongs to EPOCH 1.
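
To make the ordering issue concrete, here is a minimal, Flink-free sketch of 
step 4. Everything in it (EpochRaceDemo, ToyCoordinator, the method names) is 
hypothetical and only models the ordering; it is not how SourceCoordinator is 
implemented. It assumes event B mutates coordinator state when handled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the Subtask -> Coordinator race. Hypothetical names, not Flink APIs. */
class EpochRaceDemo {

    /** Stand-in for a coordinator whose state is mutated by events from the subtask. */
    static final class ToyCoordinator {
        private final List<String> state = new ArrayList<>();
        private final Map<Long, List<String>> snapshots = new HashMap<>();

        void handleEventFromSubtask(String event) {
            state.add(event); // the state mutation caused by the event
        }

        void snapshot(long checkpointId) {
            // Copy the state as it is at the moment the checkpoint is taken.
            snapshots.put(checkpointId, new ArrayList<>(state));
        }

        List<String> snapshotOf(long checkpointId) {
            return snapshots.get(checkpointId);
        }
    }

    /**
     * Event B is sent by the subtask in EPOCH 1 in both cases. The only thing
     * that varies is whether the coordinator handles it before or after it
     * snapshots checkpoint 1.
     */
    static boolean snapshotContainsEventB(boolean eventArrivesBeforeSnapshot) {
        ToyCoordinator coordinator = new ToyCoordinator();
        if (eventArrivesBeforeSnapshot) {
            coordinator.handleEventFromSubtask("B");
            coordinator.snapshot(1L);
        } else {
            coordinator.snapshot(1L);
            coordinator.handleEventFromSubtask("B");
        }
        return coordinator.snapshotOf(1L).contains("B");
    }

    public static void main(String[] args) {
        System.out.println("B handled before snapshot: " + snapshotContainsEventB(true));
        System.out.println("B handled after snapshot:  " + snapshotContainsEventB(false));
    }
}
```

In the second case the subtask's checkpoint 1 state reflects that B was sent, 
while the coordinator's checkpoint 1 state does not contain B.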

Re Piotrek:
> it was solving the problem via canceling the checkpoint in the scenario that 
> you described

I don’t see this in master, but it would be an interesting approach. We could 
make this work by allowing SourceCoordinator to cancel a checkpoint if it 
receives an event with a timestamp earlier than the checkpoint timestamp. 
Alternatively, we could implement a pre-checkpoint checkpoint between 
Coordinator and Subtask, with communication channels in both directions blocked 
until the 2nd acknowledgement from Subtask to Coordinator. That way we are 
guaranteed not to have this inconsistency, with the benefit of not failing 
checkpoints!
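
A rough sketch of the first option (cancel on a late event), again Flink-free. 
All names are hypothetical, and it assumes events carry a timestamp that is 
directly comparable with the checkpoint timestamp, which may not hold across 
JM and TM clocks in practice.

```java
import java.util.HashSet;
import java.util.OptionalLong;
import java.util.Set;

/** Toy model of "cancel the checkpoint on a late event". Hypothetical names, not Flink APIs. */
class CancelOnLateEventSketch {

    static final class ToyCoordinator {
        private OptionalLong pendingCheckpointId = OptionalLong.empty();
        private long pendingCheckpointTimestamp;
        private final Set<Long> abortedCheckpoints = new HashSet<>();

        /** Coordinator takes its snapshot; the checkpoint stays pending until completed. */
        void snapshot(long checkpointId, long checkpointTimestamp) {
            pendingCheckpointId = OptionalLong.of(checkpointId);
            pendingCheckpointTimestamp = checkpointTimestamp;
        }

        /** An event older than the pending checkpoint means the snapshot missed it. */
        void handleEventFromSubtask(long eventTimestamp) {
            if (pendingCheckpointId.isPresent() && eventTimestamp < pendingCheckpointTimestamp) {
                abortedCheckpoints.add(pendingCheckpointId.getAsLong());
                pendingCheckpointId = OptionalLong.empty();
            }
            // The actual state mutation for the event would happen here.
        }

        boolean isAborted(long checkpointId) {
            return abortedCheckpoints.contains(checkpointId);
        }
    }

    /** Snapshots checkpoint 1, then delivers one event; reports whether checkpoint 1 was aborted. */
    static boolean checkpointAborted(long checkpointTimestamp, long eventTimestamp) {
        ToyCoordinator coordinator = new ToyCoordinator();
        coordinator.snapshot(1L, checkpointTimestamp);
        coordinator.handleEventFromSubtask(eventTimestamp);
        return coordinator.isAborted(1L);
    }

    public static void main(String[] args) {
        System.out.println("late event (t=90, checkpoint t=100):  " + checkpointAborted(100L, 90L));
        System.out.println("later event (t=110, checkpoint t=100): " + checkpointAborted(100L, 110L));
    }
}
```

The trade-off is the one noted above: consistency is preserved, but at the 
cost of failing checkpoints whenever a late event shows up.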

Re Mason:
> I guess the inconsistency could also be handled by the reader.start() method 
> such that the operator event is re-sent during restore.

This is a chicken and egg problem. Event B belongs to checkpoint 1, but is not 
included. In order to re-send the event, it would need to be stored in 
checkpoint 2. If the job fails after taking checkpoint 1, there will be no 
checkpoint 2 to restore from, and the event is lost.

Re Jing:
> There is only one implementation of the method handleSourceEvent(int 
> subtaskId, SourceEvent sourceEvent) in HybridSourceSplitEnumerator[

That is correct, but the SourceReaderContext interface allows users to 
implement their own custom events and send them from Subtask to Coordinator [2].

> My understanding is that the OperatorEvent 1 will belong to the epoch after 
> the checkpoint in this case.

That would be true for events sent from Coordinator -> Subtask, but not from 
Subtask -> Coordinator.

I will work with those active on [1] to resolve this situation! Any feedback is 
welcome!

Regards,
Hong


[1] https://issues.apache.org/jira/browse/FLINK-26029
[2] 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java#LL57C17-L57C47



On 26 May 2023, at 23:22, Jing Ge <j...@ververica.com.INVALID> wrote:

Hi Hong,

Great question! Afaik, it depends on the implementation. Speaking of the
"loopback" event sending to the SplitEnumerator, I guess you meant here [1]
(It might be good, if you could point out the right position in the source
code to help us understand the question better:)), which will end up with
calling the SplitEnumerator[2]. There is only one implementation of the
method handleSourceEvent(int subtaskId, SourceEvent sourceEvent) in
HybridSourceSplitEnumerator[3].

The only call that will send an operator event to the SplitEnumerator I
found in the current master branch is in the HybridSourceReader when the
reader reaches the end of the input of the current source[4]. Since the
call is in SourceReader#pollNext(ReaderOutput output), it should follow the
exactly-once semantics mechanism defined by [5]. My understanding is that
the OperatorEvent 1 will belong to the epoch after the checkpoint in this
case.

Best regards,
Jing

[1]
https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L284
[2]
https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java#L120
[3]
https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java#L195
[4]
https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java#LL95C13-L95C13
[5]
https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L66

On Thu, May 25, 2023 at 4:27 AM Hongshun Wang <loserwang1...@gmail.com>
wrote:

Hi Hong,

The checkpoint is triggered by the timer executor of CheckpointCoordinator.
It triggers the checkpoint in SourceCoordinator (which is passed to
SplitEnumerator) and then in SourceOperator. The checkpoint event is put in
SplitEnumerator's event loop to be executed. You can see the details here.

Yours
Hongshun

On Wed, May 17, 2023 at 11:39 PM Teoh, Hong <lian...@amazon.co.uk.invalid>
wrote:

Hi all,

I’m writing a new source based on the FLIP-27 Source API, and I had some
questions on the checkpointing mechanisms and associated guarantees. Would
appreciate if someone more familiar with the API would be able to provide
insights here!

In FLIP-27 Source, we now have a SplitEnumerator (running on JM) and a
SourceReader (running on TM). However, the SourceReader can send events to
the SplitEnumerator. Given this, we have introduced a “loopback”
communication mechanism from TM to JM, and I wonder if/how we handle this
during checkpoints.


Example of how data might be lost:
1. Checkpoint 123 triggered
2. SplitEnumerator takes checkpoint of state for checkpoint 123
3. SourceReader sends OperatorEvent 1 and mutates state to reflect this
4. SourceReader takes checkpoint of state for checkpoint 123
…
5. Checkpoint 123 completes

Let’s assume OperatorEvent 1 would mutate SplitEnumerator state once
processed. There is now inconsistent state between SourceReader state and
SplitEnumerator state. (SourceReader assumes OperatorEvent 1 is processed,
whereas SplitEnumerator has not processed OperatorEvent 1.)

Do we have any mechanisms for mitigating this issue? For example, does the
SplitEnumerator re-take the snapshot of state for a checkpoint if an
OperatorEvent is sent before the checkpoint is complete?

Regards,
Hong

