On Mon, May 20, 2019 at 8:24 AM Jan Lukavský wrote:
This discussion brings up many really interesting questions for me. :-)
> I don't see batch vs. streaming as part of the model. One can have
> microbatch, or even a runner that alternates between different modes.
Although I understand the motivation of this statement, this project's
name is "Apache Beam: An advanced unified programming model". What does
the model unify, if "streaming vs. batch" is not part of the model?
Microbatching, chaining of batch jobs, and pure streaming are exactly
the "runtime conditions/characteristics" I refer to. Each of these
defines several runtime parameters, which in turn determine how well or
badly the pipeline will perform and how many resources it might need.
From my point of view, pure streaming should be the most
resource-demanding (if not, why bother with batch? Why not run
everything in streaming only? What would there remain to "unify"?).
> Fortunately, for batch, only the state for a single key needs to be
> preserved at a time, rather than the state for all keys across the
> range of skew. Of course if you have few or hot keys, one can still
> have issues (and this is not specific to StatefulDoFns).
Yes, but there is still the presumption that my stateful DoFn can
tolerate arbitrary shuffling of inputs. Let me explain the use case in
more detail.
Suppose you have an input stream consisting of 1s and 0s (and some key
for each element, which is irrelevant for the demonstration). Your task
is to calculate, in a running global window, the actual number of
changes from state 0 to state 1 and vice versa. When the state doesn't
change, you don't count anything. If the input (for a given key) were
(tN denotes timestamp N):
t1: 1
t2: 0
t3: 0
t4: 1
t5: 1
t6: 0
then the output should be (supposing that the default state is zero):
t1: (one: 1, zero: 0)
t2: (one: 1, zero: 1)
t3: (one: 1, zero: 1)
t4: (one: 2, zero: 1)
t5: (one: 2, zero: 1)
t6: (one: 2, zero: 2)
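For illustration only, here is a minimal plain-Python sketch (not Beam API code) of the per-key logic above. count_transitions is a hypothetical name; the sketch assumes one key's elements arrive as (timestamp, bit) pairs processed in the order given, with a default state of 0. In timestamp order it reproduces the output above; fed the same six elements in a shuffled order it ends at (one: 1, zero: 1) instead of (one: 2, zero: 2), which is exactly the order sensitivity in question.

```python
from typing import Dict, Iterable, List, Tuple


def count_transitions(events: Iterable[Tuple[int, int]]) -> List[Tuple[int, Dict[str, int]]]:
    """Running counts of changes to 1 ("one") and changes to 0 ("zero") for one key.

    Elements are processed in the order given, so the result depends on
    arrival order, just as a stateful DoFn's state updates would.
    """
    state = 0  # assumed default state, as in the example above
    counts = {"one": 0, "zero": 0}
    out = []
    for ts, bit in events:
        if bit != state:
            # A change to 1 counts as "one", a change to 0 counts as "zero".
            counts["one" if bit == 1 else "zero"] += 1
            state = bit
        out.append((ts, dict(counts)))  # snapshot of the counts after this element
    return out


in_order = [(1, 1), (2, 0), (3, 0), (4, 1), (5, 1), (6, 0)]
for ts, c in count_transitions(in_order):
    print(f"t{ts}: (one: {c['one']}, zero: {c['zero']})")

# Same elements, delivered out of timestamp order.
shuffled = [(4, 1), (5, 1), (1, 1), (6, 0), (2, 0), (3, 0)]
print(count_transitions(shuffled)[-1])  # (3, {'one': 1, 'zero': 1})
```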
How would you implement this in current Beam semantics?
I think you're saying here that I know that my input is
ordered in a specific way and, since I assume that order when
writing my pipeline, I can perform this optimization. But
there is nothing preventing a runner from noticing that you're
processing in the global window with a specific type of
trigger and re-ordering your inputs/processing to get better
performance (since you can't use an AfterWatermark trigger
for your pipeline in streaming for the GlobalWindow).
Today, if you must have a strict order, you must guarantee
that your StatefulParDo implements the necessary "buffering
& sorting" into state. I can see why you would want an
annotation that says I must have timestamp-ordered elements,
since it makes writing certain StatefulParDos much easier.
StatefulParDo is a low-level function; it really is the
"here you go and do whatever you need to, but here be
dragons" function, while windowing and triggering are meant to
keep many people from writing StatefulParDo in the first place.
> Pipelines that fail in the "worst case" batch scenario are likely to
> degrade poorly (possibly catastrophically) when the watermark falls
> behind in streaming mode as well.
But the worst case is defined by an input of size (available
resources + a single byte), which makes the pipeline fail, although it
could have finished given the right conditions.
> This might be reasonable, implemented by default by buffering
> everything and releasing elements as the watermark (+lateness)
> advances, but would likely lead to inefficient (though *maybe* easier
> to reason about) code.
Sure, the pipeline will be less efficient, because it would have to
buffer and sort the inputs. But at least it will produce correct
results in cases where updates to state are order-sensitive.
> Would it be roughly equivalent to GBK + FlatMap(lambda (key, values):
> [(key, value) for value in values])?
I'd say roughly yes, but the difference would be in the trigger. The
trigger should ideally fire as soon as the watermark (+lateness)
crosses the element with the lowest timestamp in the buffer, although
this could be somewhat emulated by a fixed trigger firing every X
millis.
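A rough plain-Python sketch of the buffer-and-sort approach discussed here. This is not a Beam API; TimestampSortingBuffer, add, next_fire_time and advance_watermark are hypothetical names. It assumes integer timestamps and a single key, keeps elements in a min-heap ordered by timestamp, and releases them in order once watermark >= timestamp + allowed lateness. next_fire_time is the watermark at which the lowest buffered timestamp becomes releasable, i.e. roughly where the ideal trigger described above would fire. In an actual StatefulParDo the buffer would presumably live in state (e.g. a BagState) with the release driven by an event-time timer; data later than the allowed lateness is not handled.

```python
import heapq
import itertools
from typing import Any, List, Optional, Tuple


class TimestampSortingBuffer:
    """Buffers (timestamp, value) pairs for one key and releases them in
    timestamp order once the watermark reaches timestamp + allowed lateness.

    Data arriving later than the allowed lateness is not handled here.
    """

    def __init__(self, allowed_lateness: int = 0) -> None:
        self.allowed_lateness = allowed_lateness
        self._heap: List[Tuple[int, int, Any]] = []
        self._seq = itertools.count()  # tie-breaker, so values never get compared

    def add(self, timestamp: int, value: Any) -> None:
        heapq.heappush(self._heap, (timestamp, next(self._seq), value))

    def next_fire_time(self) -> Optional[int]:
        """Watermark at which the lowest buffered timestamp becomes releasable."""
        if not self._heap:
            return None
        return self._heap[0][0] + self.allowed_lateness

    def advance_watermark(self, watermark: int) -> List[Tuple[int, Any]]:
        """Pop, in timestamp order, every element with ts + lateness <= watermark."""
        released = []
        while self._heap and self._heap[0][0] + self.allowed_lateness <= watermark:
            ts, _, value = heapq.heappop(self._heap)
            released.append((ts, value))
        return released


buf = TimestampSortingBuffer(allowed_lateness=1)
for ts, bit in [(4, 1), (2, 0), (1, 1)]:
    buf.add(ts, bit)
print(buf.next_fire_time())       # 2
print(buf.advance_watermark(3))   # [(1, 1), (2, 0)]
buf.add(2, 1)                     # behind the watermark, but within allowed lateness
print(buf.advance_watermark(10))  # [(2, 1), (4, 1)]
```

Compared with a fixed trigger firing every X millis, firing at next_fire_time releases each element as early as the ordering guarantee allows, at the cost of maintaining one timer per key.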
> Or is the underlying desire just to be able to hint to the runner
> that the code may perform better (e.g. require less resources) as
> skew is reduced (and hence to order by timestamp iff it's cheap)?
No, the sorting would have to be done in the streaming case as well.
That is an imperative of the unified model. I think it is possible
either to sort by timestamp only in the batch case (and do it for *all*
batch stateful ParDos without an annotation), or to introduce an
annotation, but then make the same guarantees for the streaming case as
well.
Jan
On 5/20/19 4:41 PM, Robert Bradshaw wrote:
> On Mon, May 20, 2019 at 1:19 PM Jan Lukavský wrote:
>> Hi Robert,
>>
>> yes, I think you rephrased my point: although no *explicit*
>> guarantees of ordering are given in either mode, there is *implicit*
>> ordering in the streaming case that is due to the nature of the
>> processing. The difference between the watermark and the timestamps
>> of elements flowing through the pipeline is generally low (too high a
>> difference leads to the overbuffering problem), but there is no such
>> bound in batch.
> Fortunately, for batch, only the state for a single key needs to be
> preserved at a time, rather than the state for all keys across the
> range of skew. Of course if you have few or hot keys, one can still
> have issues (and this is not specific to StatefulDoFns).
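A minimal plain-Python sketch (not Beam code) of the sort-based execution a batch runner could use: sort all elements by key and then by timestamp, and visit one key at a time so that only that key's state is alive. Whether a given runner sorts this way is not established here. The secondary sort by timestamp in particular is an assumption reflecting what this thread proposes, and run_sorted_batch is a hypothetical name. The per-key logic is the 0/1 transition counter from the example earlier in the thread. Here sorted() holds everything in memory; a real runner would rely on an external or shuffle-based sort.

```python
import itertools
from operator import itemgetter
from typing import Dict, Iterable, Tuple


def run_sorted_batch(elements: Iterable[Tuple[str, int, int]]) -> Dict[str, Tuple[int, int]]:
    """Final (one, zero) transition counts per key.

    elements are (key, timestamp, bit) triples in arbitrary order. Sorting by
    (key, timestamp) groups each key's elements together and in event-time
    order, so state for only one key is held at any moment.
    """
    results = {}
    ordered = sorted(elements, key=itemgetter(0, 1))
    for key, group in itertools.groupby(ordered, key=itemgetter(0)):
        state, ones, zeros = 0, 0, 0  # fresh state for this key (default 0)
        for _, _, bit in group:
            if bit != state:
                if bit == 1:
                    ones += 1
                else:
                    zeros += 1
                state = bit
        results[key] = (ones, zeros)  # this key's state is no longer needed
    return results


data = [("b", 2, 1), ("a", 3, 0), ("a", 1, 1), ("b", 1, 0), ("a", 2, 0), ("a", 4, 1)]
print(run_sorted_batch(data))  # {'a': (2, 1), 'b': (1, 0)}
```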
>
>> As a result, I see a few possible solutions:
>>
>>  - the best and most natural seems to be an extension of the model,
>> so that it defines batch not only as a "streaming pipeline executed
>> in batch fashion", but as a "pipeline with at least as good runtime
>> characteristics as in the streaming case, executed in batch fashion".
>> I really don't think that there are any conflicts with the current
>> model, or that this could affect performance, because the required
>> sorting (as pointed out by Aljoscha) is very probably already done
>> during translation of stateful ParDos. Also note that this definition
>> only affects user-defined stateful ParDos
> I don't see batch vs. streaming as part of the model. One can have
> microbatch, or even a runner that alternates between different modes.
> The model describes what the valid outputs are given a (sometimes
> partial) set of inputs. It becomes really hard to define things like
> "as good runtime characteristics." Once you allow any
> out-of-orderedness, it is not very feasible to try and define (and
> more cheaply implement) an "upper bound" of acceptable
> out-of-orderedness.
>
> Pipelines that fail in the "worst case" batch scenario are likely to
> degrade poorly (possibly catastrophically) when the watermark falls
> behind in streaming mode as well.
>
>>  - another option would be to introduce an annotation for DoFns (e.g.
>> @RequiresStableTimeCharacteristics), which would result in sorting
>> in the batch case; but this extension would have to ensure the
>> sorting in streaming mode as well, which would require a definition
>> of allowed lateness and a trigger (essentially similar to a window)
> This might be reasonable, implemented by default by buffering
> everything and releasing elements as the watermark (+lateness)
> advances, but would likely lead to inefficient (though *maybe* easier
> to reason about) code. Not sure about the semantics of triggering
> here, especially data-driven triggers. Would it be roughly equivalent
> to GBK + FlatMap(lambda (key, values): [(key, value) for value in
> values])?
>
> Or is the underlying desire just to be able to hint to the runner that
> the code may perform better (e.g. require less resources) as skew is
> reduced (and hence to order by timestamp iff it's cheap)?
>
>>  - the last option would be to introduce these "higher order
>> guarantees" in some extension DSL (e.g. Euphoria), but that seems to
>> be the worst option to me
>>
>> I see the first two options as roughly equally good, although the
>> latter one is probably more time-consuming to implement. But it would
>> bring an additional feature to the streaming case as well.
>>
>> Thanks for any thoughts.
>>
>> Jan
>>
>> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
>>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský wrote:
>>>> Hi Reuven,
>>>>
>>>>> How so? AFAIK stateful DoFns work just fine in batch runners.
>>>> Stateful ParDo works in batch only insofar as the logic inside the
>>>> state works for absolutely unbounded out-of-orderness of elements.
>>>> In practice, that can work only for cases where the order of input
>>>> elements doesn't matter. But "state" can refer to a "state machine",
>>>> and any time you have a state machine involved, the ordering of
>>>> elements matters.
>>> No guarantees on order are provided by the model in *either*
>>> streaming or batch mode. However, in order to make forward progress,
>>> most streaming runners attempt to limit the amount of
>>> out-of-orderedness of elements (in terms of event time vs.
>>> processing time), which in turn could help cap the amount of state
>>> that must be held concurrently, whereas a batch runner may not allow
>>> any state to be safely discarded until the whole timeline from
>>> infinite past to infinite future has been observed.
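A toy simulation of that difference, under loudly simplified assumptions: a single key; a sort-and-release buffer that emits elements once the watermark reaches their timestamp; a streaming watermark modeled as the maximum timestamp seen so far minus a fixed lag (chosen large enough that no element is late); and a batch watermark that does not advance until all input has been read. peak_buffered is a hypothetical helper, and the arrival sequence is made-up illustrative input. With input that is only slightly out of order, the streaming model never holds more than two elements, while the batch model must hold all ten.

```python
from typing import List, Optional


def peak_buffered(arrivals: List[int], watermark_lag: Optional[int]) -> int:
    """Peak number of elements a sort-and-release buffer holds, measured
    after each arrival's release step.

    arrivals: event timestamps in arrival (processing-time) order.
    watermark_lag: if an int, the watermark trails the largest timestamp seen
    so far by this amount (crude streaming model); if None, the watermark
    does not advance until all input is read (batch model).
    """
    buffered: List[int] = []
    peak = 0
    max_seen: Optional[int] = None
    for ts in arrivals:
        buffered.append(ts)
        max_seen = ts if max_seen is None else max(max_seen, ts)
        if watermark_lag is not None:
            watermark = max_seen - watermark_lag
            # Elements at or behind the watermark can be emitted in order and dropped.
            buffered = [t for t in buffered if t > watermark]
        peak = max(peak, len(buffered))
    return peak


arrivals = [2, 1, 3, 5, 4, 6, 8, 7, 9, 10]  # slightly out of timestamp order
print(peak_buffered(arrivals, watermark_lag=2))     # 2
print(peak_buffered(arrivals, watermark_lag=None))  # 10
```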
>>>
>>> Also, as pointed out, state is not preserved "batch to batch" in
>>> batch mode.
>>>
>>>
>>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels wrote:
>>>
>>>>> batch semantics and streaming semantics differ only in that I can
>>>>> have a GlobalWindow with the default trigger on batch and cannot on
>>>>> stream
>>>> You can have a GlobalWindow in streaming with a default trigger. You
>>>> could define additional triggers that do early firings. And you could
>>>> even trigger the global window by advancing the watermark to +inf.
>>> IIRC, as a pragmatic note, we prohibited the global window with the
>>> default trigger on unbounded PCollections in the SDK because this is
>>> more likely to be user error than an actual desire to have no output
>>> until drain. But it's semantically valid in the model.