Using the buffer server of a regular input port will not work, since the
upstream may also go down, in which case the tuples of the previous
window(s) could not be replayed.  The good news is that we only need to
store the tuples of the ITERATION_WINDOW_OFFSET windows prior to a
checkpoint, not all the windows, and in most cases ITERATION_WINDOW_OFFSET
is 1.
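
To make the retention rule concrete, here is a minimal sketch, assuming
window IDs are plain consecutive integers and using an in-memory map as a
stand-in for whatever durable storage we end up with.  The class and method
names (IterationTupleStore, checkpoint, committed, tuplesForRecovery) are
made up for illustration and are not existing Apex APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for durable tuple storage; not an Apex class. */
final class IterationTupleStore<T> {
  // Plays the role of ITERATION_WINDOW_OFFSET (assumed to be >= 1).
  private final int windowOffset;
  // Tuples of the most recent windows, keyed by window ID.
  private final TreeMap<Long, List<T>> recent = new TreeMap<>();
  // Checkpoint window ID -> tuples of the windowOffset windows ending at that checkpoint.
  private final TreeMap<Long, TreeMap<Long, List<T>>> saved = new TreeMap<>();

  IterationTupleStore(int windowOffset) {
    if (windowOffset < 1) {
      throw new IllegalArgumentException("windowOffset must be >= 1");
    }
    this.windowOffset = windowOffset;
  }

  /** Records a tuple and forgets windows too old to matter for a later checkpoint. */
  void add(long windowId, T tuple) {
    recent.computeIfAbsent(windowId, id -> new ArrayList<>()).add(tuple);
    while (recent.firstKey() <= windowId - windowOffset) {
      recent.pollFirstEntry();
    }
  }

  /** Keeps only the windowOffset windows up to and including the checkpoint window. */
  void checkpoint(long checkpointWindowId) {
    TreeMap<Long, List<T>> snapshot = new TreeMap<>();
    recent.subMap(checkpointWindowId - windowOffset + 1, true, checkpointWindowId, true)
        .forEach((id, tuples) -> snapshot.put(id, new ArrayList<>(tuples)));
    saved.put(checkpointWindowId, snapshot);
  }

  /** Purges everything saved for checkpoints older than the committed window. */
  void committed(long committedWindowId) {
    saved.headMap(committedWindowId, false).clear();
  }

  /** Tuples to replay when recovering from the given checkpoint (empty if none saved). */
  Map<Long, List<T>> tuplesForRecovery(long checkpointWindowId) {
    return saved.getOrDefault(checkpointWindowId, new TreeMap<>());
  }
}
```

With an offset of 1, a checkpoint at window 14 keeps only the tuples of
window 14, and those are dropped once a later checkpoint is committed.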

David

On Thu, Sep 17, 2015 at 8:25 AM, Thomas Weise <tho...@datatorrent.com>
wrote:

> An additional idea for the data backup. For any other port, we have
> upstream backup through buffer server (logically the abstraction is a WAL
> to enable replay). In case of the iteration port there is no upstream buffer
> server and hence the idea is to implement the WAL backed by HDFS.
>
> But there are one or many upstream buffer servers as any operator with an
> iteration port will also have at least one regular port. So could we use
> that buffer server to keep the data for the iteration port as well?
>
> BTW with regard to the loop in the DAG, this depends on how you look at it.
> Considering the data (windows), it is not a loop. When you look at the
> stream connections, it looks like a loop. The iteration port is like a
> pump: water is moved back upstream, but it only flows downstream.
>
> On Wed, Sep 16, 2015 at 5:07 PM, Chandni Singh <chan...@datatorrent.com>
> wrote:
>
> > My comments:
> >
> > 1)
> > As I understand it, IdempotentStorageManager satisfies this use case. It has
> > been used with operators that are dynamically partitionable and has been
> > integrated with various operators in Malhar to make them idempotent.
> >
> > So IMO we should not start building another recovery mechanism in Apex.
> >
> > Once we have hammered out the details of the WAL abstraction (
> > https://malhar.atlassian.net/browse/APEX-99), we can deprecate
> > IdempotentStorageManager and use that.
> >
> > 2)
> > If we use IdempotentStorageManager here, it will give us a better
> > understanding of its limitations and therefore help us with the WAL
> > abstraction.
> >
> > Thanks,
> > Chandni
> >
> >
> > On Wed, Sep 16, 2015 at 4:42 PM, David Yan <da...@datatorrent.com>
> > wrote:
> >
> > > Thanks Chetan.
> > >
> > > Can you point me to the location of the Deduper code that may be helpful
> > > with the recovery implementation?
> > >
> > > Does anyone have any opinion on the renaming of ITERATION_WINDOW_COUNT?
> > > DELAY_BY_WINDOW_COUNT? DELAY_WINDOW_COUNT?
> > >
> > > David
> > >
> > > On Wed, Sep 16, 2015 at 2:21 PM, Chetan Narsude <che...@datatorrent.com>
> > > wrote:
> > >
> > > > David,
> > > >
> > > >  I have 3 comments:
> > > >
> > > > 1. The "ahead window" you discussed above is really a "behind window".
> > > > With Apex, the windows that are ahead are the windows with smaller
> > > > window IDs. Smaller window IDs are followed by bigger window IDs.
> > > >
> > > > 2. ITERATION_WINDOW_COUNT sounds like a misnomer. IMO, it should be
> > > > something akin to DELAY_BY_WINDOW_COUNT, as you are delaying the events by
> > > > that many windows. You are not iterating over them that many times. It also
> > > > resonates with PortContext.SLIDE_BY_WINDOW_COUNT.
> > > >
> > > > 3. Deduper has a similar requirement, where a large amount of data
> > > > (potentially even larger) needs to be partitioned. You can borrow the
> > > > idea/code from there, and perhaps abstract the code to be reusable.
> > > >
> > > > HTH.
> > > >
> > > > --
> > > > Chetan
> > > >
> > > > > On Wed, Sep 16, 2015 at 1:44 PM, David Yan <da...@datatorrent.com>
> > > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > > One current disadvantage of Apex is the inability to run iterative and
> > > > > > machine learning algorithms, because we don't allow loops in the application
> > > > > > DAG (hence the name DAG).  I am proposing that we allow loops in the DAG if
> > > > > > the loop advances the window ID by a configured amount.  A JIRA ticket has
> > > > > > been created:
> > > > >
> > > > > https://malhar.atlassian.net/browse/APEX-60
> > > > >
> > > > > I have started this work in my fork at
> > > > > https://github.com/davidyan74/incubator-apex-core/tree/APEX-60.
> > > > >
> > > > > > The current progress is that a simple test case works.  Major work still
> > > > > > needs to be done with respect to recovery and partitioning.
> > > > >
> > > > > > The value ITERATION_WINDOW_COUNT is an attribute of an input port of an
> > > > > > operator.  If the value of the attribute is greater than or equal to 1, any
> > > > > > tuples sent to the input port are treated as being ITERATION_WINDOW_COUNT
> > > > > > windows ahead of what they are.
> > > > >
> > > > > > For recovery, we will need to checkpoint all the tuples between ports with
> > > > > > the attribute in order to replay the looped tuples.  During recovery, if an
> > > > > > operator that has an input port with ITERATION_WINDOW_COUNT=2 is recovering
> > > > > > from checkpoint window 14, the tuples for that input port from window 13 and
> > > > > > window 14 need to be replayed and treated as window 15 and window 16
> > > > > > respectively (13+2 and 14+2).
> > > > >
> > > > > > In other words, we need to store all the tuples from the window with ID
> > > > > > committedWindowId minus ITERATION_WINDOW_COUNT onward for recovery, and purge
> > > > > > the tuples earlier than that window.
> > > > > > We can optimize this by only storing the tuples for the
> > > > > > ITERATION_WINDOW_COUNT windows prior to any checkpoint.
> > > > >
> > > > > > For that, we need a storage mechanism for the tuples.  Chandni already has
> > > > > > something that fits this use case in Apex Malhar.  The class is
> > > > > > IdempotentStorageManager.  In order for this to be used in Apex Core, we
> > > > > > need to deprecate the class in Apex Malhar and move it to Apex Core.
> > > > >
> > > > > A JIRA ticket has been created for this particular work:
> > > > >
> > > > > https://malhar.atlassian.net/browse/APEX-128
> > > > >
> > > > > > Some of the above has been discussed among Thomas, Chetan, Chandni, and
> > > > > > myself.
> > > > >
> > > > > > For partitioning, we have not started any discussion or brainstorming.  We
> > > > > > appreciate any feedback on this and any other aspect related to supporting
> > > > > > iterations in general.
> > > > >
> > > > > Thanks!
> > > > >
> > > > > David
> > > > >
> > > >
> > >
> >
>

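The replay arithmetic in the original proposal at the bottom of this thread
(ITERATION_WINDOW_COUNT=2, recovering from checkpoint window 14) can be
sketched as follows.  This is only an illustration under the assumption that
window IDs are plain consecutive integers, as in that example; the class and
method names are hypothetical and not part of Apex.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper illustrating the replay mapping; not an Apex class. */
final class IterationReplayPlan {
  private IterationReplayPlan() {}

  /**
   * Maps each stored window that must be replayed to the window ID its tuples
   * are treated as after recovery: the last windowCount windows up to and
   * including the checkpoint, each shifted forward by windowCount.
   */
  static Map<Long, Long> replayTargets(long checkpointWindowId, int windowCount) {
    if (windowCount < 1) {
      throw new IllegalArgumentException("windowCount must be >= 1");
    }
    Map<Long, Long> plan = new LinkedHashMap<>();
    for (long w = checkpointWindowId - windowCount + 1; w <= checkpointWindowId; w++) {
      plan.put(w, w + windowCount);
    }
    return plan;
  }
}
```

Calling replayTargets(14, 2) maps window 13 to 15 and window 14 to 16,
matching the 13+2 and 14+2 in the proposal.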