That is a good point, Tim. However, delay operations are typically just that: they only delay and are not overloaded with other things, so why not option 1?
On Tue, Nov 3, 2015 at 3:34 PM, Timothy Farkas <t...@datatorrent.com> wrote:

> +1 for option 2
>
> Also would it be possible to chain delay operators? A lot of stochastic and
> adaptive methods depend on finding the correlations between the current
> time step n and previous k time steps (n, n - 1), (n, n - 2), (n, n - 3)
> ... (n, n - k)
>
> Here is a picture of a model that uses delay operators (in the picture
> these are represented by z^-1) and is used for time series prediction.
>
> https://upload.wikimedia.org/wikipedia/commons/thumb/d/d2/IIRFilter2.svg/250px-IIRFilter2.svg.png
>
> On Tue, Nov 3, 2015 at 3:14 PM, Sasha Parfenov <sa...@datatorrent.com> wrote:
>
> > +1 for option 2. Although option 2 doesn't mirror the current unifiers
> > like option 1, and may look more complicated when viewing the logical
> > plan, I think the benefits of flexibility of specifying locality and
> > ability to bring multiple downstream operators into a single delay
> > operator may be important for some projects. For me the added
> > flexibility wins, particularly in light of efforts towards a simpler
> > high level API.
> >
> > On Mon, Nov 2, 2015 at 11:25 AM, David Yan <da...@datatorrent.com> wrote:
> >
> > > Please share your thoughts using the dev mailing list on this topic if
> > > you can. Thanks.
> > >
> > > ---------- Forwarded message ----------
> > > From: David Yan <da...@datatorrent.com>
> > > Date: Thu, Oct 29, 2015 at 11:11 AM
> > > Subject: Re: Supporting iterations in Apex
> > > To: dev@apex.incubator.apache.org
> > >
> > > This delay operator will act as an input operator for the first window
> > > and act as a regular operator after that.
> > > The engine will increment the window id of the windows from all the
> > > output ports of the delay operator.
> > >
> > > We will need a new interface for the delay operator, extending the
> > > existing Operator interface.
> > > The interface will probably include:
> > >
> > > - Emitting the tuples for the first window
> > > - Emitting the tuples after recovery
> > >
> > > We will provide a default implementation of the delay operator with a
> > > write-ahead log that stores the tuples for the window before each
> > > checkpoint for recovery. We will also probably support the number of
> > > windows to delay using an operator property.
> > >
> > > Let's look at this DAG with an iteration loop:
> > >
> > > upstream --> A --> B --> downstream
> > >              ^     |
> > >              |-----|
> > >
> > > With the delay operator, the physical view of the DAG looks like this,
> > > with D being the delay operator:
> > >
> > > upstream --> A --> B --> downstream
> > >              ^     |
> > >              |-D<--|
> > >
> > > There are two approaches for specifying the delay operator.
> > >
> > > 1) As discussed earlier on this thread, the delay operator can be
> > > specified as an *input port attribute* of A. The delay operator D will
> > > not appear in the logical DAG. The engine will do the +1 on the window
> > > ID based on the presence of the input port attribute. In this case, the
> > > delay operator does not need to specify any input port, just like the
> > > unifier, with the process(tuple) method implicitly taking in the tuples
> > > from the output port of B, which logically connects to the input port
> > > of A.
> > >
> > > 2) The delay operator is specified and connected *as any other operator*
> > > in the logical DAG. The engine will do the +1 on the window ID if the
> > > operator implements the delay operator interface. In this case, the
> > > delay operator D will need to specify at least one input port (just
> > > like a regular operator), and can actually have multiple input ports.
> > >
> > > I'm leaning toward the 2nd approach.
> > >
> > > Please share your thoughts. Which one do you think is better? Or maybe
> > > suggest a different approach altogether?
> > >
> > > Thanks!
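
To make the loop semantics in the message above concrete, here is a rough simulation of the A --> B --> D --> A cycle. It is only an illustration in Python, not Apex code: the class `DelayOperator`, its methods, and the `run` helper are hypothetical names, and the window ID "+1" that the engine would perform is modelled directly inside the sketch. Operator A adds the upstream value to whatever was looped back, and B is an identity.

```python
class DelayOperator:
    """Hypothetical stand-in for the delay operator D (not the Apex API)."""

    def __init__(self, first_window_tuples, first_window_id, delay=1):
        self.first_window_tuples = list(first_window_tuples)
        self.first_window_id = first_window_id
        self.delay = delay
        self.pending = {}  # target window id -> tuples waiting to be emitted

    def process(self, window_id, tuples):
        # Tuples received in window w are scheduled for window w + delay.
        # In the proposal the engine does this increment; here it is inlined.
        self.pending.setdefault(window_id + self.delay, []).extend(tuples)

    def emit(self, window_id):
        # First window: nothing has looped back yet, so behave like an
        # input operator and emit the initial tuples.
        if window_id == self.first_window_id:
            return list(self.first_window_tuples)
        # Afterwards: behave like a regular operator and forward the
        # tuples that were scheduled for this window.
        return self.pending.pop(window_id, [])


def run(upstream, first_window_id=1):
    """Feed one upstream value per window through A -> B -> D -> A."""
    d = DelayOperator(first_window_tuples=[0], first_window_id=first_window_id)
    results = []
    for offset, value in enumerate(upstream):
        window_id = first_window_id + offset
        looped = d.emit(window_id)
        a_out = value + sum(looped)  # operator A: combine upstream and loop
        b_out = a_out                # operator B: identity
        d.process(window_id, [b_out])
        results.append(b_out)
    return results


print(run([1, 2, 3, 4]))  # [1, 3, 6, 10]
```

With a delay of one window the loop behaves like a running sum, which is the simplest kind of iteration the proposal is meant to allow.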
> > >
> > > David
> > >
> > > On Wed, Oct 7, 2015 at 10:51 AM, Thomas Weise <tho...@datatorrent.com> wrote:
> > >
> > >> Why not set the delay operator as an attribute? We already support
> > >> partitioners and stream codecs as attributes.
> > >>
> > >> On Wed, Oct 7, 2015 at 10:09 AM, Pramod Immaneni <pra...@datatorrent.com> wrote:
> > >>
> > >> > How about making just the window delay an attribute on the input
> > >> > port? The operator connection is just like a normal DAG stream
> > >> > creation. We could also support connecting the same operator to
> > >> > multiple input ports with different delays and handle fault recovery
> > >> > accordingly.
> > >> >
> > >> > On Wed, Oct 7, 2015 at 9:53 AM, David Yan <da...@datatorrent.com> wrote:
> > >> >
> > >> > > The iteration operator actually resembles the usage of unifiers. We
> > >> > > have getUnifier in the interface of OutputPort.
> > >> > >
> > >> > > But if we add getDelayOperator in the interface of InputPort, that
> > >> > > would introduce backward incompatibility especially since we can't
> > >> > > use the default implementation feature of interfaces that is in
> > >> > > Java 8.
> > >> > >
> > >> > > Putting the class object as an attribute of the InputPort is not
> > >> > > good either because you can't configure the delay operator itself.
> > >> > >
> > >> > > Thoughts?
> > >> > >
> > >> > > David
> > >> > >
> > >> > > On Fri, Sep 25, 2015 at 10:10 AM, David Yan <da...@datatorrent.com> wrote:
> > >> > >
> > >> > > > This is a very good idea. This way, we can have a default
> > >> > > > implementation of that operator and the user can control how the
> > >> > > > tuples are stored by having his/her own implementation. How many
> > >> > > > windows the operator delays is part of the implementation of that
> > >> > > > operator.
> > >> > > >
> > >> > > > I am thinking of getting rid of the ITERATION_WINDOW_OFFSET
> > >> > > > attribute and introduce a DELAY_OPERATOR_CLASS attribute so that
> > >> > > > the user can specify the delay operator class to be used.
> > >> > > >
> > >> > > > More thoughts?
> > >> > > >
> > >> > > > David
> > >> > > >
> > >> > > > On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta <gau...@datatorrent.com> wrote:
> > >> > > >
> > >> > > >> Hey David,
> > >> > > >>
> > >> > > >> I was thinking can we add another operator in front of the input
> > >> > > >> port that has ITERATION_WINDOW_COUNT set. The new additional
> > >> > > >> operator will have property whose value will be set equal to
> > >> > > >> ITERATION_WINDOW_COUNT and it will be responsible for caching the
> > >> > > >> data for those many windows and delaying the data. This operator
> > >> > > >> can act as unifier cum iterator operator.
> > >> > > >> For this you may not need any external storage agent as platform
> > >> > > >> checkpointing should help you here.
> > >> > > >>
> > >> > > >> We are doing something similar for Sliding window.
> > >> > > >>
> > >> > > >> Thanks
> > >> > > >> -Gaurav
> > >> > > >>
> > >> > > >> On Wed, Sep 16, 2015 at 1:44 PM, David Yan <da...@datatorrent.com> wrote:
> > >> > > >>
> > >> > > >> > Hi all,
> > >> > > >> >
> > >> > > >> > One current disadvantage of Apex is the inability to do
> > >> > > >> > iterations and machine learning algorithms because we don't
> > >> > > >> > allow loops in the application DAG (hence the name DAG). I am
> > >> > > >> > proposing that we allow loops in the DAG if the loop advances
> > >> > > >> > the window ID by a configured amount.
> > >> > > >> > A JIRA ticket has been created:
> > >> > > >> >
> > >> > > >> > https://malhar.atlassian.net/browse/APEX-60
> > >> > > >> >
> > >> > > >> > I have started this work in my fork at
> > >> > > >> > https://github.com/davidyan74/incubator-apex-core/tree/APEX-60 .
> > >> > > >> >
> > >> > > >> > The current progress is that a simple test case works. Major
> > >> > > >> > work still needs to be done with respect to recovery and
> > >> > > >> > partitioning.
> > >> > > >> >
> > >> > > >> > The value ITERATION_WINDOW_COUNT is an attribute to an input
> > >> > > >> > port of an operator. If the value of the attribute is greater
> > >> > > >> > than or equal to 1, any tuples sent to the input port are
> > >> > > >> > treated to be ITERATION_WINDOW_COUNT windows ahead of what they
> > >> > > >> > are.
> > >> > > >> >
> > >> > > >> > For recovery, we will need to checkpoint all the tuples between
> > >> > > >> > ports with the to replay the looped tuples. During the
> > >> > > >> > recovery, if the operator has an input port, with
> > >> > > >> > ITERATION_WINDOW_COUNT=2, is recovering from checkpoint window
> > >> > > >> > 14, the tuples for that input port from window 13 and window 14
> > >> > > >> > need to be replayed to be treated as window 15 and window 16
> > >> > > >> > respectively (13+2 and 14+2).
> > >> > > >> >
> > >> > > >> > In other words, we need to store all the tuples from window
> > >> > > >> > with ID committedWindowId minus ITERATION_WINDOW_COUNT for
> > >> > > >> > recovery and purge the tuples earlier than that window.
> > >> > > >> > We can optimize this by only storing the tuples for
> > >> > > >> > ITERATION_WINDOW_COUNT windows prior to any checkpoint.
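
The recovery arithmetic quoted above can be checked with a few lines of Python. This is only a sketch of the window bookkeeping, not Apex code: the function names `replay_plan` and `purge_old_windows` and their parameters are hypothetical, and the store is assumed to be a plain mapping from window ID to tuples.

```python
def replay_plan(checkpoint_window, iteration_window_count):
    """Map each stored source window to the window ID it is replayed as.

    Assumes the optimization described above: only the last
    iteration_window_count windows up to the checkpoint are stored.
    """
    first = checkpoint_window - iteration_window_count + 1
    return {
        window: window + iteration_window_count
        for window in range(first, checkpoint_window + 1)
    }


def purge_old_windows(stored, committed_window_id, iteration_window_count):
    """Keep tuples from committedWindowId - ITERATION_WINDOW_COUNT onward."""
    cutoff = committed_window_id - iteration_window_count
    return {window: tuples for window, tuples in stored.items() if window >= cutoff}


# The example from the message: ITERATION_WINDOW_COUNT=2, checkpoint window 14.
print(replay_plan(14, 2))  # {13: 15, 14: 16}

stored = {11: ["a"], 12: ["b"], 13: ["c"], 14: ["d"]}
print(sorted(purge_old_windows(stored, 14, 2)))  # [12, 13, 14]
```

Note that the two rules keep different ranges: purging relative to the committed window retains window 12 as well, while the checkpoint-based optimization only needs windows 13 and 14.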
> > >> > > >> >
> > >> > > >> > For that, we need a storage mechanism for the tuples. Chandni
> > >> > > >> > already has something that fits this usage case in Apex Malhar.
> > >> > > >> > The class is IdempotentStorageManager. In order for this to be
> > >> > > >> > used in Apex core, we need to deprecate the class in Apex
> > >> > > >> > Malhar and move it to Apex Core.
> > >> > > >> >
> > >> > > >> > A JIRA ticket has been created for this particular work:
> > >> > > >> >
> > >> > > >> > https://malhar.atlassian.net/browse/APEX-128
> > >> > > >> >
> > >> > > >> > Some of the above has been discussed among Thomas, Chetan,
> > >> > > >> > Chandni, and myself.
> > >> > > >> >
> > >> > > >> > For partitioning, we have not started any discussion or
> > >> > > >> > brainstorming. We appreciate any feedback on this and any other
> > >> > > >> > aspect related to supporting iterations in general.
> > >> > > >> >
> > >> > > >> > Thanks!
> > >> > > >> >
> > >> > > >> > David
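
On Timothy's question about chaining delay operators: a chain of k unit delays exposes the previous k values alongside the current one, which is what the (n, n - 1) ... (n, n - k) pairs need. A minimal sketch of that idea in Python follows. The function name `lagged_pairs` is hypothetical, and a bounded deque stands in for the chain of delay operators; nothing here reflects how Apex would actually wire them.

```python
from collections import deque


def lagged_pairs(stream, k):
    """For each x[n], yield the pairs (x[n], x[n-1]), ..., (x[n], x[n-k]).

    The deque plays the role of k chained unit delays: taps[0] holds
    x[n-1], taps[1] holds x[n-2], and so on. Early time steps yield
    fewer pairs because the chain is not yet full.
    """
    taps = deque(maxlen=k)
    for x in stream:
        yield [(x, past) for past in taps]
        taps.appendleft(x)  # shift every value one delay further down the chain


for pairs in lagged_pairs([1, 2, 3, 4], k=2):
    print(pairs)
# []
# [(2, 1)]
# [(3, 2), (3, 1)]
# [(4, 3), (4, 2)]
```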