Thanks Pramod and Sandesh for offering to help. Here's my rough plan:

1. Add DelayOperator interface. The DelayOperator will have a method firstWindow(long windowId). Implementations of this interface are supposed to emit tuples for the first window of the execution of the operator (either the first window of the execution of the application or the first window after recovery).

2. Add SimpleDelayOperator that implements DelayOperator. It has one input port and one output port. It simply passes the tuples from the input port to the output port and does nothing in the firstWindow() call.

3. Engine (e.g. DAG validation) to support loops in case of DelayOperator.

4. Implement the +1 delay in the engine for input ports that are connected to output ports of DelayOperator.

5. Add capability in the engine to let the operator know when the next checkpoint window is.

6. Add DefaultDelayOperator that extends SimpleDelayOperator. It writes the tuples in the window before each checkpoint to a DFS-backed WAL (using item 5 above), and it overrides firstWindow() to read from the WAL and emit the tuples at recovery.

7. Add +N delay capability (in addition to +1) in the DefaultDelayOperator.

Let me know whether this plan sounds good to you. I'm done with 1, 2, and 3. 4 is in progress. I think the bulk of the work is 5 and 6, and we can discuss how to divide the work. 7 is a nice-to-have and does not have to be done unless there is demand.

David

On Wed, Nov 4, 2015 at 10:11 AM, Pramod Immaneni <pra...@datatorrent.com> wrote:

> I would like to help. I might be able to pick up some of the smaller tasks.
>
> On Wed, Nov 4, 2015 at 10:05 AM, David Yan <da...@datatorrent.com> wrote:
>
> > Thank you for all your feedback. Looks like option #2 wins.
> >
> > I will be working on this in November and please let me know if you'd like
> > to join the effort!
> >
> > On Wed, Nov 4, 2015 at 9:58 AM, Thomas Weise <tho...@datatorrent.com>
> > wrote:
> >
> > > Agreed, there is no ambiguity.
> > >
> > > #2 will also allow the user to tune locality as there are no implicit
> > > streams as opposed to the unifier like approach.
> > >
> > > On Wed, Nov 4, 2015 at 9:54 AM, David Yan <da...@datatorrent.com> wrote:
> > >
> > > > On Tue, Nov 3, 2015 at 9:57 PM, Thomas Weise <tho...@datatorrent.com>
> > > > wrote:
> > > >
> > > > >
> > > > > #2 will address that. But if an operator with the delay interface has
> > > > > multiple input ports, on which port will the engine perform the delay?
> > > > > Maybe we will need to validate that a delay operator can only have a
> > > > > single input port?
> > > > >
> > > >
> > > > My understanding is that the engine performs the +1 delay on the input
> > > > ports of operators that are connected to output ports of delay operators.
> > > > So whether or not the delay operator has multiple input ports should not
> > > > matter.
> > > >
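
To make items 1, 2, and 6 of the plan concrete, here is a rough Java sketch. It is only an illustration under stated assumptions, not the actual patch: the Consumer-based output, the process() and emit() methods, and the ReplayingDelayOperator class (an in-memory stand-in for the DFS-backed WAL that DefaultDelayOperator would use) are all hypothetical, and nothing here uses the real engine API. Only the DelayOperator and SimpleDelayOperator names and the firstWindow(long windowId) signature come from the plan.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Item 1: the proposed interface. Only firstWindow(long) comes from the plan.
interface DelayOperator {
  // Called for the first window of the operator's execution: either the
  // first window of the application or the first window after recovery.
  void firstWindow(long windowId);
}

// Item 2: one input, one output, pass-through, firstWindow() does nothing.
class SimpleDelayOperator<T> implements DelayOperator {
  // Hypothetical stand-in for an output port.
  private final Consumer<T> output;

  SimpleDelayOperator(Consumer<T> output) {
    this.output = output;
  }

  // Hypothetical stand-in for the input port's per-tuple callback.
  void process(T tuple) {
    emit(tuple);
  }

  protected void emit(T tuple) {
    output.accept(tuple);
  }

  @Override
  public void firstWindow(long windowId) {
    // Nothing to emit for the first window.
  }
}

// Item 6, simplified: the tuples that would be read back from the WAL are
// passed in as a plain list (assumption; no DFS or checkpoint logic here).
class ReplayingDelayOperator<T> extends SimpleDelayOperator<T> {
  private final List<T> logged;

  ReplayingDelayOperator(Consumer<T> output, List<T> logged) {
    super(output);
    this.logged = new ArrayList<>(logged);
  }

  @Override
  public void firstWindow(long windowId) {
    // Re-emit the previously logged tuples before any new input arrives.
    for (T tuple : logged) {
      emit(tuple);
    }
  }
}
```

In this sketch the engine would call firstWindow() once, before delivering any tuples, so a recovering ReplayingDelayOperator emits its logged tuples first and then resumes plain pass-through.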