Hi Fabian,

Thanks for your reply!
It took me a while to reconsider the proposal, and I think you're right. The checkpoint hook would interfere with a lot of mechanisms we already have, and it's not worth the complexity.

> Wouldn't a transactional sink provide exactly the same guarantees?

Yes, it would. But it needs to cooperate with a trigger that fires right before each checkpoint, as you previously mentioned. How should I achieve that with the current API?

Thank you very much!

Best,
Paul Lam

> On Oct 15, 2018, at 19:45, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Paul,
>
> I think this would be very tricky to implement and interfere with many parts
> of the system like state backends, checkpointing logic, etc.
> We would need to maintain a copy (or version) of the state at the time of a
> checkpoint. There might be multiple checkpoints in flight. Checkpoints might
> fail. We'd need to clean up the copies/versions.
> Overall, I think this would be very complicated.
>
> Wouldn't a transactional sink provide exactly the same guarantees?
> It would collect all results of the window operator and only apply them when
> a checkpoint was successful.
> In case of a failure, an open transaction is aborted and the non-committed
> results are re-computed.
>
> Best,
> Fabian
>
> On Mon, Oct 15, 2018 at 13:29, Paul Lam <paullin3...@gmail.com> wrote:
>
> > Hi Fabian,
> >
> > Perhaps I didn't explain that clearly. Actually, I want a trigger that fires
> > when a checkpoint is completed and emits the intermediate results in
> > consistency with the completed checkpoint.
> >
> > It works like this:
> > 1) Once the window operator receives a barrier, it performs the snapshot as
> > usual, and also makes a copy of the current aggregates.
> > 2) When the checkpoint succeeds, the trigger gets a notification via the
> > checkpoint listener and emits the intermediate aggregates that were copied
> > previously.
> >
> > It's kind of similar to TwoPhaseCommitSinkFunction, but it's used in a
> > window operator instead of a sink.
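The buffering mechanism described above (pre-commit a copy of the results at the checkpoint barrier, make them visible only once the checkpoint completes) can be illustrated with a small Flink-free simulation. The class and method names below deliberately mirror the hooks of Flink's TwoPhaseCommitSinkFunction (invoke, preCommit, commit-on-notification, abort), but this is an illustrative sketch, not the actual Flink API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Flink-free simulation of the transactional-sink idea: results are buffered
// per checkpoint ("pre-committed") and become visible only after that
// checkpoint is reported complete. On failure, uncommitted buffers are
// dropped and their input is re-computed from the last successful checkpoint.
class TransactionalBuffer<T> {
    private final List<T> current = new ArrayList<>();                // open "transaction"
    private final SortedMap<Long, List<T>> pending = new TreeMap<>(); // pre-committed per checkpoint id
    private final List<T> committed = new ArrayList<>();              // visible to the outside world

    // invoke: collect a result into the open transaction
    void invoke(T value) { current.add(value); }

    // snapshotState: pre-commit the open transaction under the checkpoint id
    void preCommit(long checkpointId) {
        pending.put(checkpointId, new ArrayList<>(current));
        current.clear();
    }

    // notifyCheckpointComplete: commit everything up to and including this id
    // (notifications for earlier checkpoints may be subsumed by a later one)
    void notifyCheckpointComplete(long checkpointId) {
        SortedMap<Long, List<T>> done = pending.headMap(checkpointId + 1);
        done.values().forEach(committed::addAll);
        done.clear();
    }

    // failure path: abort all uncommitted transactions
    void abort() { current.clear(); pending.clear(); }

    List<T> committed() { return committed; }
}
```

With this model, a reader observing only `committed()` gets read-committed semantics: a result emitted before a failed checkpoint is never visible.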
> > The original motivation is that we want to keep a MySQL table in
> > synchronization with the window aggregates. It can be done by firing the
> > trigger periodically to get the newest intermediate results, which can be
> > used to update the external table. But neither timers nor queryable state
> > can provide read-committed isolation, which is intolerable in my case, so
> > I suggest adding checkpoint hooks to the triggers to solve this problem.
> >
> > I think other cases that need to emit window aggregates periodically could
> > leverage this feature as well, since timers and queryable state are too
> > heavyweight for a simple need like this while providing only a lower
> > isolation level.
> >
> > Thanks a lot!
> >
> > Best,
> > Paul Lam
> >
> > On Oct 15, 2018, at 15:47, Fabian Hueske <fhue...@gmail.com> wrote:
> > >
> > > Hi Paul,
> > >
> > > If I got your proposal right, you'd like to fire a Trigger right before a
> > > checkpoint is taken, correct?
> > > So, before taking a checkpoint, a Trigger would fire and the operator
> > > would process and emit some intermediate results.
> > >
> > > This approach would not completely solve the consistency issue because a
> > > checkpoint might fail.
> > > A better approach would be to use a transactional sink that is integrated
> > > with the checkpointing mechanism and emits data only on successful
> > > checkpoints.
> > > Flink provides the TwoPhaseCommitSinkFunction (see blog post [1]) and one
> > > implementation for an exactly-once Kafka sink.
> > >
> > > Best,
> > > Fabian
> > >
> > > [1]
> > > https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
> > >
> > > On Mon, Oct 15, 2018 at 04:52, Paul Lam <paullin3...@gmail.com> wrote:
> > >
> >> Hi,
> >>
> >> I’ve come across some scenarios where periodically emitting aggregates is
> >> needed in the case of event time windows, and I think it would be good to
> >> have a checkpoint hook on triggers.
> >>
> >> Suppose we want a daily metric; the most intuitive way is to define a 1d
> >> event time window to calculate it. By default, the event time trigger
> >> fires and emits the final results when the watermark reaches the end of a
> >> day, but we hope to see the real-time (or near-real-time) intermediate
> >> results as well. So we have several viable approaches I can think of:
> >>
> >> 1. Implement a custom trigger (FIRE_AND_PURGE at midnight, and FIRE
> >> periodically). We could register a processing time timer via the trigger
> >> context, but this has some drawbacks. First, we can only access the
> >> trigger context inside the callback methods, and there is no method like
> >> open(TriggerContext) that is called on initialization, so we have to
> >> register the timer in the onElement(..) method the first time it is
> >> called, which is not elegant. Second, emitting results on processing time
> >> provides only read-uncommitted consistency, which is not enough in some
> >> scenarios.
> >>
> >> 2. Use queryable state and pull state updates from external systems. This
> >> requires changing the architecture to pull-based, and the change would be
> >> too much. What’s more, the queryable state API is not stable yet.
> >>
> >> 3. Change the window to a smaller one (e.g. a 1 min window) which emits
> >> incremental aggregates, and reduce the results in external systems. This
> >> falls back to a stateless streaming job, making the architecture complex
> >> and the consistency weak.
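Approach 1 above can be sketched as a small, self-contained model of the trigger's decision logic. This is a hypothetical illustration: the names mirror Flink's Trigger API (onElement, onProcessingTime, onEventTime, TriggerResult), but nothing here depends on Flink, and the first-element timer registration is exactly the workaround the approach criticizes:

```java
// Self-contained sketch of an early-firing trigger for a daily window:
// FIRE on a periodic processing-time timer (early, read-uncommitted results),
// FIRE_AND_PURGE once the watermark passes the end of the window (final result).
class EarlyFiringLogic {
    enum TriggerResult { CONTINUE, FIRE, FIRE_AND_PURGE }

    private final long interval;             // early-fire interval in ms (processing time)
    private boolean timerRegistered = false; // the "first element" workaround
    private long nextTimer;

    EarlyFiringLogic(long intervalMillis) { this.interval = intervalMillis; }

    // onElement: lacking an open(TriggerContext)-style hook, the first element
    // has to register the initial processing-time timer.
    TriggerResult onElement(long processingTimeNow) {
        if (!timerRegistered) {
            nextTimer = processingTimeNow + interval;
            timerRegistered = true;
        }
        return TriggerResult.CONTINUE;
    }

    // onProcessingTime: emit an early (uncommitted) result and re-register.
    TriggerResult onProcessingTime(long time) {
        if (time >= nextTimer) {
            nextTimer = time + interval;
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    // onEventTime: the watermark reached the end of the window, so emit the
    // final result and purge the window state.
    TriggerResult onEventTime(long watermark, long windowEnd) {
        return watermark >= windowEnd
                ? TriggerResult.FIRE_AND_PURGE
                : TriggerResult.CONTINUE;
    }
}
```

The early FIREs here are driven purely by processing time, with no coordination with checkpoints, which is why this approach can expose results that a later recovery rolls back.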
> >>
> >> So I suggest adding a checkpoint hook to the window triggers to enable
> >> emitting aggregates periodically with awareness of checkpointing, which
> >> solves the problems I mentioned in approach 1.
> >>
> >> Since this is a very common scenario, there should be existing practices
> >> to get it done that I haven't figured out yet, but I think it still makes
> >> sense to add such a method to the triggers, for consistency reasons.
> >>
> >> Any suggestion is appreciated! Thanks a lot!
> >>
> >> Best,
> >> Paul Lam