There is code in various places that treats stateless operators specially even though no physical checkpoint exists on disk for them. It is probably a matter of applying the same logic correctly here.
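The computation suggested in the 1:41 PM message quoted below can be sketched as follows: treat a stateless operator's checkpoint as the maximum long value, take the lowest checkpoint across the DAG as the committed window, and fall back to a timestamp-derived window if every operator is stateless. This is a minimal Python sketch, not Apex's actual code. `Operator`, `compute_committed_window`, and `window_id_from_timestamp` are hypothetical names. The sketch assumes each operator reports only its latest checkpoint, that checkpoints are aligned across operators (so the minimum is also the lowest common checkpoint), and that window IDs carry base seconds in the upper 32 bits.

```python
from dataclasses import dataclass
from typing import Iterable, Optional

# Stand-in for Java's Long.MAX_VALUE: a stateless operator never lowers the minimum.
LONG_MAX = 2**63 - 1


@dataclass
class Operator:
    """Hypothetical view of one physical operator at relaunch time."""
    name: str
    stateless: bool
    latest_checkpoint: Optional[int] = None  # window ID of the latest checkpoint on disk


def window_id_from_timestamp(now_seconds: int) -> int:
    # Assumption: base seconds in the upper 32 bits, window count (0 here) in the lower 32.
    return now_seconds << 32


def compute_committed_window(operators: Iterable[Operator], now_seconds: int) -> int:
    committed = LONG_MAX
    for op in operators:
        # Stateless operators have no physical checkpoint, so they do not constrain the result.
        checkpoint = LONG_MAX if op.stateless else op.latest_checkpoint
        if checkpoint is None:
            raise ValueError(f"stateful operator {op.name} has no checkpoint")
        committed = min(committed, checkpoint)
    if committed == LONG_MAX:
        # Every operator is stateless: start from a window derived from the current time.
        return window_id_from_timestamp(now_seconds)
    return committed


ops = [Operator("input", False, 100), Operator("transform", False, 80), Operator("console", True)]
print(compute_committed_window(ops, now_seconds=1_488_400_000))
```

Under these assumptions, the hypothetical stateless leaf `console` would be restarted at window 80, the committed window, rather than at a window derived from the relaunch time. This matches the suggestion in the 2:27 PM message to set stateless operators to commitWindowId on relaunch.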
On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre wrote:

> Hmm! The fact that commitWindowId has moved up (right now in Stram's
> memory) should mean that a complete set of checkpoints is available,
> i.e., commitWindowId can be derived. Let's say the next checkpoint window
> also gets checkpointed across the app, and the new commitWindowId is in
> memory but not yet written to the Stram state; upon relaunch, the latest
> commitWindowId should still be computed correctly.
>
> This may just be about setting stateless operators to commitWindowId on
> relaunch. A bug or a feature?
>
> Thks
> Amol
>
> M: 510-449-2606 | Twitter: @amolhkekre
> www.datatorrent.com | apex.apache.org
> Join us at Apex Big Data World-San Jose
> <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!
>
> On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni wrote:
>
> > Do we need to save committedWindowId? Can't it be computed from the
> > existing checkpoints by walking through the DAG? We probably do this
> > anyway, and I suspect there is a minor bug somewhere in there. If an
> > operator is stateless, you could assume its checkpoint is Long.MAX_VALUE
> > for the sake of the computation and take the committed window to be the
> > lowest common checkpoint. If all operators are stateless and you end up
> > with Long.MAX_VALUE, you can start with a window ID that reflects the
> > current timestamp.
> >
> > Thanks
> >
> > On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre wrote:
> >
> > > CommitWindowId could be computed from the existing checkpoints. That
> > > solution still needs the purge to happen only after commitWindowId is
> > > confirmed to be saved in the Stram state. Without this, some of the
> > > checkpoints needed to compute commitWindowId may already be missing.
> > >
> > > Thks
> > > Amol
> > >
> > > On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni wrote:
> > >
> > > > Can't the committedWindowId be calculated by looking at the physical
> > > > plan and the existing checkpoints?
> > > >
> > > > On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi wrote:
> > > >
> > > > > Help needed for APEXCORE-619
> > > > >
> > > > > Issue: When an application is relaunched after a long time with
> > > > > stateless operators at the end of the DAG, the stateless operators
> > > > > start with a very high windowId. In this case, the stateless
> > > > > operators ignore all the data received until the upstream operator
> > > > > catches up with them. This breaks the *at-least-once* guarantee when
> > > > > the operator is relaunched or when the master is killed and the
> > > > > application is restarted.
> > > > >
> > > > > Solutions:
> > > > > - Fix the windowId for stateless leaf operators from the upstream
> > > > >   operator. But this has issues when there is a join of two upstream
> > > > >   operators at different windowIds. If we set the windowId to
> > > > >   min(upstream windowId), then we need to recalculate the recovery
> > > > >   window IDs for the upstream paths from this operator.
> > > > >
> > > > > - Another solution is to create an empty file in the checkpoint
> > > > >   directory for stateless operators. This would let us identify the
> > > > >   checkpoints of stateless operators during relaunch instead of
> > > > >   computing them from the latest timestamp.
> > > > >
> > > > > - Bring the entire DAG to committedWindowId. This could be achieved
> > > > >   by writing committedWindowId to a journal. We need to make sure
> > > > >   that we do not purge the checkpointed state until
> > > > >   committedWindowId has been saved in the journal.
> > > > >
> > > > > Let me know your thoughts on this and your preferred solution.
> > > > >
> > > > > Regards,
> > > > > -Tushar.
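The ordering constraint raised in this thread can be sketched as a small model: checkpoints are purged only after the committed window ID is durably saved, and never past the saved value. This is a hypothetical in-memory model in Python, not Stram's journal or checkpoint storage API. `CheckpointStore`, `journal_committed`, and `purge` are illustrative names, and the journal write is assumed to be synchronous and durable once it returns.

```python
class CheckpointStore:
    """Hypothetical in-memory stand-in for checkpoint storage plus the Stram journal."""

    def __init__(self):
        self.checkpoints = {}            # operator name -> sorted checkpoint window IDs
        self.journaled_committed = None  # last committed window ID known to be durable

    def add_checkpoint(self, operator, window_id):
        self.checkpoints.setdefault(operator, []).append(window_id)
        self.checkpoints[operator].sort()

    def journal_committed(self, window_id):
        # Assumed to model a synchronous, durable journal write.
        self.journaled_committed = window_id

    def purge(self, committed_in_memory):
        """Drop checkpoints older than the committed window, but never past the journaled one."""
        if self.journaled_committed is None:
            # Nothing durable yet: purging now could leave a relaunch without a common checkpoint.
            return
        limit = min(committed_in_memory, self.journaled_committed)
        # Keep the checkpoint at the limit itself; only strictly older ones are removed.
        self.checkpoints = {
            op: [w for w in windows if w >= limit] for op, windows in self.checkpoints.items()
        }


store = CheckpointStore()
for op in ("input", "transform"):
    for w in (10, 20, 30):
        store.add_checkpoint(op, w)
store.purge(committed_in_memory=30)   # no-op: nothing journaled yet
store.journal_committed(20)
store.purge(committed_in_memory=30)   # capped at the journaled value, 20
print(store.checkpoints)
```

Because purging is capped at the journaled value, a relaunch that recomputes the committed window from the surviving checkpoints would still find the journaled checkpoint on disk, even if the in-memory value had already moved further ahead.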
