Hi Jiangjie,

Thanks for your reply and suggestion.
In fact, we don't want to modify the way the JM triggers checkpoints. What we hope is to give the OperatorCoordinator a mechanism similar to ExternallyInducedSourceReader for coordinating when the checkpoint barrier is sent (just moving that decision up from the Source to the OperatorCoordinator). We want the produced data and the checkpoints to have a one-to-one mapping. With such a mechanism, both the programming and the design could be greatly simplified. In addition, I am not sure whether other OperatorCoordinators have the same need, because the snapshot of an OperatorCoordinator is always taken immediately.

Thanks,
Ming Li

Becket Qin <becket....@gmail.com> wrote on Tue, Feb 28, 2023 at 08:31:

> Hi Ming,
>
> I am not sure if I fully understand what you want. It seems what you are
> looking for is to have a checkpoint triggered at a customized timing which
> aligns with some semantic. This is not what the current checkpoint in Flink
> was designed for. I think the basic idea of checkpoint is to just take a
> snapshot of the current state, so we can restore to that state in case of
> failure. This is completely orthogonal to the data semantic.
>
> Even with the ExternallyInducedSourceReader, the checkpoint is still
> triggered by the JM. It is just that the effective checkpoint barrier
> message (a custom message in this case) will not be sent by the JM, but by
> the external source storage. This helps when the external source storage
> needs its own internal state to be aligned with the state of the Flink
> SourceReader. For example, if the external source storage can only seek at
> some bulk boundary, then it might wait until the current bulk finishes
> before it sends the custom checkpoint barrier to the SourceReader.
>
> > Considering this scenario, if the data we want has not been produced yet,
> > but the *SourceCoordinator* receives the *checkpoint* message, it will
> > directly make a *checkpoint*, and the *ExternallyInducedSource* will not
> > make a *checkpoint* immediately after receiving the *checkpoint*, but
> > continues to wait for a new split. Even if a new split is generated, due
> > to the behavior of closing the *gateway* in *FLINK-28606*, the new split
> > cannot be assigned to the *Source*, resulting in a deadlock (or being
> > forced to wait for the checkpoint to time out).
>
> In this case, the source reader should not "wait" for the splits that are
> not included in this checkpoint. These splits should be a part of the next
> checkpoint. It would be the Sink's responsibility to ensure the output is
> committed in a way that aligns with the user semantic.
>
> That said, I agree it might be useful in some cases if users can decide
> the checkpoint triggering timing. But that will be a new feature which
> needs some careful design.
>
> Thanks,
>
> Jiangjie (Becket) Qin
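To make the bulk-boundary example above concrete, here is a minimal, self-contained Java sketch. It does not use Flink at all: `InducedCheckpointSource`, `BulkReader`, `onCheckpointRequested` and `readRecord` are hypothetical names that only mimic the idea of a reader deferring the effective barrier for a checkpoint the JM has already triggered until the external storage reaches a bulk boundary. The `shouldTriggerCheckpoint()` name echoes the reader-side hook discussed in the thread, but this is not the real Flink interface.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/**
 * Hypothetical model (not Flink code): the JM requests a checkpoint, but the
 * reader only emits the effective barrier once the current bulk is complete.
 */
public class BulkBoundaryReaderSketch {

    /** Hypothetical stand-in for a reader-side "may I emit a barrier now?" hook. */
    interface InducedCheckpointSource {
        /** Returns a checkpoint id when the reader is ready to emit its barrier. */
        Optional<Long> shouldTriggerCheckpoint();
    }

    /** Reader whose external storage can only seek at bulk boundaries. */
    static final class BulkReader implements InducedCheckpointSource {
        private final int bulkSize;
        private int recordsInCurrentBulk = 0;
        // Checkpoints requested by the JM whose barriers have not been emitted yet.
        private final Deque<Long> pendingCheckpoints = new ArrayDeque<>();

        BulkReader(int bulkSize) {
            this.bulkSize = bulkSize;
        }

        /** Hypothetical callback: the JM triggered checkpoint `id`; defer the barrier. */
        void onCheckpointRequested(long id) {
            pendingCheckpoints.addLast(id);
        }

        /** Consumes one record; the counter wraps to 0 when a bulk completes. */
        void readRecord() {
            recordsInCurrentBulk = (recordsInCurrentBulk + 1) % bulkSize;
        }

        @Override
        public Optional<Long> shouldTriggerCheckpoint() {
            // Emit a barrier only at a bulk boundary and only if one is pending.
            if (recordsInCurrentBulk == 0 && !pendingCheckpoints.isEmpty()) {
                return Optional.of(pendingCheckpoints.pollFirst());
            }
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        BulkReader reader = new BulkReader(3);
        reader.readRecord();               // 1 of 3 records in the current bulk
        reader.onCheckpointRequested(7L);  // JM triggers checkpoint 7 mid-bulk
        System.out.println(reader.shouldTriggerCheckpoint()); // Optional.empty
        reader.readRecord();
        reader.readRecord();               // bulk complete
        System.out.println(reader.shouldTriggerCheckpoint()); // Optional[7]
    }
}
```

Note that in this model the checkpoint is always requested first and only the barrier is delayed, which matches the point above that the JM still triggers the checkpoint.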