On Mon, May 20, 2019 at 5:24 PM Jan Lukavský <je...@seznam.cz> wrote:
> I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes.
Although I understand the motivation of this statement, this project's name is "Apache Beam: An advanced unified programming model". What does the model unify, if "streaming vs. batch" is not part of the model?
What I mean is that streaming vs. batch is no longer part of the model (or ideally the API), but is pushed down to be a concern of the runner (executor) of the pipeline.
On Tue, May 21, 2019 at 10:32 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi Kenn,
OK, so if we introduce an annotation, we can have stateful ParDo with sorting; that would perfectly resolve my issues. I still have some doubts, though. Let me explain. The current behavior of stateful ParDo has the following properties:

a) it might fail in batch, although it runs fine in streaming (that is due to the buffering, and the unbounded lateness in batch, which was discussed back and forth in this thread)

b) it might be non-deterministic (this is because the elements arrive in a somewhat random order, and even if you do the operation "assign unique ID to elements" this might produce different results when run multiple times)
PCollections are *explicitly* unordered. Any operations that assume or depend on a specific ordering for correctness (or determinism) must provide that ordering themselves (i.e. tolerate "arbitrary shuffling of inputs"). As you point out, that may be very expensive if you have very hot keys with very large (unbounded) timestamp skew.

StatefulDoFns are low-level operations that should be used with care; the simpler windowing model gives determinism in the face of unordered data (though late data and non-end-of-window triggering introduce some of the non-determinism back in).
What worries me most is property b), because it seems to me to have serious consequences - not only that if you run a batch pipeline twice you would get different results, but even in streaming, when the pipeline fails and gets restarted from a checkpoint, the produced output might differ from the previous run, and data from the first run might have already been persisted into the sink. That would create somewhat messy outputs.
Beam has an exactly-once model. If the data was consumed, state mutated, and outputs written downstream (these three are committed together atomically), it will not be replayed. That does not, of course, solve the non-determinism due to ordering (including the fact that two operations reading the same PCollection may view different orderings).
These two properties make me think that the current implementation is more of a _special case_ than the general one. The general one would be that your state doesn't have the properties needed to tolerate buffering problems and/or non-determinism - which is the case where you need sorting, in both streaming and batch, to be part of the model.
Let me point out one more analogy - merging vs. non-merging windows. The general case (merging windows) implies sorting by timestamp in both the batch case (explicitly) and streaming (via buffering). The special case (non-merging windows) doesn't rely on any timestamp ordering, so the sorting and buffering can be dropped. The underlying root cause is the same for both stateful ParDo and windowing (essentially, assigning window labels is a stateful operation when the windowing function is merging).
The reason for the current behavior of stateful ParDo seems to be performance, but is it right to abandon correctness in favor of performance? Wouldn't it be more consistent to have the default behavior prefer correctness, and when the state function has the specific special properties, you can annotate your DoFn (with something like @TimeOrderingAgnostic), which would yield better performance in that case?
There are two separable questions here.

1) Is it correct for a (Stateful)DoFn to assume elements are received in a specific order? In the current model, it is not. Being able to read, handle, and produce out-of-order data, including late data, is a pretty fundamental property of distributed systems.

2) Given that some operations are easier (or possibly only possible) to write when operating on ordered data, and that different runners may have (significantly) cheaper ways to provide this ordering than can be done by the user themselves, should we elevate this to a property of (Stateful?)DoFns that the runner can provide? I think a compelling argument can be made here that we should.
- Robert
On 5/21/19 1:00 AM, Kenneth Knowles wrote:
Thanks for the nice small example of a calculation that depends on order. You are right that many state machines have this property. I agree w/ you and Luke that it is convenient for batch processing to sort by event timestamp before running a stateful ParDo. In streaming you could also implement "sort by event timestamp" by buffering until you know all earlier data will be dropped - a slack buffer up to allowed lateness.
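To make this concrete, such a slack buffer might look roughly like the following in the Java SDK (a sketch only; the class and state names are illustrative, and a complete version would also retain elements newer than the timer's timestamp and manage watermark holds):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;

class SortBufferDoFn extends DoFn<KV<String, Long>, KV<String, Long>> {
  // Slack for late data; in a real pipeline this would be derived from the
  // window's allowed lateness.
  private static final Duration SLACK = Duration.standardMinutes(1);

  @StateId("buffer")
  private final StateSpec<BagState<TimestampedValue<KV<String, Long>>>>
      bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("buffer") BagState<TimestampedValue<KV<String, Long>>> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(TimestampedValue.of(ctx.element(), ctx.timestamp()));
    // Fire once the watermark passes this element's timestamp plus slack,
    // i.e. once anything earlier would be droppable anyway.
    flush.set(ctx.timestamp().plus(SLACK));
  }

  @OnTimer("flush")
  public void flush(
      OnTimerContext ctx,
      @StateId("buffer") BagState<TimestampedValue<KV<String, Long>>> buffer) {
    List<TimestampedValue<KV<String, Long>>> sorted = new ArrayList<>();
    buffer.read().forEach(sorted::add);
    buffer.clear();
    sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
    // Emit the buffered elements in event-time order.
    for (TimestampedValue<KV<String, Long>> tv : sorted) {
      ctx.outputWithTimestamp(tv.getValue(), tv.getTimestamp());
    }
  }
}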
I do not think that it is OK to sort in batch and not in streaming. Many state machines diverge very rapidly when things are out of order. So each runner, if it sees the "@OrderByTimestamp" annotation (or whatever), needs to deliver sorted data (by some mix of buffering and dropping), or to reject the pipeline as unsupported.
And I also want to say that this is not the default case - many uses of state & timers in ParDo yield different results at the element level, but the results are equivalent in the big picture. Take the examples of "assign a unique sequence number to each element" or "group into batches": it doesn't matter exactly what the result is, only that it meets the spec. And other cases like user funnels are monotonic enough that you also don't actually need sorting.
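For instance, the sequence-number case might look like this sketch (names illustrative): any arrival order produces a different but equally valid numbering, so no sorting is required.

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class AssignSequenceNumbers extends DoFn<KV<String, String>, KV<String, Long>> {
  @StateId("nextSeq")
  private final StateSpec<ValueState<Long>> nextSeqSpec = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext ctx, @StateId("nextSeq") ValueState<Long> nextSeq) {
    Long stored = nextSeq.read();
    long seq = stored == null ? 0L : stored;
    // Whatever order elements arrive in, each gets a distinct number per
    // key, which is all the spec asks for.
    ctx.output(KV.of(ctx.element().getKey(), seq));
    nextSeq.write(seq + 1);
  }
}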
Kenn
On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <je...@seznam.cz> wrote:
Yes, the problem will probably arise mostly when you have poorly distributed keys (or too few keys). I'm really not sure if a pure GBK with a trigger can solve this - it might help to have a data driven trigger. There would still be some doubts, though. The main question is still here - people say that sorting by timestamp before a stateful ParDo would be prohibitively slow, but I don't really see why - the sorting is very probably already there. And if not (hash grouping instead of sorted grouping), then the sorting would affect only user defined StatefulParDos. This would suggest that the best way out of this would really be to add the annotation, so that the author of the pipeline can decide.

If that would be acceptable, I think I can try to prepare some basic functionality, but I'm not sure if I would be able to cover all runners / SDKs.
On 5/20/19 11:36 PM, Lukasz Cwik wrote:
It is "read all per key and window" and not just "read all" (this still won't scale with hot keys in the global window). The GBK preceding the StatefulParDo will guarantee that you are processing all the values for a specific key and window at any given time. Is there a specific window/trigger that is missing that you feel would remove the need for you to use StatefulParDo?
On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <je...@seznam.cz> wrote:
Hi Lukasz,

> Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state.

Yes, no problem with that. But this whole discussion started because *this doesn't work in batch*. You simply cannot first read everything from distributed storage and then buffer it all into memory, just to read it again, but sorted. That will not work. And even if it would, it would be a terrible waste of resources.

Jan
On 5/20/19 8:39 PM, Lukasz Cwik wrote:
On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <je...@seznam.cz> wrote:
This discussion brings many really interesting questions for me. :-)
> I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes.
Although I understand the motivation of this statement, this project's name is "Apache Beam: An advanced unified programming model". What does the model unify, if "streaming vs. batch" is not part of the model?
Using microbatching, chaining of batch jobs, or pure streaming are exactly the "runtime conditions/characteristics" I refer to. All of these define several runtime parameters, which in turn define how well/badly the pipeline will perform and how many resources might be needed. From my point of view, pure streaming should be the most resource demanding (if not, why bother with batch? why not run everything in streaming only? what would there remain to "unify"?).
> Fortunately, for batch, only the state for a single key needs to be preserved at a time, rather than the state for all keys across the range of skew. Of course if you have few or hot keys, one can still have issues (and this is not specific to StatefulDoFns).
Yes, but there is still the presumption that my stateful DoFn can tolerate arbitrary shuffling of inputs. Let me explain the use case in more detail.

Suppose you have an input stream consisting of 1s and 0s (and some key for each element, which is irrelevant for the demonstration). Your task is to calculate, in a running global window, the actual number of changes between state 0 and state 1 and vice versa. When the state doesn't change, you don't calculate anything. If the input (for a given key) were (tN denotes timestamp N):
t1: 1
t2: 0
t3: 0
t4: 1
t5: 1
t6: 0
then the output should yield (supposing that the default state is zero):
t1: (one: 1, zero: 0)
t2: (one: 1, zero: 1)
t3: (one: 1, zero: 1)
t4: (one: 2, zero: 1)
t5: (one: 2, zero: 1)
t6: (one: 2, zero: 2)
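For concreteness, a naive transition counter might look like this sketch (Java SDK, names illustrative). It keeps the last seen value and both counters in state - and it is only correct if elements arrive in timestamp order, which is exactly what the model does not guarantee:

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class CountTransitions extends DoFn<KV<String, Integer>, KV<String, String>> {
  @StateId("last")
  private final StateSpec<ValueState<Integer>> lastSpec = StateSpecs.value();
  @StateId("ones")
  private final StateSpec<ValueState<Long>> onesSpec = StateSpecs.value();
  @StateId("zeros")
  private final StateSpec<ValueState<Long>> zerosSpec = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("last") ValueState<Integer> last,
      @StateId("ones") ValueState<Long> ones,
      @StateId("zeros") ValueState<Long> zeros) {
    int prev = last.read() == null ? 0 : last.read();   // default state is zero
    long one = ones.read() == null ? 0L : ones.read();
    long zero = zeros.read() == null ? 0L : zeros.read();
    int curr = ctx.element().getValue();
    if (prev == 0 && curr == 1) {
      one++;    // 0 -> 1 transition
    } else if (prev == 1 && curr == 0) {
      zero++;   // 1 -> 0 transition
    }
    last.write(curr);
    ones.write(one);
    zeros.write(zero);
    ctx.output(KV.of(ctx.element().getKey(),
        "(one: " + one + ", zero: " + zero + ")"));
  }
}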
How would you implement this in current Beam semantics?
I think what you're saying here is: I know that my input is ordered in a specific way, and since I assume that order when writing my pipeline, I can perform this optimization. But there is nothing preventing a runner from noticing that you're processing in the global window with a specific type of trigger and re-ordering your inputs/processing to get better performance (since you can't use an AfterWatermark trigger for your pipeline in streaming for the GlobalWindow).
Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state. I can see why you would want an annotation that says "I must have timestamp ordered elements", since it makes writing certain StatefulParDos much easier. StatefulParDo is a low-level function; it really is the "here you go and do whatever you need to, but here be dragons" function, while windowing and triggering are meant to keep many people from writing StatefulParDo in the first place.
> Pipelines that fail in the "worst case" batch scenario are likely to degrade poorly (possibly catastrophically) when the watermark falls behind in streaming mode as well.
But the worst case is defined by an input of size (available resources + a single byte) -> the pipeline fails. Although it could have finished, given the right conditions.
> This might be reasonable, implemented by default by buffering everything and releasing elements as the watermark (+lateness) advances, but would likely lead to inefficient (though *maybe* easier to reason about) code.
Sure, the pipeline will be less efficient, because it would have to buffer and sort the inputs. But at least it will produce correct results in cases where updates to state are order-sensitive.
> Would it be roughly equivalent to GBK + FlatMap(lambda (key, values): [(key, value) for value in values])?
I'd say roughly yes, but the difference would be in the trigger. The trigger should ideally fire as soon as the watermark (+lateness) crosses the element with the lowest timestamp in the buffer. Although this could be somehow emulated by a fixed trigger firing every X millis.
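As a sketch of what that emulation might look like (the window, trigger, delay, and accumulation choices here are assumptions, not a worked-out proposal):

import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class FixedIntervalBuffering {
  static PCollection<KV<String, Iterable<Long>>> expand(
      PCollection<KV<String, Long>> input) {
    return input
        .apply(
            Window.<KV<String, Long>>into(new GlobalWindows())
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.millis(100))))  // "each X millis"
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())
        .apply(GroupByKey.create());
  }
}

A downstream DoFn would then sort each emitted pane by timestamp before updating its state.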
> Or is the underlying desire just to be able to hint to the runner that the code may perform better (e.g. require less resources) as skew is reduced (and hence to order by timestamp iff it's cheap)?
No, the sorting would have to be done in the streaming case as well. That is an imperative of the unified model. I think it is possible to sort by timestamp only in the batch case (and do it for *all* batch stateful pardos without the annotation), or to introduce the annotation, but then make the same guarantees for the streaming case as well.
Jan
On 5/20/19 4:41 PM, Robert Bradshaw wrote:
On Mon, May 20, 2019 at 1:19 PM Jan Lukavský <je...@seznam.cz> wrote:
Hi Robert,
yes, I think you rephrased my point - although no *explicit* guarantees of ordering are given in either mode, there is *implicit* ordering in the streaming case that is due to the nature of the processing - the difference between the watermark and the timestamps of elements flowing through the pipeline is generally low (too high a difference leads to the overbuffering problem), but there is no such bound in batch.
Fortunately, for batch, only the state for a single key needs to be preserved at a time, rather than the state for all keys across the range of skew. Of course if you have few or hot keys, one can still have issues (and this is not specific to StatefulDoFns).
As a result, I see a few possible solutions:

- the best and most natural seems to be an extension of the model, so that it defines batch not only as "a streaming pipeline executed in batch fashion", but as "a pipeline with at least as good runtime characteristics as in the streaming case, executed in batch fashion". I really don't think that there are any conflicts with the current model, or that this could affect performance, because the required sorting (as pointed out by Aljoscha) is very probably already done during translation of stateful pardos. Also note that this definition only affects user defined stateful pardos
I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes. The model describes what the valid outputs are given a (sometimes partial) set of inputs. It becomes really hard to define things like "as good runtime characteristics." Once you allow any out-of-orderedness, it is not very feasible to try and define (and more cheaply implement) an "upper bound" of acceptable out-of-orderedness.
Pipelines that fail in the "worst case" batch scenario are likely to degrade poorly (possibly catastrophically) when the watermark falls behind in streaming mode as well.
- another option would be to introduce an annotation for DoFns (e.g. @RequiresStableTimeCharacteristics), which would result in the sorting in the batch case - but - this extension would have to ensure the sorting in streaming mode also - it would require a definition of allowed lateness, and a trigger (essentially similar to a window)
This might be reasonable, implemented by default by buffering everything and releasing elements as the watermark (+lateness) advances, but would likely lead to inefficient (though *maybe* easier to reason about) code. Not sure about the semantics of triggering here, especially data-driven triggers. Would it be roughly equivalent to GBK + FlatMap(lambda (key, values): [(key, value) for value in values])?
Or is the underlying desire just to be able to hint to the runner that the code may perform better (e.g. require less resources) as skew is reduced (and hence to order by timestamp iff it's cheap)?
- the last option would be to introduce these "higher order guarantees" in some extension DSL (e.g. Euphoria), but that seems to be the worst option to me
I see the first two options as roughly equally good, although the latter one is probably more time consuming to implement. But it would bring an additional feature to the streaming case as well.
Thanks for any thoughts.
Jan
On 5/20/19 12:41 PM, Robert Bradshaw wrote:
On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <je...@seznam.cz> wrote:
Hi Reuven,
> How so? AFAIK stateful DoFns work just fine in batch runners.
Stateful ParDo works in batch as far as the logic inside the state works for absolutely unbounded out-of-orderness of elements. That basically (practically) can work only for cases where the order of input elements doesn't matter. But "state" can refer to "state machine", and any time you have a state machine involved, the ordering of elements matters.
No guarantees on order are provided in *either* streaming or batch mode by the model. However, in order to make forward progress, most streaming runners attempt to limit the amount of out-of-orderedness of elements (in terms of event time vs. processing time), which in turn can help cap the amount of state that must be held concurrently, whereas a batch runner may not allow any state to be safely discarded until the whole timeline from the infinite past to the infinite future has been observed.

Also, as pointed out, state is not preserved "batch to batch" in batch mode.
On Thu, May 16, 2019 at 3:59 PM Maximilian Michels <m...@apache.org> wrote:
> batch semantics and streaming semantics differ only in that I can have GlobalWindow with default trigger on batch and cannot on stream
You can have a GlobalWindow in streaming with a default trigger. You could define additional triggers that do early firings. And you could even trigger the global window by advancing the watermark to +inf.
IIRC, as a pragmatic note, we prohibited the global window with the default trigger on unbounded PCollections in the SDK because this is more likely to be user error than an actual desire to have no output until drain. But it's semantically valid in the model.
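For reference, a global window with early firings on an unbounded PCollection might be declared like this (a sketch in the Java SDK; the trigger, delay, and accumulation mode are illustrative choices):

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class GlobalWindowWithEarlyFirings {
  static <T> PCollection<T> expand(PCollection<T> input) {
    return input.apply(
        Window.<T>into(new GlobalWindows())
            .triggering(
                // Fires at +inf (end of the global window), with early
                // firings every minute after the first element in a pane.
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1))))
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes());
  }
}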