*From: *Jozef Vilcek <[email protected]>
*Date: *Fri, May 17, 2019 at 2:31 AM
*To: *<[email protected]>
Interesting discussion. I think it is very important
information that when a user uses a stateful ParDo, they can
run into a situation where it does not behave correctly in
"batch operating mode".
How so? AFAIK stateful DoFns work just fine in batch runners.
But some transforms known to Beam, like fixed windows, will
work fine? Is sorting applied to keyed elements before a
window's key group is evaluated? If the answer is yes, then why
not do the same for a stateful ParDo? That would feel
consistent to me.
Part of the SDK or not, I see the Dataflow runner is doing this
optimisation, probably precisely to make stateful ParDo
operations stable in batch mode:
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java#L64
On Thu, May 16, 2019 at 5:09 PM Jan Lukavský <[email protected]> wrote:
Hi Max,
answers inline.
---------- Original e-mail ----------
From: Maximilian Michels <[email protected]>
To: [email protected]
Date: 16. 5. 2019 15:59:59
Subject: Re: Definition of Unified model (WAS: Semantics of
PCollection.isBounded)
Hi Jan,
Thanks for the discussion. Aljoscha already gave great
answers. Just a
couple of remarks:
> a) streaming semantics (i.e. what I can express using
Transforms) are a subset of batch semantics
I think you mean streaming is a superset of batch, or
batch is a subset
of streaming. This is the ideal. In practice, the two
execution modes
are sometimes accomplished by two different execution
engines. Even in
Flink, we have independent APIs for batch and streaming
and the
execution semantics are slightly different. For example,
there are no
watermarks in the batch API. Thus, batch is rarely simply
an execution
mode of streaming. However, I still think the unified Beam
model works
in both cases.
> batch semantics and streaming semantics differ only in
that I can have a GlobalWindow with the default trigger in
batch and cannot in streaming
Actually, I really thought that, regarding semantics,
streaming should be a subset of batch. That is because in
batch you can be sure that the watermark will eventually
approach infinity. That gives you one additional feature
that streaming generally doesn't have (unless you manually
advance the watermark to infinity, as you suggest).
You can have a GlobalWindow in streaming with a default
trigger. You
could define additional triggers that do early firings.
And you could
even trigger the global window by advancing the watermark
to +inf.
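For concreteness, a minimal sketch of such a configuration in
the Beam Java SDK (the per-key sum and all names here are just
illustrative, not taken from this thread):

    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    static PCollection<KV<String, Long>> sumInGlobalWindow(
        PCollection<KV<String, Long>> input) {
      return input
          .apply(Window.<KV<String, Long>>into(new GlobalWindows())
              // the default trigger alone would fire once, when the
              // watermark reaches +inf; early firings emit partial
              // results while the pipeline is still streaming
              .triggering(Repeatedly.forever(
                  AfterProcessingTime.pastFirstElementInPane()
                      .plusDelayOf(Duration.standardMinutes(1))))
              .withAllowedLateness(Duration.ZERO)
              .accumulatingFiredPanes())
          .apply(Sum.longsPerKey());
    }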
Yes, but then you have actually changed streaming into
batch; you just execute the batch pipeline in a streaming way.
> On batch engines, this is generally not an issue,
because the buffering is eliminated by sorting - when a
group-by operation occurs, batch runners sort elements
with the same key to be together and therefore eliminate
the need for a potentially infinite cache.
The batch engines you normally use might do that. However,
I do not see
how sorting is an inherent property of the streaming
model. We do not
guarantee a deterministic order of events in streaming
with respect to
event time. In that regard, batch is a true subset of
streaming because
we make no guarantees on the order. Actually, because we
only advance
the watermark from -inf to +inf once we have read all
data, this nicely
aligns with the streaming model.
Sure, streaming doesn't have the time-ordering guarantees;
having them would be impractical. But there is no issue in
having these guarantees in batch mode. Moreover, it gives
pipelines that need "bounded out-of-orderness" the chance
to ever finish.
I think there are some issues in how we think about the
properties of batch vs. stream. If we define streaming as
the superset, then we cannot define properties for batch
that streaming doesn't have. But if we split the model into
a semantics part and a runtime properties and guarantees
part, then it is possible to define properties of batch
that streaming doesn't have.
Jan
-Max
On 16.05.19 15:20, Aljoscha Krettek wrote:
> Hi,
>
> I think it’s helpful to consider that events never truly
arrive in order in the real world (you mentioned as much
yourself). For streaming use cases, there might be some
out-of-orderness (or a lot of it, depending on the use
case) so your implementation has to be able to deal with
that. On the other end of the spectrum we have batch use
cases, where out-of-orderness is potentially even bigger
because it allows for more efficient parallel execution.
If your implementation can deal with out-of-orderness that
also shouldn’t be a problem.
>
> Another angle is completeness vs. latency: you usually
cannot have both in a streaming world. If you want 100 %
completeness, i.e. you want to ensure that you process all
events and never drop anything, you can never advance the
watermark from its initial -Inf if you want to also never
have watermark violations. In typical use cases I would
expect any sorting guarantees to be constantly violated,
unless you are willing to drop late data.
>
> I think these are some reasons why there is no mention
of ordering by timestamp anywhere (unless I’m mistaken and
there is somewhere).
>
> You are right, of course, that batch-style runners can
use grouping/sorting for a GroupByKey operation. Flink
does that and even allows sorting by secondary key, so you
could manually sort by timestamp as a secondary key with
hardly any additional cost. However, exposing that in the
model would make implementing Runners quite hard, or they
would be prohibitively slow.
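For reference, that secondary sort in Flink's DataSet API looks
roughly like the sketch below (the tuple layout (key, timestamp,
value) is only illustrative):

    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.common.operators.Order;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.util.Collector;

    static DataSet<Tuple3<String, Long, Integer>> sortedPerKey(
        DataSet<Tuple3<String, Long, Integer>> events) {
      return events
          .groupBy(0)                     // primary: the key
          .sortGroup(1, Order.ASCENDING)  // secondary: the timestamp
          .reduceGroup(new GroupReduceFunction<
              Tuple3<String, Long, Integer>,
              Tuple3<String, Long, Integer>>() {
            @Override
            public void reduce(
                Iterable<Tuple3<String, Long, Integer>> values,
                Collector<Tuple3<String, Long, Integer>> out) {
              // within each key, values arrive in timestamp order here
              for (Tuple3<String, Long, Integer> v : values) {
                out.collect(v);
              }
            }
          });
    }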
>
> You’re also right that user functions that do arbitrary
stateful operations can be quite dangerous and lead to
unexpected behaviour. Your example of reacting to changes
in 0 and 1 would produce wrong results if events are not
100% sorted by timestamp. In general, state changes that
rely on processing order are problematic, while operations
that move monotonically through some space are fine.
Examples of such operations are adding elements to a set
or summing numbers. If you “see” a given set of events you
can apply them to state in any order and as long as you
see the same set of events on different executions the
result will be the same.
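For illustration, a minimal Beam Java sketch of such an
order-insensitive update (a per-key running sum; the class name
is mine). The final state is the same for any arrival order of
the same set of events, although the intermediate outputs of
course still reflect the order in which elements happen to
arrive:

    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    class RunningSumFn extends DoFn<KV<String, Integer>, KV<String, Integer>> {
      @StateId("sum")
      private final StateSpec<ValueState<Integer>> sumSpec =
          StateSpecs.value(VarIntCoder.of());

      @ProcessElement
      public void process(ProcessContext c,
          @StateId("sum") ValueState<Integer> sum) {
        Integer read = sum.read();
        int updated = (read == null ? 0 : read) + c.element().getValue();
        sum.write(updated);  // adding is commutative and associative
        c.output(KV.of(c.element().getKey(), updated));
      }
    }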
>
> As for the Beam execution model in relation to
processing and time, I think the only “guarantees” are:
> - you will eventually see all events
> - the timestamp of those events is usually not less than
the watermark (but not always)
> - the watermark will advance when the system thinks you
won’t see events with a smaller timestamp in the future
(but you sometimes might)
>
> Those seem quite “poor”, but I think you can’t get
better guarantees for general cases for the reasons
mentioned above. Also, this is just off the top of my head
and I might be wrong in my understanding of the Beam
model. :-O
>
> Best,
> Aljoscha
>
>> On 16. May 2019, at 13:53, Jan Lukavský <[email protected]> wrote:
>>
>> Hi,
>>
>> this is starting to be really exciting. It seems to me
that there is either something wrong with my definition of
"Unified model" or with how it is implemented in (at
least) the Direct and Flink runners.
>>
>> So, first what I see as properties of Unified model:
>>
>> a) streaming semantics (i.e. what I can express using
Transforms) are a subset of batch semantics
>>
>> - this is true; batch semantics and streaming semantics
differ only in that I can have a GlobalWindow with the
default trigger in batch and cannot in streaming
>>
>> b) runtime conditions of batch have to be a subset of
streaming conditions
>>
>> - this is because otherwise it might be intractable to
run a streaming pipeline on a batch engine
>>
>> - generally this is also true - in batch mode the
watermark advances only between two states (-inf and
+inf), which makes it possible to turn (most) stateful
operations into group-by-key operations and to take
advantage of many other optimizations (the ability to
re-read inputs makes it possible to drop checkpointing, etc.)
>>
>> Now there is also one not-so-obvious runtime condition
of streaming engines - how skewed the watermark and the
event time of elements being processed can be. If this
skew gets too high (i.e. the watermark is not moving
and/or elements are very out of order), the processing
might become intractable, because everything might have to
be buffered.
>>
>> On batch engines, this is generally not an issue,
because the buffering is eliminated by sorting - when a
group-by operation occurs, batch runners sort elements
with the same key to be together and therefore eliminate
the need for a potentially infinite cache.
>>
>> Where this turns out to be an issue is whenever there
is a stateful ParDo operation, because then (without
sorting) property b) is violated - on a streaming engine
the difference between element timestamp and watermark
will generally tend to be low (and late events will be
dropped to restrict the size of buffers), but on batch it
can be arbitrarily large, and therefore the size of the
buffers that would be needed is potentially unbounded.
>>
>> This line of thinking leads me to the conclusion that if
Beam doesn't (on purpose) sort elements by timestamp before
a stateful ParDo, then it basically violates the Unified
model, because pipelines with a stateful ParDo will not
function properly on batch engines. Which is what I
observe - there is non-determinism in the batch pipeline
although everything seems to be "well defined": elements
arrive arbitrarily out of order and are arbitrarily
dropped as late. This leads to different results every
time the batch pipeline is run.
>>
>> Looking forward to any comments on this.
>>
>> Jan
>>
>> On 5/16/19 10:53 AM, Aljoscha Krettek wrote:
>>> Please take this with a grain of salt, because I might
be a bit rusty on this.
>>>
>>> I think the Beam model does not prescribe any ordering
(by time or otherwise) on inputs. Mostly because always
requiring it would be prohibitively expensive on most
Runners, especially global sorting.
>>>
>>> If you want to have sorting by key, you could do a
GroupByKey and then sort the groups in memory. This only
works, of course, if your groups are not too large.
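A minimal sketch of that approach in the Beam Java SDK, assuming
each group fits in memory (all names are illustrative):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.Reify;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TimestampedValue;

    static PCollection<KV<String, List<Integer>>> groupAndSort(
        PCollection<KV<String, Integer>> input) {
      return input
          // make each element's timestamp part of the value
          .apply(Reify.timestampsInValue())
          .apply(GroupByKey.create())
          .apply(MapElements.via(
              new SimpleFunction<
                  KV<String, Iterable<TimestampedValue<Integer>>>,
                  KV<String, List<Integer>>>() {
                @Override
                public KV<String, List<Integer>> apply(
                    KV<String, Iterable<TimestampedValue<Integer>>> kv) {
                  List<TimestampedValue<Integer>> buf = new ArrayList<>();
                  kv.getValue().forEach(buf::add);
                  // the in-memory sort; only safe for bounded groups
                  buf.sort(
                      Comparator.comparing(TimestampedValue::getTimestamp));
                  List<Integer> sorted = new ArrayList<>();
                  buf.forEach(tv -> sorted.add(tv.getValue()));
                  return KV.of(kv.getKey(), sorted);
                }
              }));
    }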
>>>
>>>> On 15. May 2019, at 21:01, Jan Lukavský <[email protected]> wrote:
>>>>
>>>> Hmmm, looking into the code of FlinkRunner (and also
by observing results from the stateful ParDo), it seems
that I got it wrong from the beginning. The data is not
sorted before the stateful ParDo, and that surprises me a
little. How should the operator work in this case? It
would mean that in the batch case I have to hold an
arbitrarily long allowedLateness inside the BagState,
which seems kind of suboptimal. Or am I missing something
obvious here? I'll describe the use case in more detail:
let's suppose I have a series of ones and zeros and I want
to emit, at each time point, a value of 1 if the value
changes from 0 to 1, a value of -1 if it changes from 1 to
0, and 0 otherwise. So:
>>>>
>>>> 0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1
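For concreteness, a minimal Beam Java sketch of this logic (the
class name is mine). Note that it only computes the intended
result if elements arrive ordered by timestamp, which is exactly
the property in question:

    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    class EdgeDetectFn extends DoFn<KV<String, Integer>, Integer> {
      @StateId("last")
      private final StateSpec<ValueState<Integer>> lastSpec =
          StateSpecs.value(VarIntCoder.of());

      @ProcessElement
      public void process(ProcessContext c,
          @StateId("last") ValueState<Integer> last) {
        Integer read = last.read();
        int prev = (read == null) ? 0 : read;
        int cur = c.element().getValue();
        c.output(cur - prev);  // 1 on a 0->1 edge, -1 on 1->0, else 0
        last.write(cur);
      }
    }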
>>>>
>>>> Does anyone have a better idea how to solve it? And
if not, how to make it run on batch without a possibly
infinite buffer? Should the input to a stateful ParDo be
sorted in the batch case? My intuition is that it should
be, because in my understanding of "batch as a special
case of streaming", in the batch case there is (by
default) a single window, time advances from -inf to +inf
at the end, and the data contains no out-of-order data in
places where this might matter (which therefore enables
some optimizations). The order would be relevant only in
the stateful ParDo, I'd say.
>>>>
>>>> Jan
>>>>
>>>> On 5/15/19 8:34 PM, Jan Lukavský wrote:
>>>>> Just to clarify: I understand that changing the
semantics of PCollection.isBounded is probably impossible
now, because it would probably introduce a
chicken-and-egg problem. Maybe I will state it more
clearly - would it be better to be able to run bounded
pipelines using batch semantics on the DirectRunner
(including sorting before stateful ParDos), or would it be
better to come up with some way to notify the pipeline
that it will be running in a streaming way although it
consists only of bounded inputs? I'm not saying how to do
it, just trying to find out if anyone else has ever had
such a need.
>>>>>
>>>>> Jan
>>>>>
>>>>> On 5/15/19 5:20 PM, Jan Lukavský wrote:
>>>>>> Hi,
>>>>>>
>>>>>> I have come across unexpected (at least for me)
behavior - an apparent inconsistency between how a
PCollection is processed in the DirectRunner and what
PCollection.isBounded signals. Let me explain:
>>>>>>
>>>>>> - I have a stateful ParDo which needs to make sure
that elements arrive in order. It accomplishes this by
defining a BagState for buffering input elements and
sorting them inside this buffer; it also keeps track of
the element with the highest timestamp to estimate a local
watermark (minus some allowed lateness), to know when to
remove elements from the buffer, sort them by time, and
pass them to some (time-ordered) processing (a sketch of
this pattern follows after this list)
>>>>>>
>>>>>> - this seems to work well for streaming (unbounded)
data
>>>>>>
>>>>>> - for batch (bounded) data the semantics of
stateful ParDo should be (please correct me if I'm wrong)
that elements always arrive in order, because the runner
can sort them by timestamp
>>>>>>
>>>>>> - this implies that for batch-processed (bounded)
input the allowedLateness can be set to zero, so that the
processing is a little more efficient, because it doesn't
have to use the BagState at all
>>>>>>
>>>>>> - now, the trouble seems to be that the
DirectRunner always uses streaming processing, even if the
input is bounded (which is by definition possible), but
there is currently no way to know when it is possible to
change the allowed lateness to zero (because the input
will arrive ordered)
>>>>>>
>>>>>> - so it seems to me that either the DirectRunner
should apply sorting before a stateful ParDo when it
processes bounded data (the same way other runners do), or
it can apply streaming processing, but then it should
change PCollection.isBounded to UNBOUNDED, even if the
input is originally bounded
>>>>>>
>>>>>> - that way, the semantics of PCollection.isBounded
would be not whether the data is known in advance to be
finite, but *how* the data is going to be processed, which
is much more valuable (IMO)
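A minimal sketch of the buffering pattern from the first bullet
above (the names and the flushing policy are mine; a real
implementation would use timers and outputWithTimestamp rather
than re-sorting the whole bag on every element):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import org.apache.beam.sdk.coders.InstantCoder;
    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    class SortingBufferFn extends DoFn<KV<String, Integer>, Integer> {
      private static final Duration ALLOWED_LATENESS =
          Duration.standardMinutes(5);

      @StateId("buffer")
      private final StateSpec<BagState<TimestampedValue<Integer>>>
          bufferSpec = StateSpecs.bag(
              TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of()));

      @StateId("maxTs")
      private final StateSpec<ValueState<Instant>> maxTsSpec =
          StateSpecs.value(InstantCoder.of());

      @ProcessElement
      public void process(ProcessContext c,
          @StateId("buffer") BagState<TimestampedValue<Integer>> buffer,
          @StateId("maxTs") ValueState<Instant> maxTs) {
        buffer.add(TimestampedValue.of(c.element().getValue(),
            c.timestamp()));
        Instant max = maxTs.read();
        if (max == null || c.timestamp().isAfter(max)) {
          max = c.timestamp();
          maxTs.write(max);
        }
        // estimate a local watermark from the highest timestamp seen
        Instant localWatermark = max.minus(ALLOWED_LATENESS);
        List<TimestampedValue<Integer>> sorted = new ArrayList<>();
        buffer.read().forEach(sorted::add);
        sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
        buffer.clear();
        for (TimestampedValue<Integer> tv : sorted) {
          if (tv.getTimestamp().isBefore(localWatermark)) {
            // safe to hand off to the (time-ordered) processing
            c.output(tv.getValue());
          } else {
            buffer.add(tv);  // not yet safe to emit; keep buffered
          }
        }
      }
    }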
>>>>>>
>>>>>> Any thoughts?
>>>>>>
>>>>>> Jan
>>>>>>
>