On Mon, May 20, 2019 at 8:24 AM Jan Lukavský
<je...@seznam.cz> wrote:
This discussion raises many really interesting questions for me. :-)
> I don't see batch vs. streaming as part of the model. One can have
> microbatch, or even a runner that alternates between different modes.
Although I understand the motivation of this statement, this project's
name is "Apache Beam: An advanced unified programming model". What does
the model unify, if "streaming vs. batch" is not part of the model?
Using microbatching, chaining of batch jobs, or pure streaming are
exactly the "runtime conditions/characteristics" I refer to. All of
these define several runtime parameters, which in turn define how well
or badly the pipeline will perform and how many resources might be
needed. From my point of view, pure streaming should be the most
resource-demanding (if not, why bother with batch? why not run
everything in streaming only? what would there remain to "unify"?).
> Fortunately, for batch, only the state for a single key needs to be
> preserved at a time, rather than the state for all keys across the
> range of skew. Of course if you have few or hot keys, one can still
> have issues (and this is not specific to StatefulDoFns).
Yes, but there is still the presumption that my stateful DoFn can
tolerate arbitrary shuffling of inputs. Let me explain the use case in
more detail.
Suppose you have an input stream consisting of 1s and 0s (and some key
for each element, which is irrelevant for the demonstration). Your task
is to calculate, in a running global window, the actual number of
changes between state 0 and state 1 and vice versa. When the state
doesn't change, you don't calculate anything. If the input (for a given
key) were (tN denotes timestamp N):
t1: 1
t2: 0
t3: 0
t4: 1
t5: 1
t6: 0
then the output should yield (supposing that the default state is
zero):
t1: (one: 1, zero: 0)
t2: (one: 1, zero: 1)
t3: (one: 1, zero: 1)
t4: (one: 2, zero: 1)
t5: (one: 2, zero: 1)
t6: (one: 2, zero: 2)
How would you implement this in current Beam
semantics?
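
For illustration, here is a minimal sketch of such a transition-counting
stateful DoFn in the Java SDK (names are illustrative). Note that it
produces the output above only if elements are delivered in timestamp
order, which is exactly the guarantee in question:

import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Counts 0->1 and 1->0 transitions per key. Correct only under
// timestamp-ordered delivery, which the model does not guarantee.
class CountTransitionsFn
    extends DoFn<KV<String, Integer>, KV<String, KV<Long, Long>>> {

  @StateId("last")
  private final StateSpec<ValueState<Integer>> lastSpec =
      StateSpecs.value(VarIntCoder.of());

  @StateId("ones")
  private final StateSpec<ValueState<Long>> onesSpec =
      StateSpecs.value(VarLongCoder.of());

  @StateId("zeros")
  private final StateSpec<ValueState<Long>> zerosSpec =
      StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("last") ValueState<Integer> last,
      @StateId("ones") ValueState<Long> ones,
      @StateId("zeros") ValueState<Long> zeros) {
    int current = ctx.element().getValue();
    int previous = orDefault(last.read(), 0);   // default state is zero
    long oneCount = orDefault(ones.read(), 0L);
    long zeroCount = orDefault(zeros.read(), 0L);
    if (previous == 0 && current == 1) {
      ones.write(++oneCount);
    } else if (previous == 1 && current == 0) {
      zeros.write(++zeroCount);
    }
    last.write(current);
    ctx.output(KV.of(ctx.element().getKey(), KV.of(oneCount, zeroCount)));
  }

  private static <T> T orDefault(T value, T def) {
    return value == null ? def : value;
  }
}

Under arbitrary reordering, the same DoFn would count spurious (or miss
real) transitions, which is the crux of the question.
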
I think you're saying here that you know your input is ordered in a
specific way, and since you assume that order when writing your
pipeline, you can perform this optimization. But there is nothing
preventing a runner from noticing that you're processing in the global
window with a specific type of trigger and re-ordering your
inputs/processing to get better performance (since you can't use an
AfterWatermark trigger for your pipeline in streaming for the
GlobalWindow).
Today, if you must have a strict order, you must guarantee that your
StatefulParDo implements the necessary "buffering & sorting" into
state. I can see why you would want an annotation that says I must have
timestamp-ordered elements, since it makes writing certain
StatefulParDos much easier. StatefulParDo is a low-level function; it
really is the "here you go and do whatever you need to, but here be
dragons" function, while windowing and triggering are meant to keep
many people from writing StatefulParDos in the first place.
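
For reference, a rough sketch of that "buffering & sorting" pattern in
the Java SDK (illustrative names): elements are held in a BagState and
released in timestamp order by an event-time timer. A production
version would also have to hold the output watermark so that re-emitted
elements are not late downstream; that subtlety is glossed over here.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

// Buffers elements per key and releases them in timestamp order once
// the watermark has passed them, so downstream logic sees sorted input.
class SortByTimestampFn
    extends DoFn<KV<String, Integer>, KV<String, Integer>> {

  @StateId("buffer")
  private final StateSpec<BagState<TimestampedValue<KV<String, Integer>>>>
      bufferSpec = StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(
          KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));

  @StateId("minPending")
  private final StateSpec<ValueState<Instant>> minPendingSpec =
      StateSpecs.value();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("buffer") BagState<TimestampedValue<KV<String, Integer>>> buffer,
      @StateId("minPending") ValueState<Instant> minPending,
      @TimerId("flush") Timer flush) {
    buffer.add(TimestampedValue.of(ctx.element(), ctx.timestamp()));
    Instant min = minPending.read();
    if (min == null || ctx.timestamp().isBefore(min)) {
      // Fire when the watermark reaches the earliest buffered element.
      minPending.write(ctx.timestamp());
      flush.set(ctx.timestamp());
    }
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext ctx,
      @StateId("buffer") BagState<TimestampedValue<KV<String, Integer>>> buffer,
      @StateId("minPending") ValueState<Instant> minPending,
      @TimerId("flush") Timer flush) {
    List<TimestampedValue<KV<String, Integer>>> sorted = new ArrayList<>();
    for (TimestampedValue<KV<String, Integer>> tv : buffer.read()) {
      sorted.add(tv);
    }
    sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
    buffer.clear();
    minPending.clear();
    for (TimestampedValue<KV<String, Integer>> tv : sorted) {
      if (!tv.getTimestamp().isAfter(ctx.timestamp())) {
        // Watermark has passed this element: release it, in order.
        ctx.output(tv.getValue());
      } else {
        // Still ahead of the watermark: keep buffering, re-arm timer.
        buffer.add(tv);
        Instant min = minPending.read();
        if (min == null || tv.getTimestamp().isBefore(min)) {
          minPending.write(tv.getTimestamp());
          flush.set(tv.getTimestamp());
        }
      }
    }
  }
}
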
> Pipelines that fail in the "worst case" batch scenario are likely to
> degrade poorly (possibly catastrophically) when the watermark falls
> behind in streaming mode as well.
But the worst case is defined by an input of size (available resources
+ a single byte) -> pipeline failure, although it could have finished,
given the right conditions.
> This might be reasonable, implemented by default by buffering
> everything and releasing elements as the watermark (+lateness)
> advances, but would likely lead to inefficient (though *maybe* easier
> to reason about) code.
Sure, the pipeline would be less efficient, because it would have to
buffer and sort the inputs. But at least it would produce correct
results in cases where updates to state are order-sensitive.
> Would it be roughly equivalent to GBK + FlatMap(lambda (key, values):
> [(key, value) for value in values])?
I'd say roughly yes, but the difference would be in the trigger. The
trigger should ideally fire as soon as the watermark (+lateness)
crosses the element with the lowest timestamp in the buffer. Although
this could be somehow emulated by a fixed trigger firing every X
millis.
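
In Java SDK terms, the rough equivalence in question might be sketched
as follows (hypothetical fragment; the order of values inside the
grouped iterable is still not guaranteed by the model):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Group values per key, then re-emit each value individually.
static PCollection<KV<String, Integer>> regroup(
    PCollection<KV<String, Integer>> input) {
  return input
      .apply(GroupByKey.<String, Integer>create())
      .apply(FlatMapElements
          .into(TypeDescriptors.kvs(
              TypeDescriptors.strings(), TypeDescriptors.integers()))
          .via((KV<String, Iterable<Integer>> kv) -> {
            List<KV<String, Integer>> out = new ArrayList<>();
            for (Integer value : kv.getValue()) {
              out.add(KV.of(kv.getKey(), value));
            }
            return out;
          }));
}
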
> Or is the underlying desire just to be able to hint to the runner
> that the code may perform better (e.g. require less resources) as
> skew is reduced (and hence to order by timestamp iff it's cheap)?
No, the sorting would have to be done in the streaming case as well.
That is an imperative of the unified model. I think it is possible
either to sort by timestamp only in the batch case (and do it for *all*
batch stateful pardos, without an annotation), or to introduce an
annotation, but then make the same guarantees for the streaming case as
well.
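
For illustration, the annotation option could take a shape like this
(purely hypothetical; nothing in the SDK defines or reads it today):

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical marker: a runner honoring it would deliver elements to
// the annotated DoFn sorted by timestamp, by plain sorting in batch
// and by buffering up to the watermark (+allowed lateness) in
// streaming.
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RequiresStableTimeCharacteristics {}

// Hypothetical usage:
//   @RequiresStableTimeCharacteristics
//   class CountTransitionsFn extends DoFn<...> { ... }
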
Jan
On 5/20/19 4:41 PM, Robert Bradshaw wrote:
> On Mon, May 20, 2019 at 1:19 PM Jan Lukavský
> <je...@seznam.cz> wrote:
>> Hi Robert,
>>
>> yes, I think you rephrased my point - although no *explicit*
>> guarantees of ordering are given in either mode, there is *implicit*
>> ordering in the streaming case that is due to the nature of the
>> processing - the difference between the watermark and the timestamps
>> of elements flowing through the pipeline is generally low (too high
>> a difference leads to the overbuffering problem), but there is no
>> such bound in batch.
> Fortunately, for batch, only the state for a single key needs to be
> preserved at a time, rather than the state for all keys across the
> range of skew. Of course if you have few or hot keys, one can still
> have issues (and this is not specific to StatefulDoFns).
>
>> As a result, I see a few possible solutions:
>>
>> - the best and most natural seems to be an extension of the model,
>> so that it defines batch as not only "streaming pipeline executed in
>> batch fashion", but "pipeline with at least as good runtime
>> characteristics as in the streaming case, executed in batch fashion".
>> I really don't think that there are any conflicts with the current
>> model, or that this could affect performance, because the required
>> sorting (as pointed out by Aljoscha) is very probably already done
>> during translation of stateful pardos. Also note that this definition
>> only affects user-defined stateful pardos
> I don't see batch vs. streaming as part of the model. One can have
> microbatch, or even a runner that alternates between different modes.
> The model describes what the valid outputs are given a (sometimes
> partial) set of inputs. It becomes really hard to define things like
> "as good runtime characteristics." Once you allow any
> out-of-orderedness, it is not very feasible to try and define (and
> more cheaply implement) an "upper bound" of acceptable
> out-of-orderedness.
>
> Pipelines that fail in the "worst case" batch scenario are likely to
> degrade poorly (possibly catastrophically) when the watermark falls
> behind in streaming mode as well.
>
>> - another option would be to introduce an annotation for DoFns (e.g.
>> @RequiresStableTimeCharacteristics), which would result in the
>> sorting in the batch case - but - this extension would have to
>> ensure the sorting in streaming mode also - it would require a
>> definition of allowed lateness, and trigger (essentially similar to
>> window)
> This might be reasonable, implemented by default by buffering
> everything and releasing elements as the watermark (+lateness)
> advances, but would likely lead to inefficient (though *maybe* easier
> to reason about) code. Not sure about the semantics of triggering
> here, especially data-driven triggers. Would it be roughly equivalent
> to GBK + FlatMap(lambda (key, values): [(key, value) for value in
> values])?
>
> Or is the underlying desire just to be able to hint to the runner
> that the code may perform better (e.g. require less resources) as
> skew is reduced (and hence to order by timestamp iff it's cheap)?
>
>> - the last option would be to introduce these "higher-order
>> guarantees" in some extension DSL (e.g. Euphoria), but that seems to
>> be the worst option to me
>>
>> I see the first two options as roughly equally good, although the
>> latter is probably more time-consuming to implement. But it would
>> bring an additional feature to the streaming case as well.
>>
>> Thanks for any thoughts.
>>
>> Jan
>>
>> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
>>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>> Hi Reuven,
>>>>
>>>>> How so? AFAIK stateful DoFns work just fine in batch runners.
>>>> Stateful ParDo works in batch insofar as the logic inside the
>>>> state works for absolutely unbounded out-of-orderness of elements.
>>>> That basically (practically) can work only for cases where the
>>>> order of input elements doesn't matter. But "state" can refer to a
>>>> "state machine", and any time you have a state machine involved,
>>>> then the ordering of elements matters.
>>> No guarantees on order are provided in *either* streaming or batch
>>> mode by the model. However, it is the case that most streaming
>>> runners attempt to limit the amount of out-of-orderedness of
>>> elements (in terms of event time vs. processing time) in order to
>>> make forward progress, which in turn could help cap the amount of
>>> state that must be held concurrently, whereas a batch runner may
>>> not allow any state to be safely discarded until the whole timeline
>>> from infinite past to infinite future has been observed.
>>>
>>> Also, as pointed out, state is not preserved "batch to batch" in
>>> batch mode.
>>>
>>>
>>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels <m...@apache.org> wrote:
>>>
>>>>> batch semantics and streaming semantics differ only in that I
>>>>> can have a GlobalWindow with a default trigger in batch and
>>>>> cannot in streaming
>>>> You can have a GlobalWindow in streaming with a default trigger.
>>>> You could define additional triggers that do early firings. And
>>>> you could even trigger the global window by advancing the
>>>> watermark to +inf.
>>> IIRC, as a pragmatic note, we prohibited the global window with the
>>> default trigger on unbounded PCollections in the SDK because this
>>> is more likely to be user error than an actual desire to have no
>>> output until drain. But it's semantically valid in the model.
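
For reference, what Max describes might be sketched in the Java SDK
like this (illustrative fragment; the one-minute processing-time delay
is a placeholder for whatever early-firing policy is wanted):

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// A GlobalWindow over an unbounded input, made valid by an explicit
// trigger that fires early instead of waiting for the watermark to
// reach +inf.
static PCollection<KV<String, Integer>> globallyWindowed(
    PCollection<KV<String, Integer>> input) {
  return input.apply(
      Window.<KV<String, Integer>>into(new GlobalWindows())
          .triggering(
              Repeatedly.forever(
                  AfterProcessingTime.pastFirstElementInPane()
                      .plusDelayOf(Duration.standardMinutes(1))))
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes());
}
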