Hi all,

Thanks for the detailed responses! I’ve taken a closer look, and as far as I can tell, this is an existing problem with the checkpointing mechanism. It is also tracked in this JIRA [1], but has not been fixed. The PR attached to the JIRA temporarily blocks events sent from Coordinator -> Subtask during checkpointing, but not the other way round, Subtask -> Coordinator. Thanks QingSheng for pointing this JIRA out!

TL;DR: currently, there is no guarantee that OperatorEvents sent from Subtask to Coordinator will be processed before their rightful checkpoint. This is not a problem if the OperatorEvents do not cause state mutation (reader registration, request split). It does cause problems if state mutation is expected.

I’ll quickly reiterate the problem, then address the comments on the thread.

PROBLEM SUMMARY

Consider the situation below.

Checkpoints and epochs:
1. EPOCH 1
2. Checkpoint 1
3. EPOCH 2

We have SourceCoordinator on JM and SourceSubtask on TM. The steps below illustrate the existing behaviour and the problem.

1. SourceCoordinator sends event A to SourceSubtask. This event belongs to EPOCH 1.
2. SourceSubtask processes records and sends event B to SourceCoordinator. This event belongs to EPOCH 1.
3. SourceCoordinator takes checkpoint 1.
   - Since RPC guarantees FIFO, event A will reach SourceSubtask before the checkpoint barriers for checkpoint 1. This is good, since the event belongs to EPOCH 1.
   - During the checkpoint, further events sent after the checkpoint is triggered are blocked from being sent to SourceSubtask, since they belong to EPOCH 2. This is correct.
4. SourceCoordinator receives event B. It is received and processed AFTER checkpoint 1. This is wrong, since this event belongs to EPOCH 1.

Re Piotrek:

> it was solving the problem via canceling the checkpoint in the scenario that
> you described

I don’t see this in master, but this would be an interesting approach. We could make this work by allowing SourceCoordinator to cancel a checkpoint if an event with a timestamp earlier than the checkpoint timestamp is received.

Alternatively, we could implement a pre-checkpoint checkpoint between Coordinator and Subtask, with the communication channels in both directions blocked on the 2nd acknowledgement from Subtask to Coordinator. That way we are guaranteed not to have this inconsistency, with the benefit of not failing checkpoints!
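
To make the cancellation idea above a bit more concrete, here is a minimal toy model in Python. It does not use any Flink API: ToyCoordinator, ToyEvent, the logical integer timestamps, and the cancellation rule itself are all assumptions made purely for illustration. It replays steps 2 to 4 of the problem summary and cancels the pending checkpoint when a Subtask -> Coordinator event arrives that was sent before the checkpoint was triggered.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass(frozen=True)
class ToyEvent:
    """Hypothetical Subtask -> Coordinator event with a logical send timestamp."""
    name: str
    sent_at: int


@dataclass
class ToyCoordinator:
    """Toy stand-in for SourceCoordinator. Not Flink code."""
    state: List[str] = field(default_factory=list)
    snapshots: Dict[int, List[str]] = field(default_factory=dict)
    cancelled: List[int] = field(default_factory=list)
    pending_id: Optional[int] = None
    pending_triggered_at: Optional[int] = None

    def trigger_checkpoint(self, checkpoint_id: int, triggered_at: int) -> None:
        # Snapshot the state as it is right now; events handled later are not included.
        self.snapshots[checkpoint_id] = list(self.state)
        self.pending_id = checkpoint_id
        self.pending_triggered_at = triggered_at

    def handle_event(self, event: ToyEvent) -> None:
        # Assumed rule: an event sent before the pending checkpoint was triggered
        # belongs to the earlier epoch, so the snapshot is missing it. Cancel.
        if self.pending_id is not None and event.sent_at < self.pending_triggered_at:
            self.cancelled.append(self.pending_id)
            del self.snapshots[self.pending_id]
            self.pending_id = None
            self.pending_triggered_at = None
        self.state.append(event.name)

    def complete_checkpoint(self, checkpoint_id: int) -> bool:
        # Returns True only if the checkpoint is still pending (it was not cancelled).
        if self.pending_id != checkpoint_id:
            return False
        self.pending_id = None
        self.pending_triggered_at = None
        return True


def replay_problem_scenario() -> ToyCoordinator:
    coordinator = ToyCoordinator()
    event_b = ToyEvent("B", sent_at=2)                  # step 2: sent in EPOCH 1
    coordinator.trigger_checkpoint(1, triggered_at=3)   # step 3: checkpoint 1
    coordinator.handle_event(event_b)                   # step 4: event B arrives late
    return coordinator
```

In this sketch checkpoint 1 never completes, and the next checkpoint would include event B in its snapshot. The cost is a failed checkpoint whenever an event is in flight at trigger time, which is the downside the pre-checkpoint alternative tries to avoid.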

Re Mason:

> I guess the inconsistency could also be handled by the reader.start() method
> such that the operator event is re-sent during restore.

This is a chicken-and-egg problem. Event B belongs to checkpoint 1, but is not included in it. In order to re-send the event, it would need to be stored in checkpoint 2. If the job fails after taking checkpoint 1, there will be no checkpoint 2 to restore from, and the event is lost.

Re Jing:

> There is only one implementation of the method handleSourceEvent(int
> subtaskId, SourceEvent sourceEvent) in HybridSourceSplitEnumerator

That is correct, but the SourceReaderContext interface allows users to implement their own custom events and send them from Subtask to Coordinator [2].

> My understanding is that the OperatorEvent 1 will belong to the epoch after
> the checkpoint in this case.

That would be true for events sent from Coordinator -> Subtask, but not from Subtask -> Coordinator.

I will work with those active on [1] to resolve this situation! Any feedback is welcome!

Regards,
Hong

[1] https://issues.apache.org/jira/browse/FLINK-26029
[2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java#LL57C17-L57C47

On 26 May 2023, at 23:22, Jing Ge <j...@ververica.com.INVALID> wrote:

Hi Hong,

Great question! Afaik, it depends on the implementation. Speaking of the "loopback" event sending to the SplitEnumerator, I guess you meant here [1] (it would be good if you could point out the right position in the source code to help us understand the question better :)), which will end up calling the SplitEnumerator [2]. There is only one implementation of the method handleSourceEvent(int subtaskId, SourceEvent sourceEvent) in HybridSourceSplitEnumerator [3].
The only call I found in the current master branch that sends an operator event to the SplitEnumerator is in the HybridSourceReader, when the reader reaches the end of the input of the current source [4]. Since the call is in SourceReader#pollNext(ReaderOutput output), it should follow the exactly-once semantics mechanism defined by [5]. My understanding is that the OperatorEvent 1 will belong to the epoch after the checkpoint in this case.

Best regards,
Jing

[1] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L284
[2] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java#L120
[3] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java#L195
[4] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java#LL95C13-L95C13
[5] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L66

On Thu, May 25, 2023 at 4:27 AM Hongshun Wang <loserwang1...@gmail.com> wrote:

Hi Hong,

The checkpoint is triggered by the timer executor of CheckpointCoordinator. It triggers the checkpoint in SourceCoordinator (which is passed on to the SplitEnumerator) and then in SourceOperator. The checkpoint event is put into the SplitEnumerator's event loop to be executed. You can see the details here.

Yours
Hongshun

On Wed, May 17, 2023 at 11:39 PM Teoh, Hong <lian...@amazon.co.uk.invalid> wrote:

Hi all,

I’m writing a new source based on the FLIP-27 Source API, and I had some questions on the checkpointing mechanisms and associated guarantees. I would appreciate it if someone more familiar with the API could provide insights here!

In FLIP-27 Source, we now have a SplitEnumerator (running on JM) and a SourceReader (running on TM). However, the SourceReader can send events to the SplitEnumerator. Given this, we have introduced a “loopback” communication mechanism from TM to JM, and I wonder if/how we handle this during checkpoints.

Example of how data might be lost:
1. Checkpoint 123 triggered
2. SplitEnumerator takes checkpoint of state for checkpoint 123
3. SourceReader sends OperatorEvent 1 and mutates state to reflect this
4. SourceReader takes checkpoint of state for checkpoint 123
…
5. Checkpoint 123 completes

Let’s assume OperatorEvent 1 would mutate SplitEnumerator state once processed. There is now an inconsistency between SourceReader state and SplitEnumerator state: SourceReader assumes OperatorEvent 1 is processed, whereas SplitEnumerator has not processed OperatorEvent 1.

Do we have any mechanisms for mitigating this issue? For example, does the SplitEnumerator re-take the snapshot of state for a checkpoint if an OperatorEvent is sent before the checkpoint is complete?

Regards,
Hong