My preference is also for option 3. It looks clean and simple to implement.
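A toy model of option 3 may help show why it closes the gap Bhupesh described: upstream checkpoints complete while the stateless leaf has not yet reached that window. The model assumes a simplified rule in which the committed window is the minimum, over operators, of each operator's latest checkpointed window. Every name in it (`InMemoryStorageAgent`, `EmptyCheckpointAgent`, `DiscardingAgent`, `committed_window`, `run_scenario`) is hypothetical. It is an in-memory sketch of the null-object idea, not the Apex `StorageAgent` interface.

```python
"""Toy model of option 3: a null-object storage agent for stateless leaves.

All class and function names are hypothetical; this is not the Apache Apex
StorageAgent API, just an in-memory model of the idea.
"""


class InMemoryStorageAgent:
    """Keeps checkpoint payloads keyed by (operator_id, window_id)."""

    def __init__(self):
        self._store = {}

    def save(self, state, operator_id, window_id):
        self._store[(operator_id, window_id)] = state

    def window_ids(self, operator_id):
        return sorted(w for (op, w) in self._store if op == operator_id)


class DiscardingAgent(InMemoryStorageAgent):
    """Models a stateless operator today: nothing is written at all."""

    def save(self, state, operator_id, window_id):
        pass


class EmptyCheckpointAgent(InMemoryStorageAgent):
    """Null object: records that a window was reached, but stores no state."""

    def save(self, state, operator_id, window_id):
        super().save(b"", operator_id, window_id)  # the "empty file"


def committed_window(agents):
    """Assumed, simplified rule: minimum latest checkpoint over operators.

    Operators with no checkpoint entries at all do not constrain the result.
    """
    latest = []
    for operator_id, agent in agents.items():
        ids = agent.window_ids(operator_id)
        if ids:
            latest.append(max(ids))
    return min(latest) if latest else None


def run_scenario(leaf_agent):
    """Upstream checkpoints windows 10 and 20; the leaf only finishes 10."""
    agents = {"input": InMemoryStorageAgent(), "console": leaf_agent}
    agents["input"].save({"offset": 100}, "input", 10)
    agents["input"].save({"offset": 200}, "input", 20)
    # endWindow for window 20 never reached the leaf before the crash.
    leaf_agent.save({"unused": True}, "console", 10)
    return committed_window(agents)


print(run_scenario(DiscardingAgent()))       # 20
print(run_scenario(EmptyCheckpointAgent()))  # 10
```

With the discarding agent, the leaf leaves no trace, so the watermark lands on window 20. The windows between 10 and 20, which the leaf never finished, are then not replayed to it. With empty markers, the leaf pulls the watermark back to 10. The price is one empty write per checkpoint per stateless leaf, the kind of overhead Thomas questions in the thread.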
~ Bhupesh
_______________________________________________________
Bhupesh Chawda
Twitter: @bhupeshsc
www.datatorrent.com | apex.apache.org

On Fri, Mar 10, 2017 at 3:06 PM, Tushar Gosavi wrote:
> Can you please let me know your preference? My preference is for solution 3: adding a StorageAgent that creates an empty file, and using this storage agent for stateless leaf operators.
>
> - Tushar.

On Tue, Mar 7, 2017 at 1:52 PM, Tushar Gosavi wrote:
> Thank you all for the feedback.
>
> Some useful output operators can be stateless: they push the data received in a window to the output store, for example KafkaOutputOperator/JDBCOutputOperator, or output stores where writes are idempotent, which covers most key-value stores.
>
> I was going to use the existing logic to compute the committedWindowId, with the addition of a few steps explained below.
> solution-1
> - Calculate committedWindow with leaf operator checkpoints set to the current timestamp (current behaviour).
> - Update the leaf operators' recoveryWindowId to committedWindowId.
> - Calculate committedWindow again. This step is required because, as the downstream operators' recoveryWindowId is reduced, we may have to adjust the recoveryWindowId of the upstream operators.
>
> This will prevent stateless leaf operators from starting at the current timestamp, hence reducing the amount of data loss. But given the concern raised by Bhupesh about the last stateless operator being slow, the solution suggested by Vlad is sufficient.
>
> solution-1
> - As explained above. If a little loss is acceptable, we could go with this approach.
> solution-2
> - Fail validation if the last operator is stateless in the AT_LEAST_ONCE scenario, as suggested by Vlad. This could break backward compatibility, as old applications will fail to launch.
> solution-3
> - Mark the last operator stateful in the AT_LEAST_ONCE scenario.
>
> Let me know your preference.
>
> Regards,
> - Tushar.

On Mon, Mar 6, 2017 at 8:31 PM, Vlad Rozov wrote:
> For a long chain of stateless operators at the end of a DAG, it is possible that the time to propagate the end window to a leaf operator is greater than the time for a checkpoint to be persisted in HDFS.
>
> If an at-least-once processing guarantee is necessary, the leaf operators should not be STATELESS. Will invalidating a DAG that has one or more leaf operators marked as STATELESS with AT_LEAST_ONCE processing solve APEXCORE-619? It is not the best solution, but I think it is sufficient for the described scenario.
>
> Thank you,
>
> Vlad

On 3/2/17 08:43, Thomas Weise wrote:
> Good point, that's correct for a stateless leaf operator (an operator that does not have downstream operators). The minimum of the upstream checkpoints can be higher than the last windowId seen by the leaf operator. That is a low-probability case, though, because it would mean the time it took for the checkpoint to become visible in HDFS is less than the time to propagate endWindow downstream.
>
> It's also not a problem for an intermediate stateless operator, because the downstream checkpoint will inform the recovery windowId. Most of the time stateless operators are intermediate.
>
> Leaf operators are the output operators. I suspect that in the original scenario it was a console output operator? Useful output operators usually won't be stateless; they have to track state to interact with the external system correctly. I'm bringing this up for an adequate cost/benefit analysis.
>
> In the absence of a stateful downstream operator, you only have the committed windowId, which is essentially a checkpointing watermark. On application restart it has to be recomputed from the available checkpoints, and that does not cover the scenario Tushar originally reported.
>
> Saving the committed windowId comes at a cost: it would have to be written to the journal before operators are notified. Care has been taken not to write to the journal unnecessarily, as it is blocking I/O, and in this case the frequency depends on the order in which checkpoint notifications arrive from operators. We also don't want to delay the committedWindow notification, as that would introduce latency.
>
> Thomas

On Thu, Mar 2, 2017 at 2:10 AM, Bhupesh Chawda wrote:
> What if all operators complete their first checkpoints, but the stateless operator could not cross the first checkpoint window, and the DAG crashed? If we try to figure out the recovery checkpoint now, we might conclude that checkpoint 1 is the point to start, and we may miss some data getting processed by the stateless operator. Probably in this case at-least-once is also not guaranteed?
>
> ~ Bhupesh

On Thu, Mar 2, 2017 at 8:06 AM, Thomas Weise wrote:
> Dummy checkpoints, continuously writing the committed window id, and the like all introduce overhead that is probably not needed.
>
> All the information to derive what we need is likely available, and IMO the discussion should be on the correct way of using it. I will have a look when I get to it as well.
>
> Thanks,
> Thomas

On Wed, Mar 1, 2017 at 6:29 PM, Sandesh Hegde wrote:
> Instead of treating the stateless operator in a special way and missing corner cases, just have a dummy checkpoint; then there is no need to handle corner cases.
>
> There is a name for this solution: https://en.wikipedia.org/wiki/Null_Object_pattern

On Wed, Mar 1, 2017 at 2:52 PM, Pramod Immaneni wrote:
> There is code in various places that deals with stateless operators in a special way even though a physical checkpoint does not exist on the disk. It is probably a matter of applying a similar thought process/logic correctly here.

On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre wrote:
> Hmm! The fact that commitWindowId has moved up (right now in the memory of Stram) should mean that a complete set of checkpoints is available, i.e. commitWindowId can be derived. Let's say the next checkpoint window also gets checkpointed across the app, and commitWindowId is in memory but not yet written to the Stram state; then upon relaunch the latest commitWindowId should get computed correctly.
>
> This may be just about setting stateless operators to commitWindowId on relaunch? Aka bug/feature?
>
> Thks
> Amol
>
> M: 510-449-2606 | Twitter: @amolhkekre
> www.datatorrent.com | apex.apache.org
>
> *Join us at Apex Big Data World-San Jose <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*

On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni wrote:
> Do we need to save committedWindowId? Can't it be computed from the existing checkpoints by walking through the DAG? We probably do this anyway, and I suspect there is a minor bug somewhere in there. If an operator is stateless, you could assume its checkpoint is the maximum long value for the sake of computation, and compute the committed window to be the lowest common checkpoint. If they are all stateless and you end up with the maximum long value, you can start with a window id that reflects the current timestamp.
>
> Thanks

On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre wrote:
> CommitWindowId could be computed from the existing checkpoints.
> That solution still needs the purge to be done after commitWindowId is confirmed to be saved in the Stram state. Without this, the commitWindowId computed from the checkpoints may have some checkpoints missing.
>
> Thks
> Amol

On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni wrote:
> Can't the committedWindowId be calculated by looking at the physical plan and the existing checkpoints?

On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi wrote:
> Help Needed for APEXCORE-619
>
> Issue: When an application is relaunched after a long time with stateless operators at the end of the DAG, the stateless operators start with a very high windowId.
> In this case the stateless operator ignores all the data received until the upstream operator catches up with it. This breaks the *at-least-once* guarantee when the operator is relaunched, or when the master is killed and the application is restarted.
>
> Solutions:
> - Fix the windowId for stateless leaf operators from the upstream operator. But this has some issues when a join has two upstream operators at different windowIds. If we set the windowId to min(upstream windowId), then we need to recalculate the recovery window ids for the upstream paths from this operator.
>
> - Another solution is to create an empty file in the checkpoint directory for stateless operators. This will help us identify the checkpoints of stateless operators during relaunch instead of computing them from the latest timestamp.
>
> - Bring the entire DAG to committedWindowId. This could be achieved by writing committedWindowId to a journal.
>   We need to make sure that we are not purging the checkpointed state until the committedWindowId is saved in the journal.
>
> Let me know your thoughts on this and your preferred solution.
>
> Regards,
> -Tushar.
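The first solution in the original message reappears as solution-1 in the Mar 7 message. It has three steps:

- Compute the committed window with stateless operators treated as unbounded. This is Pramod's "long max" rule, which falls back to the current-time window when every operator is stateless.
- Pin stateless leaves to that window.
- Recompute the recovery windows for the upstream paths.

The steps can be sketched as below. This is a hypothetical model, not Apex code. Window ids are plain integers, every stateful operator is assumed to have an initial checkpoint at window 0, and `committed_window`, `recovery_windows` and `solution_1` are illustrative names.

```python
"""Sketch of solution-1 for stateless leaf operators.

Hypothetical model, not Apex code. Window ids are ints, and every stateful
operator is assumed to have an initial checkpoint at window 0.
"""
import math


def committed_window(checkpoints, now_window):
    """Minimum latest checkpoint over stateful operators.

    Stateless operators are absent from `checkpoints`, which is the same as
    giving them a checkpoint of "long max". If every operator is stateless,
    fall back to the window id for the current time.
    """
    latest = [max(ids) for ids in checkpoints.values()]
    return min(latest) if latest else now_window


def recovery_windows(downstream, checkpoints, leaf_window):
    """Recovery window per operator, computed from the leaves upward.

    `downstream` maps every operator to its downstream operators. Operators
    missing from `checkpoints` are stateless; a stateless leaf starts at
    `leaf_window`, a stateless intermediate at the minimum of its consumers.
    """
    memo = {}

    def visit(op):
        if op not in memo:
            children = downstream[op]
            # An operator must not recover later than any of its consumers.
            bound = min((visit(c) for c in children), default=math.inf)
            if op not in checkpoints:
                memo[op] = bound if children else leaf_window
            else:
                memo[op] = max(w for w in checkpoints[op] if w <= bound)
        return memo[op]

    for op in downstream:
        visit(op)
    return memo


def solution_1(downstream, checkpoints, now_window):
    # Step 1: stateless leaves at the current time never lower the minimum,
    # so the committed window only depends on the stateful checkpoints.
    committed = committed_window(checkpoints, now_window)
    # Steps 2-3: pin stateless leaves to it, then recompute upstream paths.
    return recovery_windows(downstream, checkpoints, leaf_window=committed)


NOW = 1_000_000  # stand-in for the window id derived from the current time
downstream = {"a": ["join"], "c": ["join"], "join": ["out"], "out": []}
checkpoints = {"a": [0, 10, 20], "c": [0, 10]}
current = recovery_windows(downstream, checkpoints, leaf_window=NOW)
fixed = solution_1(downstream, checkpoints, NOW)
print(current["out"], current["a"])  # 1000000 20
print(fixed["out"], fixed["a"])      # 10 10
```

The example DAG joins `a` (checkpoints 0, 10, 20) and `c` (checkpoints 0, 10) in a stateless `join`, which feeds a stateless leaf `out`. Under the current behaviour, `out` starts at the current-time window and drops data until upstream catches up. With solution-1, the committed window is 10, and the recomputation pulls `a` back from 20 to 10; that is the extra step described above. As Vlad and Bhupesh point out, this still cannot recover windows that a lagging leaf never finished.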
