I would prefer to go with option 2 (and maybe reuse the -force flag to allow launching applications that do not validate due to the newly introduced rule). I am not sure that it is OK to outsmart the application designer and force a stateless operator to become stateful.

Thank you,

Vlad

/Join us at Apex Big Data World-San Jose <http://www.apexbigdata.com/san-jose.html>, April 4, 2017/ <http://www.apexbigdata.com/san-jose-register.html>

On 3/10/17 07:38, Thomas Weise wrote:
+1

But keep in mind it will cause unnecessary name node operations, and
therefore it would be good to only use it when it is really needed (i.e.
the operator in reality isn't stateless; it stores its state somewhere
else).

Can we look at optimizing the behavior for "stateless" operators that are
really stateless? For example, should the console operator be AT_MOST_ONCE
by default?


On Fri, Mar 10, 2017 at 1:45 AM, Bhupesh Chawda <[email protected]>
wrote:

My preference is also for option 3. It looks clean and simple to implement.

~ Bhupesh


_______________________________________________________

Bhupesh Chawda

E: [email protected] | Twitter: @bhupeshsc

www.datatorrent.com  |  apex.apache.org



On Fri, Mar 10, 2017 at 3:06 PM, Tushar Gosavi <[email protected]>
wrote:

Can you please let me know your preference? My preference is for solution
3, i.e. adding a StorageAgent that creates an empty file and using this
storage agent for leaf stateless operators.
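A minimal sketch of the solution-3 idea, modeled in Python rather than against Apex's Java StorageAgent interface: saving a "checkpoint" only creates an empty marker file, so the checkpoint window ids of a stateless leaf can be listed on relaunch like those of any other operator. The class name, method names and the <operatorId>/<hex windowId> file layout are assumptions for illustration only.

```python
import os
import tempfile


class EmptyFileStorageAgent:
    """Hypothetical model of a storage agent for stateless leaf operators.

    save() ignores the operator state and only creates a zero-byte marker
    file named after the window id, so that the window ids of a stateless
    operator's "checkpoints" can be listed on relaunch like any other.
    """

    def __init__(self, base_dir):
        self.base_dir = base_dir

    def _path(self, operator_id, window_id):
        # Assumed layout: <base_dir>/<operator_id>/<window id in hex>
        return os.path.join(self.base_dir, str(operator_id), format(window_id, "x"))

    def save(self, state, operator_id, window_id):
        path = self._path(operator_id, window_id)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb"):
            pass  # empty file: no state is written

    def load(self, operator_id, window_id):
        # Nothing was stored; recovery simply starts a fresh operator instance.
        return None

    def delete(self, operator_id, window_id):
        path = self._path(operator_id, window_id)
        if os.path.exists(path):
            os.remove(path)

    def get_window_ids(self, operator_id):
        op_dir = os.path.join(self.base_dir, str(operator_id))
        if not os.path.isdir(op_dir):
            return []
        return sorted(int(name, 16) for name in os.listdir(op_dir))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        agent = EmptyFileStorageAgent(tmp)
        agent.save(None, 3, 0x10)
        agent.save(None, 3, 0x20)
        agent.delete(3, 0x10)
        print(agent.get_window_ids(3))  # [32]
```

The cost Thomas mentions is visible here: every checkpoint still means a file create and a later delete on the file system, even though no bytes are written.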

- Tushar.


On Tue, Mar 7, 2017 at 1:52 PM, Tushar Gosavi <[email protected]>
wrote:

Thank you all for the feedback.

Some of the useful output operators can be stateless; they push data
received in a window to an output store, for example KafkaOutputOperator/
JDBCOutputOperator, or to output stores where writes are idempotent, which
covers most of the key-value stores.

I was going to use the existing logic to compute the committedWindowId,
with the addition of a few steps explained below.
solution-1
- Calculate committedWindow with leaf operator checkpoints set to the
current timestamp (current behaviour).
- Update the leaf operators' recoveryWindowId to committedWindowId.
- Calculate committedWindow again. This step is required because, as the
downstream operator's recoveryWindowId is reduced, we may have to adjust
the recoveryWindowId of upstream operators.
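The three solution-1 steps can be sketched as follows. This is a minimal Python model, not StrAM code: the DAG is a dict of downstream edges, stateful operators list their checkpoint window ids, and a stateless operator can restart at any window. The names recovery_windows, solution_1 and INITIAL_WINDOW are hypothetical, and the rule that an operator's recovery window must not exceed its downstream operators' recovery windows is a simplification of the real recovery logic.

```python
INITIAL_WINDOW = -1  # hypothetical marker: no usable checkpoint, start from scratch


def recovery_windows(downstream, checkpoints, stateless, leaf_window):
    """Pick a recovery window per operator, walking from the leaves upstream.

    downstream:  operator -> list of downstream operators
    checkpoints: stateful operator -> list of checkpoint window ids
    stateless:   set of stateless operators
    leaf_window: window assigned to stateless leaf operators
    """
    memo = {}

    def recover(op):
        if op in memo:
            return memo[op]
        children = downstream.get(op, [])
        limit = min(recover(c) for c in children) if children else None
        if op in stateless:
            # A stateless operator can restart at any window.
            window = leaf_window if limit is None else limit
        else:
            # Latest checkpoint that downstream operators can still consume from.
            usable = [w for w in checkpoints.get(op, []) if limit is None or w <= limit]
            window = max(usable, default=INITIAL_WINDOW)
        memo[op] = window
        return window

    operators = set(downstream) | {c for cs in downstream.values() for c in cs}
    return {op: recover(op) for op in sorted(operators)}


def solution_1(downstream, checkpoints, stateless, now):
    # Step 1: current behaviour, stateless leaves pinned to the current timestamp.
    first = recovery_windows(downstream, checkpoints, stateless, leaf_window=now)
    committed = min(first.values())
    # Steps 2 and 3: pin stateless leaves to the committed window and recompute,
    # which also lowers the recovery window of upstream operators where needed.
    return recovery_windows(downstream, checkpoints, stateless, leaf_window=committed)


if __name__ == "__main__":
    dag = {"A": ["C"], "B": ["C"], "C": []}
    cps = {"A": [10, 20, 30], "B": [10, 20]}
    print(recovery_windows(dag, cps, {"C"}, leaf_window=1000))  # {'A': 30, 'B': 20, 'C': 1000}
    print(solution_1(dag, cps, {"C"}, now=1000))  # {'A': 20, 'B': 20, 'C': 20}
```

With a stateless leaf C joining A and B, the current behaviour leaves C at the current timestamp (1000); the second pass pulls C down to the committed window 20 and, through step 3, also moves A back from 30 to 20.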

This will prevent leaf stateless operators from starting from the current
timestamp, hence reducing the amount of data loss. But as per the concern
raised by Bhupesh about the last stateless operator being slow, the
solution suggested by Vlad is sufficient.

solution-1
- As explained above. If a little data loss is acceptable, we could go
with this approach.
solution-2
- Fail validation if the last operator is stateless in the AT_LEAST_ONCE
scenario, as suggested by Vlad.
  This could break backward compatibility, as old applications will fail
to launch.
solution-3
- Mark the last operator stateful in the AT_LEAST_ONCE scenario.
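Solution 3 could be applied as a plan transformation before launch. The sketch below assumes a simplified, hypothetical LogicalOperator record (not Apex's LogicalPlan classes) and promotes only leaf operators that are STATELESS and run with AT_LEAST_ONCE; intermediate stateless operators are left alone, since their downstream checkpoints already bound their recovery window.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class LogicalOperator:
    """Hypothetical logical-plan entry, for illustration only."""
    name: str
    stateless: bool
    processing_mode: str = "AT_LEAST_ONCE"
    downstream: List[str] = field(default_factory=list)


def promote_stateless_leaves(operators):
    """Treat stateless leaf operators as stateful under AT_LEAST_ONCE.

    Returns the names of the operators whose STATELESS flag was overridden,
    so that the launcher could log them.
    """
    promoted = []
    for op in operators:
        is_leaf = not op.downstream
        if op.stateless and is_leaf and op.processing_mode == "AT_LEAST_ONCE":
            op.stateless = False  # checkpoints will now be taken for this operator
            promoted.append(op.name)
    return promoted
```

An operator declared AT_MOST_ONCE keeps its STATELESS flag, because it gives no replay guarantee to protect.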

Let me know about your preference.

Regards,
- Tushar.


On Mon, Mar 6, 2017 at 8:31 PM, Vlad Rozov <[email protected]>
wrote:

For a long chain of stateless operators at the end of a DAG, it is
possible that the time to propagate the end window to a leaf operator is
greater than the time for a checkpoint to be persisted in HDFS.

If an at-least-once processing guarantee is necessary, the leaf operators
should not be STATELESS. Will invalidating a DAG that has one or more leaf
operators marked as STATELESS with AT_LEAST_ONCE processing solve
APEXCORE-619? It is not the best solution, but I think it is sufficient
for the described scenario.
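Vlad's proposal amounts to a validation rule on the logical plan. A minimal sketch, assuming a hypothetical LogicalOperator record and ValidationError; the force parameter only models the idea of a -force style override raised in this thread and is not the actual Apex CLI behavior.

```python
from dataclasses import dataclass, field
from typing import List


class ValidationError(Exception):
    pass


@dataclass
class LogicalOperator:
    """Hypothetical logical-plan entry, for illustration only."""
    name: str
    stateless: bool
    processing_mode: str = "AT_LEAST_ONCE"
    downstream: List[str] = field(default_factory=list)


def validate_leaf_operators(operators, force=False):
    """Reject STATELESS leaf operators that require AT_LEAST_ONCE.

    With force=True the DAG is accepted and the offending operator names
    are returned so that they can be reported as warnings instead.
    """
    offending = [
        op.name
        for op in operators
        if op.stateless and not op.downstream and op.processing_mode == "AT_LEAST_ONCE"
    ]
    if offending and not force:
        raise ValidationError(
            "STATELESS leaf operators cannot guarantee AT_LEAST_ONCE: " + ", ".join(offending)
        )
    return offending
```

This is where the backward-compatibility concern shows up: an existing application with a stateless output operator fails at launch unless the override is used.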

Thank you,

Vlad


On 3/2/17 08:43, Thomas Weise wrote:

Good point, that's correct for a stateless leaf operator (an operator that
does not have downstream operators). The minimum of upstream checkpoints
can be higher than the last windowId seen by the leaf operator. That is a
low-probability event, though, because it would mean the time it took for
the checkpoint to become visible in HDFS is less than the time to propagate
endWindow downstream.
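The race can be made concrete with a small worked example. Assumptions: window ids are modeled as consecutive integers (real Apex window ids are not), upstream operators restored from checkpoint W replay only the windows after W, and the function name lost_windows is hypothetical.

```python
def lost_windows(upstream_checkpoints, leaf_last_window):
    """Windows a stateless leaf never processes if recovery restarts
    after min(upstream checkpoints)."""
    recovery = min(upstream_checkpoints)
    # Upstream operators restore the state of window `recovery` and replay
    # only later windows, so anything the leaf had not yet seen up to and
    # including `recovery` is skipped.
    return list(range(leaf_last_window + 1, recovery + 1))
```

If the upstream checkpoints become visible at windows 10 and 12 while the leaf has only finished window 8, recovery restarts after window 10, so `lost_windows([10, 12], 8)` gives `[9, 10]`; when the leaf is at or past the recovery window, nothing is lost.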

It's also not a problem for an intermediate stateless operator, because
the downstream checkpoint will inform the recovery windowId. Most of the
time stateless operators are intermediate.

Leaf operators are the output operators. I suspect in the original
scenario it was a console output operator? Useful output operators
usually won't be stateless; they have to track state to interact with the
external system correctly. I'm bringing this up for an adequate
cost/benefit analysis.

In the absence of a stateful downstream operator, you only have the
committed windowId, which is essentially a checkpointing watermark. On
application restart it has to be recomputed from the checkpoints
available, and that does not cover the scenario Tushar reported
originally.

Saving the committed windowId comes at a cost: it would have to be written
to the journal before operators are notified. Care has been taken not to
write unnecessarily to the journal, as it is blocking I/O, and in this
case the frequency depends on the order of arrival of checkpoint
notifications from operators. We also don't want to delay the
committedWindow notification, as that would introduce latency.

Thomas


On Thu, Mar 2, 2017 at 2:10 AM, Bhupesh Chawda <[email protected]>
wrote:

What if all operators complete their first checkpoints but the stateless
operator could not cross the first checkpoint window, and the DAG crashed?
If we try to figure out the recovery checkpoint now, we might conclude
that checkpoint 1 is the point to start, and we may miss some data getting
processed by the stateless operator. Probably in this case at-least-once
is also not guaranteed?

~ Bhupesh





On Thu, Mar 2, 2017 at 8:06 AM, Thomas Weise <[email protected]>
wrote:
Dummy checkpoints, continuously writing the committed window id and the
like all introduce overhead that is probably not needed.

All the information to derive what we need is likely available, and IMO
the discussion should be on what is the correct way of using it. I will
have a look when I get to it as well.

Thanks,
Thomas


On Wed, Mar 1, 2017 at 6:29 PM, Sandesh Hegde <[email protected]>
wrote:

Instead of treating the stateless operator in a special way and missing
corner cases, just have a dummy checkpoint; then there is no need to
handle corner cases.

There is a name for this solution:
https://en.wikipedia.org/wiki/Null_Object_pattern



On Wed, Mar 1, 2017 at 2:52 PM, Pramod Immaneni <[email protected]>
wrote:

There is code in various places that deals with stateless operators in a
special way even though a physical checkpoint does not exist on the disk.
It is probably a matter of applying a similar thought process/logic
correctly here.

On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre <[email protected]>
wrote:
Hmm! The fact that commitWindowId has moved up (right now in the memory of
Stram) should mean that a complete set of checkpoints is available, i.e.
commitWindowId can be derived. Let's say that the next checkpoint window
also gets checkpointed across the app, and commitWindowId is in memory but
not yet written to stram-state; then upon relaunch the latest
commitWindowId should get computed correctly.
This may be just about setting stateless operators to commitWindowId on
re-launch? aka bug/feature?
Thks
Amol



E: [email protected] | M: 510-449-2606 | Twitter: @amolhkekre
www.datatorrent.com  |  apex.apache.org


On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <[email protected]>
wrote:
Do we need to save committedWindowId? Can't it be computed from the
existing checkpoints by walking through the DAG? We probably do this
anyway, and I suspect there is a minor bug somewhere in there. If an
operator is stateless, you could assume its checkpoint is Long.MAX_VALUE
for the sake of computation and compute the committed window to be the
lowest common checkpoint. If they are all stateless and you end up with
Long.MAX_VALUE, you can start with a window id that reflects the current
timestamp.
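Pramod's rule can be written down in a few lines. A sketch under assumptions: each operator contributes only its latest checkpoint (a simplification of "lowest common checkpoint"), a stateless operator is passed as None, the caller supplies the timestamp-based window id, and committed_window is a hypothetical name.

```python
LONG_MAX = 2**63 - 1  # value of Java's Long.MAX_VALUE


def committed_window(latest_checkpoints, now_window):
    """Compute the committed window from existing checkpoints.

    latest_checkpoints maps operator name -> latest checkpoint window id,
    or None for a stateless operator (which has no physical checkpoint).
    now_window is the window id derived from the current timestamp, used
    only when every operator is stateless.
    """
    # Treat a stateless operator as if it were checkpointed at Long.MAX_VALUE,
    # so it never lowers the minimum.
    effective = [LONG_MAX if w is None else w for w in latest_checkpoints.values()]
    committed = min(effective, default=LONG_MAX)
    if committed == LONG_MAX:
        # All operators are stateless (or there are none): start from "now".
        return now_window
    return committed
```

For example, `committed_window({"input": 40, "parser": None, "writer": 30}, now_window=999)` gives 30. Note that this alone still places a stateless leaf with no stateful downstream operator wherever the caller decides, which is the gap APEXCORE-619 is about.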
Thanks

On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <[email protected]>
wrote:
CommitWindowId could be computed from the existing checkpoints. That
solution still needs the purge to be done only after commitWindowId is
confirmed to be saved in Stram state. Without this, the commitWindowId
computed from the checkpoints may have some checkpoints missing.
Thks
Amol




On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni <[email protected]>
wrote:
Can't the committedWindowId be calculated by looking at the physical plan
and the existing checkpoints?

On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <[email protected]>
wrote:
Help Needed for APEXCORE-619
Issue: When an application is relaunched after a long time with stateless
operators at the end of the DAG, the stateless operators start with a very
high windowId. In this case the stateless operator ignores all the data
received until the upstream operator catches up with it. This breaks the
*at-least-once* guarantee on relaunch of the operator, or when the master
is killed and the application is restarted.

Solutions:
- Fix the windowId for stateless leaf operators from the upstream
operator. But it has some issues when we have a join with two upstream
operators at different windowIds. If we set the windowId to
min(upstream windowId), then we need to recalculate the new recovery
window ids for the upstream paths from this operator.
- Another solution is to create an empty file in the checkpoint directory
for stateless operators. This will help us identify the checkpoints of
stateless operators during relaunch, instead of computing from the latest
timestamp.
- Bring the entire DAG to committedWindowId. This could be achieved by
writing committedWindowId to a journal. We need to make sure that we are
not purging the checkpointed state until the committedWindowId is saved
in the journal.
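The ordering constraint in the third solution (persist committedWindowId first, purge afterwards) can be sketched as below. The journal is an in-memory list standing in for the durable StrAM journal, and CheckpointManager is a hypothetical name. Skipping the write when the committed window has not advanced reflects the concern raised in this thread about unnecessary blocking journal writes.

```python
class CheckpointManager:
    """Hypothetical model: journal the committed window id before purging."""

    def __init__(self, checkpoints):
        self.checkpoints = checkpoints  # operator -> sorted list of window ids
        self.journal = []               # stands in for the durable journal
        self.last_journaled = None

    def on_committed(self, committed):
        """Record a new committed window; return True if it was journaled."""
        if self.last_journaled is not None and committed <= self.last_journaled:
            return False  # nothing new: skip the blocking journal write
        # 1. Persist first. In a real system this write must be durable
        #    before anything is deleted.
        self.journal.append(committed)
        self.last_journaled = committed
        # 2. Only then purge checkpoints older than the committed window,
        #    so a relaunch can always restore from the journaled window.
        for op, windows in self.checkpoints.items():
            self.checkpoints[op] = [w for w in windows if w >= committed]
        return True

    def recovery_window(self):
        return self.journal[-1] if self.journal else None
```

If the process dies between steps 1 and 2, the extra old checkpoints are harmless; dying with the order reversed could leave a journaled window whose checkpoints were already deleted.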
Let me know your thoughts on this and your preferred solution.

Regards,
-Tushar.
