My preference is also for option 3. It looks clean and simple to implement.
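
For reference, a minimal sketch of what option 3 could look like, assuming a checkpoint for a stateless operator is nothing more than a zero-length marker file named after its window id. The class and method names are hypothetical. This does not implement the Apex StorageAgent interface; it only mirrors the idea with plain java.nio:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: empty marker files stand in for stateless checkpoints.
class EmptyFileCheckpointSketch {
    private final Path root;

    EmptyFileCheckpointSketch(Path root) {
        this.root = root;
    }

    // Convenience factory for experiments: keeps markers in a fresh temp directory.
    static EmptyFileCheckpointSketch inTempDir() {
        try {
            return new EmptyFileCheckpointSketch(Files.createTempDirectory("checkpoints"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Marker path: <root>/<operatorId>/<windowId in hex>.
    private Path markerPath(int operatorId, long windowId) {
        return root.resolve(Integer.toString(operatorId)).resolve(Long.toHexString(windowId));
    }

    // "Checkpoint" a stateless operator by creating an empty file; no state is written.
    void save(int operatorId, long windowId) {
        Path file = markerPath(operatorId, windowId);
        try {
            Files.createDirectories(file.getParent());
            if (Files.notExists(file)) {
                Files.createFile(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Window ids that have a marker, in ascending order; empty if none exist.
    List<Long> getWindowIds(int operatorId) {
        List<Long> ids = new ArrayList<>();
        Path dir = root.resolve(Integer.toString(operatorId));
        if (!Files.isDirectory(dir)) {
            return ids;
        }
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
            for (Path file : files) {
                ids.add(Long.parseUnsignedLong(file.getFileName().toString(), 16));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(ids);
        return ids;
    }

    // Purge a marker once its window is no longer needed for recovery.
    void delete(int operatorId, long windowId) {
        try {
            Files.deleteIfExists(markerPath(operatorId, windowId));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        EmptyFileCheckpointSketch agent = inTempDir();
        agent.save(7, 10L);
        agent.save(7, 20L);
        System.out.println(agent.getWindowIds(7));
    }
}
```

On relaunch, the highest id returned by getWindowIds() would play the role of the leaf operator's last checkpoint, instead of a value derived from the current timestamp.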

~ Bhupesh


_______________________________________________________

Bhupesh Chawda

E: [email protected] | Twitter: @bhupeshsc

www.datatorrent.com  |  apex.apache.org



On Fri, Mar 10, 2017 at 3:06 PM, Tushar Gosavi <[email protected]>
wrote:

> Can you please let me know your preference? My preference is for solution
> 3, by adding a StorageAgent which creates an empty file, and using this
> storage agent for leaf stateless operators.
>
> - Tushar.
>
>
> On Tue, Mar 7, 2017 at 1:52 PM, Tushar Gosavi <[email protected]>
> wrote:
>
> > Thank you all for the feedback.
> >
> > Some useful output operators can be stateless: they simply push the data
> > received in a window to the output store. Examples are
> > KafkaOutputOperator/JDBCOutputOperator, or output stores where
> > writes are idempotent, which covers most key-value stores.
> >
> > I was going to use the existing logic to compute the committedWindowId,
> > with the addition of a few steps explained below.
> > solution-1
> > - Calculate committedWindow with leaf operator checkpoints set to the
> > current timestamp (current behaviour).
> > - Update the leaf operators' recoveryWindowId to committedWindowId.
> > - Calculate committedWindow again. This step is required because the
> > downstream operator's recoveryWindowId is reduced, and hence we may have to
> > adjust the recoveryWindowId of upstream operators.
> >
> > This will prevent leaf stateless operators from starting at the current
> > timestamp, hence reducing the amount of data loss. But given the concern
> > raised by Bhupesh about the last stateless operator being slow, the
> > solution suggested by Vlad is sufficient.
> >
> > solution-1
> > - As explained above. If a little loss is acceptable we could go with this
> > approach.
> > solution-2
> > - Fail validation if the last operator is stateless in the AT_LEAST_ONCE
> > scenario, as suggested by Vlad.
> >   This could break backward compatibility, as old applications will fail
> > to launch.
> > solution-3
> > - Mark the last operator stateful in the AT_LEAST_ONCE scenario.
> >
> > Let me know about your preference.
> >
> > Regards,
> > - Tushar.
> >
> >
> > On Mon, Mar 6, 2017 at 8:31 PM, Vlad Rozov <[email protected]>
> > wrote:
> >
> >> For a long chain of stateless operators at the end of a DAG, it is
> >> possible that time to propagate the end window to a leaf operator is
> >> greater than the time for a checkpoint to be persisted in HDFS.
> >>
> >> If an at-least-once processing guarantee is necessary, the leaf operators
> >> should not be STATELESS. Will invalidating a DAG that has one or more leaf
> >> operators marked as STATELESS with AT_LEAST_ONCE processing solve
> >> APEXCORE-619? It is not the best solution, but I think it is sufficient
> >> for the described scenario.
> >>
> >> Thank you,
> >>
> >> Vlad
> >>
> >>
> >> On 3/2/17 08:43, Thomas Weise wrote:
> >>
> >>> Good point, that's correct for a stateless leaf operator (an operator
> >>> that does not have downstream operators). The minimum of upstream
> >>> checkpoints can be higher than the last windowId seen by the leaf
> >>> operator, although that has a low probability, because it would mean the
> >>> time it took for the checkpoint to become visible in HDFS is less than
> >>> the propagation of endWindow downstream.
> >>>
> >>> It's also not a problem for an intermediate stateless operator, because
> >>> the downstream checkpoint will inform the recovery windowId. Most of the
> >>> time stateless operators are intermediate.
> >>>
> >>> Leaf operators are the output operators. I suspect in the original
> >>> scenario it was a console output operator? Useful output operators
> >>> usually won't be stateless; they have to track state to interact with the
> >>> external system correctly. I'm bringing this up for adequate cost/benefit
> >>> analysis.
> >>>
> >>> In the absence of a stateful downstream operator, you only have the
> >>> committed windowId, which is essentially a checkpointing watermark. On
> >>> application restart it has to be recomputed from the checkpoints
> >>> available, and does not cover the scenario Tushar reported originally.
> >>>
> >>> Saving the committed windowId comes at a cost: it would have to be
> >>> written to the journal before operators are notified. Care has been taken
> >>> not to write unnecessarily to the journal, as it is blocking I/O, and in
> >>> this case the frequency depends on the order of arrival of checkpoint
> >>> notifications from operators. We also don't want to delay the
> >>> committedWindow notification, as that would introduce latency.
> >>>
> >>> Thomas
> >>>
> >>>
> >>> On Thu, Mar 2, 2017 at 2:10 AM, Bhupesh Chawda <[email protected]>
> >>> wrote:
> >>>
> >>>> What if all operators complete their first checkpoints but the stateless
> >>>> operator could not cross the first checkpoint window, and the DAG
> >>>> crashed? If we try to figure out the recovery checkpoint now, we might
> >>>> conclude that checkpoint 1 is the point to start from, and we may miss
> >>>> some data getting processed by the stateless operator. Probably in this
> >>>> case at-least-once is also not guaranteed?
> >>>>
> >>>> ~ Bhupesh
> >>>>
> >>>>
> >>>>
> >>>> On Thu, Mar 2, 2017 at 8:06 AM, Thomas Weise <[email protected]> wrote:
> >>>>
> >>>>> Dummy checkpoints, continuously writing the committed window id and the
> >>>>> like all introduce overhead that is probably not needed.
> >>>>>
> >>>>> All the information to derive what we need is likely available, and IMO
> >>>>> the discussion should be on what is the correct way of using it. I will
> >>>>> have a look when I get to it as well.
> >>>>>
> >>>>> Thanks,
> >>>>> Thomas
> >>>>>
> >>>>>
> >>>>> On Wed, Mar 1, 2017 at 6:29 PM, Sandesh Hegde <[email protected]>
> >>>>> wrote:
> >>>>>
> >>>>>> Instead of treating the stateless operator in a special way and missing
> >>>>>> corner cases, just have a dummy checkpoint; then there is no need to
> >>>>>> handle corner cases.
> >>>>>>
> >>>>>> There is a name for this solution,
> >>>>>> https://en.wikipedia.org/wiki/Null_Object_pattern
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Wed, Mar 1, 2017 at 2:52 PM Pramod Immaneni <[email protected]>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> There is code in various places that deals with stateless operators in
> >>>>>>> a special way even though a physical checkpoint does not exist on the
> >>>>>>> disk. It is probably a matter of applying a similar thought
> >>>>>>> process/logic correctly here.
> >>>>>>>
> >>>>>>> On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> Hmm! The fact that commitWindowId has moved up (right now in memory of
> >>>>>>>> Stram) should mean that a complete set of checkpoints is available,
> >>>>>>>> i.e. commitWindowId can be derived. Let's say that the next checkpoint
> >>>>>>>> window also gets checkpointed across the app, and commitWindowId is in
> >>>>>>>> memory but not written to stram-state yet; then upon relaunch the
> >>>>>>>> latest commitWindowId should get computed correctly.
> >>>>>>>>
> >>>>>>>> This may be just about setting stateless operators to commitWindowId
> >>>>>>>> on re-launch? aka bug/feature?
> >>>>>>>>
> >>>>>>>> Thks
> >>>>>>>> Amol
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> E:[email protected] | M: 510-449-2606 | Twitter: @amolhkekre
> >>>>>>>>
> >>>>>>>> www.datatorrent.com  |  apex.apache.org
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <[email protected]>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Do we need to save committedWindowId? Can't it be computed from
> >>>>>>>>> existing checkpoints by walking through the DAG? We probably do this
> >>>>>>>>> anyway, and I suspect there is a minor bug somewhere in there. If an
> >>>>>>>>> operator is stateless you could assume its checkpoint is long max for
> >>>>>>>>> the sake of computation and compute the committed window to be the
> >>>>>>>>> lowest common checkpoint. If they are all stateless and you end up
> >>>>>>>>> with long max, you can start with a window id that reflects the
> >>>>>>>>> current timestamp.
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>>
> >>>>>>>>> On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <[email protected]> wrote:
> >>>>>>>>>
> >>>>>>>>>> CommitWindowId could be computed from the existing checkpoints. That
> >>>>>>>>>> solution still needs the purge to be done only after commitWindowId
> >>>>>>>>>> is confirmed to be saved in Stram state. Without this, the
> >>>>>>>>>> commitWindowId computed from the checkpoints may have some
> >>>>>>>>>> checkpoints missing.
> >>>>>>>>>>
> >>>>>>>>>> Thks
> >>>>>>>>>> Amol
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni <[email protected]>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Can't the committedWindowId be calculated by looking at the physical
> >>>>>>>>>>> plan and the existing checkpoints?
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <[email protected]>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Help Needed for APEXCORE-619
> >>>>>>>>>>>>
> >>>>>>>>>>>> Issue: When an application is relaunched after a long time with
> >>>>>>>>>>>> stateless operators at the end of the DAG, the stateless operators
> >>>>>>>>>>>> start with a very high windowId. In this case the stateless
> >>>>>>>>>>>> operator ignores all the data received until the upstream operator
> >>>>>>>>>>>> catches up with it. This breaks the *at-least-once* guarantee on
> >>>>>>>>>>>> relaunch of the operator or when the master is killed and the
> >>>>>>>>>>>> application is restarted.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Solutions:
> >>>>>>>>>>>> - Fix the windowId for stateless leaf operators from the upstream
> >>>>>>>>>>>> operator. But it has some issues when we have a join with two
> >>>>>>>>>>>> upstream operators at different windowIds. If we set the windowId
> >>>>>>>>>>>> to min(upstream windowId), then we need to recalculate the new
> >>>>>>>>>>>> recovery window ids for the upstream paths from this operator.
> >>>>>>>>>>>>
> >>>>>>>>>>>> - Another solution is to create an empty file in the checkpoint
> >>>>>>>>>>>> directory for stateless operators. This will help us identify the
> >>>>>>>>>>>> checkpoints of stateless operators during relaunch instead of
> >>>>>>>>>>>> computing them from the latest timestamp.
> >>>>>>>>>>>>
> >>>>>>>>>>>> - Bring the entire DAG to committedWindowId. This could be achieved
> >>>>>>>>>>>> by writing committedWindowId in a journal. We need to make sure
> >>>>>>>>>>>> that we are not purging the checkpointed state until the
> >>>>>>>>>>>> committedWindowId is saved in the journal.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Let me know your thoughts on this and your preferred solution.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> -Tushar.
> >>>>>>>>>>>>
> >>>>>>
> >>>>>>
> >>
> >
>
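
A rough sketch of the committed-window computation discussed in the quoted thread: stateless operators are treated as long max, the committed window is the lowest checkpoint, and stateless operators are then pulled back to it on relaunch. All names here are hypothetical and not taken from Apex. The sketch ignores DAG topology, so it does not recompute upstream recovery windows as solution-1 requires, and it does not address the slow-leaf case raised in the thread:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; operator names and the flat map are illustrative only.
class CommittedWindowSketch {
    // Sentinel for "no physical checkpoint": never constrains the minimum.
    static final long STATELESS = Long.MAX_VALUE;

    // Lowest latest-checkpoint across all operators. If every operator is
    // stateless, fall back to a window id derived from the current time,
    // which the caller supplies as currentWindowId.
    static long committedWindow(Map<String, Long> latestCheckpoint, long currentWindowId) {
        long min = STATELESS;
        for (long windowId : latestCheckpoint.values()) {
            min = Math.min(min, windowId);
        }
        return min == STATELESS ? currentWindowId : min;
    }

    // Stateful operators recover from their own checkpoint; stateless ones
    // restart from the committed window instead of the current timestamp.
    static Map<String, Long> recoveryWindows(Map<String, Long> latestCheckpoint, long currentWindowId) {
        long committed = committedWindow(latestCheckpoint, currentWindowId);
        Map<String, Long> recovery = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : latestCheckpoint.entrySet()) {
            long own = entry.getValue();
            recovery.put(entry.getKey(), own == STATELESS ? committed : own);
        }
        return recovery;
    }

    public static void main(String[] args) {
        Map<String, Long> checkpoints = new LinkedHashMap<>();
        checkpoints.put("input", 30L);
        checkpoints.put("parser", 20L);
        checkpoints.put("console", STATELESS);
        System.out.println(recoveryWindows(checkpoints, 99L));
    }
}
```

Passing the fallback window id in as a parameter keeps the sketch independent of how window ids are actually derived from timestamps.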
