Hi Fabian,

Thanks for your reply!

It took me a while to reconsider the proposal, and I think you’re right. The 
checkpoint hook would affect a lot of mechanisms we already have, and it’s 
not worth the complexity.

> Wouldn't a transactional sink provide exactly the same guarantees?

Yes, it would. But it needs to cooperate with a trigger that fires right before 
each checkpoint, as you previously mentioned. How could I achieve that with 
the current API?
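For reference, here is how I currently understand the transactional pattern, 
sketched as self-contained Java. The TxnBufferSink class below is a simplified 
stand-in I made up for illustration; it is not the real 
TwoPhaseCommitSinkFunction API.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a transactional sink (illustration only, not the
// real TwoPhaseCommitSinkFunction API). Results are buffered in an open
// transaction and only become visible after commit(), which the framework
// would call once the corresponding checkpoint completes.
class TxnBufferSink {
    private final List<String> committed = new ArrayList<>(); // visible to readers
    private List<String> open = new ArrayList<>();            // current transaction

    // Called once per record between checkpoints.
    void invoke(String value) {
        open.add(value);
    }

    // Called when the checkpoint barrier arrives: freeze the current
    // transaction and start a new one for subsequent records.
    List<String> preCommit() {
        List<String> pending = open;
        open = new ArrayList<>();
        return pending;
    }

    // Called on the checkpoint-complete notification: publish frozen results.
    void commit(List<String> pending) {
        committed.addAll(pending);
    }

    // Called if the checkpoint fails: drop the uncommitted results; they
    // will be recomputed after recovery.
    void abort(List<String> pending) {
        pending.clear();
    }

    List<String> committedResults() {
        return committed;
    }
}
```

In this shape, the “fires right before each checkpoint” part would correspond 
to preCommit() being invoked on the barrier, and the actual emission happens 
in commit() on the completion notification.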

Thank you very much!

Best,
Paul Lam


> On Oct 15, 2018, at 19:45, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Paul,
> 
> I think this would be very tricky to implement and interfere with many parts 
> of the system like state backends, checkpointing logic, etc. 
> We would need to maintain a copy (or version) of the state at the time of a 
> checkpoint. There might be multiple checkpoints in flight. Checkpoints might 
> fail. We'd need to clean up the copies/versions. 
> Overall, I think this would be very complicated.
> 
> Wouldn't a transactional sink provide exactly the same guarantees?
> It would collect all results of the window operator and only apply them when 
> a checkpoint was successful.
> In case of a failure, an open transaction is aborted and the non-committed 
> results are re-computed.
> 
> Best, Fabian
> 
> On Mon, Oct 15, 2018 at 13:29, Paul Lam <paullin3...@gmail.com> wrote:
> Hi Fabian,
> 
> Perhaps I didn’t explain that clearly. Actually, I want a trigger to fire when 
> a checkpoint is completed and emit the intermediate results consistent with 
> the completed checkpoint.
> 
> It works like this: 
> 1) Once the window operator receives a barrier, it performs the snapshot as 
> usual, and also makes a copy of the current aggregates. 
> 2) When the checkpoint succeeds, the trigger gets a notification via the 
> checkpoint listener and emits the intermediate aggregates that were copied 
> previously.
> 
> It’s kind of similar to TwoPhaseCommitSinkFunction, but it’s used in a window 
> operator instead of a sink. 
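The two steps above can be sketched as self-contained Java. All of the hooks 
and the class name here are hypothetical; nothing like this exists in Flink’s 
current Trigger API, and the downstream emit is simulated by storing the value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed mechanism; none of these hooks exist
// on Flink's Trigger API today, and the class name is made up. The operator
// copies the aggregate per in-flight checkpoint and emits a copy only when
// that checkpoint succeeds, so downstream sees read-committed results.
class CheckpointAwareWindowSketch {
    private long aggregate = 0;                                // live window state
    private final Map<Long, Long> copies = new HashMap<>();    // checkpointId -> copy
    private long lastEmitted = -1;                             // stand-in for a real emit

    void onElement(long value) {
        aggregate += value;
    }

    // Step 1: on the barrier, snapshot as usual and copy the current aggregate.
    void snapshotState(long checkpointId) {
        copies.put(checkpointId, aggregate);
    }

    // Step 2: on the checkpoint-complete notification, emit the copy made
    // for exactly that checkpoint (several may be in flight).
    void notifyCheckpointComplete(long checkpointId) {
        Long copy = copies.remove(checkpointId);
        if (copy != null) {
            lastEmitted = copy;
        }
    }

    // A failed checkpoint simply discards its copy.
    void notifyCheckpointAborted(long checkpointId) {
        copies.remove(checkpointId);
    }

    long lastEmitted() {
        return lastEmitted;
    }
}
```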
> 
> The original motivation is that we want to keep a MySQL table in sync with 
> the window aggregates. This could be done by firing the trigger periodically 
> to get the newest intermediate results and using them to update the external 
> table. But neither timers nor queryable state can provide read-committed 
> isolation, which is intolerable in my case, so I suggest adding checkpoint 
> hooks to the triggers to solve this problem.
> 
> I think other cases that need to emit window aggregates periodically could 
> leverage this feature too, since timers and queryable state are too heavy for 
> such a simple need while providing a lower isolation level.
> 
> Thanks a lot!
> 
> Best,
> Paul Lam
> 
> > On Oct 15, 2018, at 15:47, Fabian Hueske <fhue...@gmail.com> wrote:
> > 
> > Hi Paul,
> > 
> > If I got your proposal right, you'd like to fire a Trigger right before a
> > checkpoint is taken, correct?
> > So, before taking a checkpoint, a Trigger would fire and the operator would
> > process and emit some intermediate results.
> > 
> > This approach would not completely solve the consistency issue because a
> > checkpoint might fail.
> > A better approach would be to use a transactional sink that is integrated
> > with the checkpointing mechanism and emits data only on successful
> > checkpoints.
> > Flink provides the TwoPhaseCommitSinkFunction (see blog post [1]) and one
> > implementation for an exactly-once Kafka sink.
> > 
> > Best,
> > Fabian
> > 
> > [1]
> > https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
> > 
> > On Mon, Oct 15, 2018 at 04:52, Paul Lam <paullin3...@gmail.com> wrote:
> > 
> >> Hi,
> >> 
> >> I’ve come across some scenarios where periodically emitting aggregates is
> >> needed for event time windows, and I think it would be good to have a
> >> checkpoint hook on triggers.
> >> 
> >> Suppose we want a daily metric. The most intuitive way is to define a 1d
> >> event time window to calculate it. By default, the event time trigger fires
> >> and emits the final results when the watermark reaches the end of the day,
> >> but we would also like to see real-time (or near real-time) intermediate
> >> results. There are several viable approaches I can think of:
> >> 
> >> 1. Implement a custom trigger (FIRE_AND_PURGE at midnight, and FIRE
> >> periodically). We could register a processing time timer in the trigger
> >> context to fire periodically, but this has some drawbacks. First, we can
> >> only access the trigger context inside the callback methods, and there is
> >> no method like open(TriggerContext) that is called on initialization, so we
> >> have to register the timer in onElement(..) when it is called for the first
> >> time, which is not elegant. Second, emitting results on processing time
> >> provides only read-uncommitted consistency, which is not enough in some
> >> scenarios.
> >> 
> >> 2. Use queryable state and let external systems pull state updates. This
> >> requires changing the architecture to pull-based, which would be too big a
> >> change. What’s more, the queryable state API is not stable yet.
> >> 
> >> 3. Change the window to a smaller one (e.g. 1 min window) which emits
> >> incremental aggregates, and reduce the results in external systems. This
> >> falls back to a stateless streaming job, making the architecture complex
> >> and the consistency weak.
> >> 
> >> So I suggest adding a checkpoint hook to the window triggers to enable
> >> emitting aggregates periodically with awareness of checkpointing, which
> >> solves the problems I mentioned in approach 1.
> >> 
> >> Since this is a fairly common scenario, there may already be established
> >> practices to get it done that I haven’t figured out yet, but I think it
> >> still makes sense to add such a method to the triggers for consistency
> >> reasons.
> >> 
> >> Any suggestion is appreciated! Thanks a lot!
> >> 
> >> Best,
> >> Paul Lam
> >> 
> >> 
> >> 
> >> 
> 
