I forgot to mention that the signature I think fits for onFire() is:
void onFire(Window window, TriggerContext ctx) throws Exception;

> On Jul 22, 2016, at 12:47 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
> Hi,
>
> I started working on the new triggers proposed here and so far I can see two shortcomings in the current state of the triggers that do not play well with the new proposals, and more specifically the composite triggers All and Any.
>
> So here it goes:
>
> 1) In the document posted above, there are some new useful trigger combinations (like Any and All) which allow for combining primitive triggers. This decouples the TriggerResult of each individual trigger from the action that is actually going to be executed. For example, an All trigger may have one proposing FIRE while the other CONTINUE and the final result will be CONTINUE.
>
> In this case, any action that should be taken by each individual trigger upon firing, e.g. cleaning its state as in the case of CountTrigger, should be postponed until the parent trigger (All) decides to fire.
>
> For this, there should be a onFire() method in each trigger that does exactly that. This method should be called in the fireOrCleanup() of the windowOperator, when the firing is successful.
>
> 2) In the current implementation, when stateful triggers, like the CountTrigger, are combined in a composite Trigger (with Any or All) their state is shared because the stateHandle is the same for both. To solve this, the handle should become unique, BUT consistent for the same Trigger. The latter implies that the handle for the same trigger after a node failure, should be the same as that of its predecessor (before the failure).
>
> Let me know your thoughts on these.
>
> Kostas
>
>> On Jul 21, 2016, at 10:24 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>> I'm proposing to get this small change into 1.1: https://issues.apache.org/jira/browse/FLINK-4239
>> This will make our lives easier with the future proposed changes.
>>
>> What do you think?
>> On Tue, 19 Jul 2016 at 11:41 Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>> these new features should make it into the 1.2 release. We are already working on releasing 1.1 so it won't make it for that one. unfortunately.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Mon, 18 Jul 2016 at 23:19 Chen Qin <qinnc...@gmail.com> wrote:
>>>
>>>> BTW, do you have rough timeline in term of roll out it to production?
>>>>
>>>> Thanks,
>>>> Chen
>>>>
>>>> On Mon, Jul 18, 2016 at 2:46 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>> Chen commented this on the doc (I'm mirroring here so everyone can follow):
>>>>> "It would be cool to be able to access last snapshot of window states before it get purged. Pipeline author might consider put it to external storage and deal with late arriving events by restore corresponding window."
>>>>>
>>>>> My answer:
>>>>> This is partially covered by the section called "What Happens at Window-Cleanup Time, Who Decides When to Purge". What I want to introduce is that the window can have one final emission if there is new data in the buffers at cleanup time.
>>>>>
>>>>> The work on this will also depend on this proposal:
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
>>>>> With this, the WindowFunction can get meta data about the window firing so it could be informed that this is the last firing before a cleanup and that there already was an earlier, on-time firing.
>>>>>
>>>>> Does this cover your concerns, Chen?
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Sun, 10 Jul 2016 at 21:24 Chen Qin <qinnc...@gmail.com> wrote:
>>>>>
>>>>>> Sure. Currently, it looks like any element assigned to a too late window will be dropped silently😓 ?
>>>>>>
>>>>>> Having a late window stream imply somehow Flink needs to add one more state to window and split window state cleanup from window retirement.
>>>>>> I would suggest as simple as adding a function in trigger called OnLateElement and always fire_purge it would enable developer aware and handle this case.
>>>>>>
>>>>>> Chen
>>>>>>
>>>>>> On Fri, Jul 8, 2016 at 1:00 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>
>>>>>>> @Chen I added a section at the end of the document regarding access to the elements that are dropped as late. Right now, the section just mentions that we have to do this but there is no real proposal yet for how to do it. Only a rough sketch so that we don't forget about it.
>>>>>>>
>>>>>>> On Fri, 8 Jul 2016 at 07:47 Chen Qin <qinnc...@gmail.com> wrote:
>>>>>>>
>>>>>>>> +1 for allowedLateness scenario.
>>>>>>>>
>>>>>>>> The rationale behind is there are backfills or data issues hold in-window data till watermark pass end time. It cause sink write partial output.
>>>>>>>>
>>>>>>>> Allow high allowedLateness threshold makes life easier to merge those results and overwrite partial output with correct output at sink. But yeah, pipeline author is at risk of blow up statebackend with huge states.
>>>>>>>>
>>>>>>>> Alternatively, In some case, if sink allows read-check-merge operation, window can explicit call out events ingested after allowedLateness. It asks pipeline author mitigated these events in a way outside of flink ecosystem.
>>>>>>>> The catch is that since everywhere in a flink job can replay and checkpoint, notification should somehow includes these info as well.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Chen
>>>>>>>>
>>>>>>>> On Thu, Jul 7, 2016 at 12:14 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> In the effort to move the discussion to the mailing list, rather than the doc, there was a comment in the doc:
>>>>>>>>>
>>>>>>>>> "It seems this proposal marries the allowed lateness of events and the discarding of window state. In most use cases this should be sufficient, but there are instances where having independent control of these may be useful.
>>>>>>>>>
>>>>>>>>> For instance, you may have a job that computes some aggregate, like a sum. You may want to keep the window state around for a while, but not too long. Yet you may want to continue processing late events after you discarded the window state. It is possible that your stream sinks can make use of this data. For instance, they may be writing to a data store that returns an error if a row already exists, which allow the sink to read the existing row and update it with the new data."
>>>>>>>>>
>>>>>>>>> To which I would like to reply:
>>>>>>>>>
>>>>>>>>> If I understand your use-case correctly, I believe that the proposed binding of the allowed lateness to the state purging does not impose any problem. The lateness specifies the upper time bound, after which the state will be discarded. Between the start of a window and its (end + allowedLateness) you can write custom triggers that fire, purge the state, or do nothing.
>>>>>>>>> Given this, I suppose that, at the most extreme case, you can specify an allowed lateness of Long.MaxValue and do the purging of the state "manually". By doing this, you remove the safeguard of letting the system purge the state at some point in time, and you can do your own custom state management that fits your needs.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Kostas
>>>>>>>>>
>>>>>>>>>> On Jul 6, 2016, at 5:43 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>> @Vishnu Funny you should ask that because I have a design doc lying around. I'll open a new mail thread to not hijack this one.
>>>>>>>>>>
>>>>>>>>>> On Wed, 6 Jul 2016 at 17:17 Vishnu Viswanath <vishnu.viswanat...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I was going through the suggested improvements in window, and I have few questions/suggestion on improvement regarding the Evictor.
>>>>>>>>>>>
>>>>>>>>>>> 1) I am having a use case where I have to create a custom Evictor that will evict elements from the window based on the value (e.g., if I have elements are of case class Item(id: Int, type:String) then evict elements that has type="a"). I believe this is not currently possible.
>>>>>>>>>>> 2) this is somewhat related to 1) where there should be an option to evict elements from anywhere in the window. not only from the beginning of the window. (e.g., apply the delta function to all elements and remove all those don't pass.
>>>>>>>>>>> I checked the code and evict method just returns the number of elements to be removed and processTriggerResult just skips those many elements from the beginning.
>>>>>>>>>>> 3) Add an option to enables the user to decide if the eviction should happen before the apply function or after the apply function. Currently it is before the apply function, but I have a use case where I need to first apply the function and evict afterward.
>>>>>>>>>>>
>>>>>>>>>>> I am doing these for a POC so I think I can modify the flink code base to make these changes and build, but I would appreciate any suggestion on whether these are viable changes or will there any performance issue if these are done. Also any pointer on where to start(e.g, do I create a new class similar to EvictingWindowOperator that extends WindowOperator?)
>>>>>>>>>>>
>>>>>>>>>>> Thanks and Regards,
>>>>>>>>>>> Vishnu Viswanath,
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jul 6, 2016 at 9:39 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I did:
>>>>>>>>>>>> https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3ccanmxww0abttjjg9ewdxrugxkjm7jscbenmvrzohpt2qo3pq...@mail.gmail.com%3e
>>>>>>>>>>>> ;-)
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi <u...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>>>> In the future, it might be good to to discussions directly on the ML and then change the document accordingly.
>>>>>>>>>>>>>> This way everyone can follow the discussion on the ML. I also feel that Google Doc comments often don't give enough space for expressing more complex opinions.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I agree! Would you mind raising this point as a separate discussion on dev@?
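
To make the onFire() idea from point 1) of the Jul 22 message more concrete, here is a rough, self-contained sketch of how an All trigger could postpone the per-trigger cleanup until it actually decides to fire. It does not use the Flink API: SketchTrigger, the two-valued TriggerResult, and the simplified CountTrigger and AllTrigger below are hypothetical stand-ins, and the Window and TriggerContext arguments of the proposed signature are left out for brevity. Each trigger keeps its count in a plain field, so the shared stateHandle problem from point 2) is not modelled here.

```java
import java.util.Arrays;
import java.util.List;

public class CompositeTriggerSketch {

    // Simplified stand-in for Flink's TriggerResult; only two outcomes are modelled.
    public enum TriggerResult { CONTINUE, FIRE }

    // Hypothetical interface: onElement() only proposes a result,
    // onFire() carries out the side effects of an actual firing.
    public interface SketchTrigger {
        TriggerResult onElement(Object element);
        void onFire();
    }

    // Proposes FIRE once maxCount elements have been seen.
    // The count is cleared in onFire(), not in onElement().
    public static class CountTrigger implements SketchTrigger {
        private final long maxCount;
        private long count;

        public CountTrigger(long maxCount) { this.maxCount = maxCount; }

        @Override
        public TriggerResult onElement(Object element) {
            count++;
            return count >= maxCount ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public void onFire() { count = 0; }

        public long count() { return count; }
    }

    // Fires only if every child proposes FIRE; children are cleaned up
    // only when the composite itself fires.
    public static class AllTrigger implements SketchTrigger {
        private final List<SketchTrigger> children;

        public AllTrigger(SketchTrigger... children) {
            this.children = Arrays.asList(children);
        }

        @Override
        public TriggerResult onElement(Object element) {
            boolean allFire = true;
            // No short-circuit: every child must see every element.
            for (SketchTrigger child : children) {
                if (child.onElement(element) != TriggerResult.FIRE) {
                    allFire = false;
                }
            }
            return allFire ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public void onFire() {
            for (SketchTrigger child : children) {
                child.onFire();
            }
        }
    }

    public static void main(String[] args) {
        CountTrigger two = new CountTrigger(2);
        CountTrigger three = new CountTrigger(3);
        AllTrigger all = new AllTrigger(two, three);
        for (int i = 1; i <= 3; i++) {
            TriggerResult result = all.onElement("e" + i);
            // Mimics the window operator calling onFire() only on a successful firing.
            if (result == TriggerResult.FIRE) {
                all.onFire();
            }
            System.out.println(result + " counts=" + two.count() + "/" + three.count());
        }
    }
}
```

In this sketch the count-of-2 trigger already proposes FIRE on the second element, but its count is left alone because the count-of-3 trigger still says CONTINUE. Both counts are reset only after the third element, when All fires and calls onFire() on its children.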