Hi Jan,

Thanks for the discussion. Aljoscha already gave great answers. Just a couple of remarks:

a) streaming semantics (i.e. what I can express using Transforms) are a subset of batch semantics

I think you mean that streaming is a superset of batch, or batch is a subset of streaming. This is the ideal. In practice, the two execution modes are sometimes implemented by two different execution engines. Even in Flink, we have independent APIs for batch and streaming, and the execution semantics are slightly different. For example, there are no watermarks in the batch API. Thus, batch is rarely just an execution mode of streaming. However, I still think the unified Beam model works in both cases.

batch semantics and streaming semantics differ only in that I can have GlobalWindow with default trigger on batch and cannot on stream

You can have a GlobalWindow in streaming with a default trigger. You could define additional triggers that do early firings. And you could even trigger the global window by advancing the watermark to +inf.

On batch engines, this is generally not an issue, because the buffering is eliminated by sorting - when a Group by operation occurs, batch runners sort elements with the same key to be together and therefore eliminate the need for potentially infinite cache.

The batch engines you normally use might do that. However, I do not see how sorting is an inherent property of the streaming model. We do not guarantee a deterministic order of events in streaming with respect to event time. In that regard, batch is a true subset of streaming because we make no guarantees on the order. Actually, because we only advance the watermark from -inf to +inf once we have read all data, this nicely aligns with the streaming model.

-Max

On 16.05.19 15:20, Aljoscha Krettek wrote:
Hi,

I think it’s helpful to consider that events never truly arrive in order in the 
real world (you mentioned as much yourself). For streaming use cases, there 
might be some out-of-orderness (or a lot of it, depending on the use case) so 
your implementation has to be able to deal with that. On the other end of the 
spectrum we have batch use cases, where out-of-orderness is potentially even 
bigger because it allows for more efficient parallel execution. If your 
implementation can deal with out-of-orderness that also shouldn’t be a problem.

Another angle is completeness vs. latency: you usually cannot have both in a 
streaming world. If you want 100% completeness, i.e. you want to ensure that 
you process all events and never drop anything, you can never advance the 
watermark from its initial -Inf if you want to also never have watermark 
violations. In typical use cases I would expect any sorting guarantees to be 
constantly violated, unless you are willing to drop late data.

I think these are some reasons why there is no mention of ordering by timestamp 
anywhere (unless I’m mistaken and there is somewhere).

You are right, of course, that batch-style runners can use grouping/sorting for 
a GroupByKey operation. Flink does that and even allows sorting by secondary 
key, so you could manually sort by timestamp as a secondary key with hardly any 
additional cost. However, exposing that in the model would make implementing 
Runners quite hard, or they would be prohibitively slow.

You’re also right that user functions that do arbitrary stateful operations can 
be quite dangerous and lead to unexpected behaviour. Your example of reacting to 
changes between 0 and 1 would produce wrong results if events are not 100% sorted by 
timestamp. In general, state changes that rely on processing order are 
problematic while operations that move monotonically through some space are fine. 
Examples of such operations are adding elements to a set or summing numbers. If 
you “see” a given set of events you can apply them to state in any order and as 
long as you see the same set of events on different executions the result will 
be the same.
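
A minimal sketch of this distinction, in plain Python rather than Beam. The helper names (`total`, `seen`, `rising_edges`) and the sample events are hypothetical and only illustrate the point: the sum and the set depend only on which events are seen, while the count of 0 -> 1 changes depends on the order in which they are processed.

```python
from typing import FrozenSet, List, Optional, Tuple

Event = Tuple[int, int]  # (timestamp, value); hypothetical sample shape


def total(events: List[Event]) -> int:
    """Summing is commutative: the result ignores processing order."""
    return sum(value for _, value in events)


def seen(events: List[Event]) -> FrozenSet[Event]:
    """Adding to a set is order-independent as well."""
    return frozenset(events)


def rising_edges(events: List[Event]) -> int:
    """Counts 0 -> 1 changes between consecutively *processed* events."""
    count = 0
    previous: Optional[int] = None
    for _, value in events:
        if previous == 0 and value == 1:
            count += 1
        previous = value
    return count


# The same six events, once in timestamp order and once reordered.
in_order: List[Event] = [(0, 0), (1, 1), (2, 1), (3, 0), (4, 0), (5, 1)]
out_of_order: List[Event] = [in_order[i] for i in (0, 3, 4, 1, 2, 5)]

print(total(in_order), total(out_of_order))                # 3 3
print(seen(in_order) == seen(out_of_order))                # True
print(rising_edges(in_order), rising_edges(out_of_order))  # 2 1
```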

As for the Beam execution model in relation to processing and time, I think the 
only “guarantees” are:
  - you will eventually see all events
  - the timestamp of those events is usually not less than the watermark (but 
not always)
  - the watermark will advance when the system thinks you won’t see events with 
a smaller timestamp in the future (but you sometimes might)

Those seem quite “poor”, but I think you can’t get better guarantees for 
general cases for the reasons mentioned above. Also, this is just off the top of 
my head and I might be wrong in my understanding of the Beam model. :-O

Best,
Aljoscha

On 16. May 2019, at 13:53, Jan Lukavský <je...@seznam.cz> wrote:

Hi,

this is starting to be really exciting. It seems to me that there is either something 
wrong with my definition of "Unified model" or with how it is implemented 
inside (at least) Direct and Flink Runners.

So, first, what I see as properties of the Unified model:

  a) streaming semantics (i.e. what I can express using Transforms) are a subset 
of batch semantics

   - this is true, batch semantics and streaming semantics differ only in that 
I can have GlobalWindow with default trigger on batch and cannot on stream

  b) runtime conditions of batch have to be a subset of streaming conditions

   - this is because otherwise it might be intractable to run streaming 
pipeline on batch engine

   - generally this is also true - in batch mode watermark advances only 
between two states (-inf and +inf), which makes it possible to turn (most) 
stateful operations into group by key operations, and take advantage of many 
other optimizations (the ability to re-read inputs makes it possible to drop 
checkpointing, etc.)

Now there is also one not-so-obvious runtime condition of streaming engines: 
how far the watermark and the event time of the elements being processed can 
diverge. If this skew gets too high (i.e. the watermark is not moving, and/or 
elements are very out of order), then the processing might become intractable, 
because everything might have to be buffered.

On batch engines, this is generally not an issue, because the buffering is 
eliminated by sorting - when a Group by operation occurs, batch runners sort 
elements with the same key to be together and therefore eliminate the need for 
potentially infinite cache.

This turns out to be an issue whenever there is a stateful ParDo 
operation, because then (without sorting) property b) is violated: 
on a streaming engine the difference between element timestamp and watermark will 
tend to be low (and late events will be dropped to restrict the size 
of buffers), but on batch it can be arbitrarily large, and therefore the size 
of the buffers that would be needed is potentially unbounded.

This line of thinking leads me to the conclusion that if Beam doesn't (on purpose) sort 
elements by timestamp before a stateful ParDo, then it basically violates the Unified 
model, because pipelines with stateful ParDo will not function properly on batch engines. 
This is what I observe: there is non-determinism in a batch pipeline although everything 
seems to be "well defined"; elements arrive arbitrarily out of order and are 
dropped arbitrarily. This leads to different results every time the batch 
pipeline is run.

Looking forward to any comments on this.

Jan

On 5/16/19 10:53 AM, Aljoscha Krettek wrote:
Please take this with a grain of salt, because I might be a bit rusty on this.

I think the Beam model does not prescribe any ordering (by time or otherwise) 
on inputs. Mostly because always requiring it would be prohibitively expensive 
on most Runners, especially global sorting.

If you want to have sorting by key, you could do a GroupByKey and then sort the 
groups in memory. This only works, of course, if your groups are not too large.
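
A rough sketch of that idea in plain Python, not the Beam API. The function name `group_and_sort` and the `(key, timestamp, value)` record shape are assumptions made for illustration: it groups records by key and then sorts each group by timestamp in memory, which is only feasible while every group fits in memory.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Record = Tuple[str, int, int]  # (key, timestamp, value); hypothetical shape


def group_and_sort(records: Iterable[Record]) -> Dict[str, List[Tuple[int, int]]]:
    """Group by key, then sort each group by timestamp in memory."""
    groups: Dict[str, List[Tuple[int, int]]] = defaultdict(list)
    for key, timestamp, value in records:
        groups[key].append((timestamp, value))
    # Each group is sorted independently; there is no global sort.
    return {
        key: sorted(items, key=lambda item: item[0])
        for key, items in groups.items()
    }


records: List[Record] = [
    ("a", 3, 0), ("b", 1, 1), ("a", 1, 0), ("a", 2, 1), ("b", 0, 0),
]
print(group_and_sort(records))
# {'a': [(1, 0), (2, 1), (3, 0)], 'b': [(0, 0), (1, 1)]}
```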

On 15. May 2019, at 21:01, Jan Lukavský <je...@seznam.cz> wrote:

Hmmm, looking into the code of FlinkRunner (and also by observing results from 
the stateful ParDo), it seems that I got it wrong from the beginning. The data 
is not sorted before the stateful ParDo, which surprises me a little. How 
should the operator work in this case? It would mean that in the batch case I 
have to hold arbitrarily long allowedLateness inside the BagState, which seems 
to be kind of suboptimal. Or am I missing something obvious here? I'll describe 
the use case in more detail. Let's suppose I have a series of ones and zeros 
and I want to emit at each time point a value of 1 if the value changes from 0 to 1, 
a value of -1 if it changes from 1 to 0, and 0 otherwise. So:

  0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1
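
The mapping above can be sketched in plain Python (not Beam) as follows. The function name `transitions` is hypothetical, and the sketch assumes that the first element, which has no predecessor, emits 0, as in the example.

```python
from typing import Iterable, List, Optional


def transitions(values: Iterable[int]) -> List[int]:
    """Emit 1 on a 0 -> 1 change, -1 on a 1 -> 0 change, 0 otherwise."""
    out: List[int] = []
    previous: Optional[int] = None  # nothing seen before the first element
    for value in values:
        # For values in {0, 1} the difference is exactly 1, -1 or 0.
        out.append(0 if previous is None else value - previous)
        previous = value
    return out


print(transitions([0, 1, 1, 0, 0, 1]))  # [0, 1, 0, -1, 0, 1]
```

The single `previous` variable plays the role of per-key state, so the output is only meaningful if the values are processed in timestamp order.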

Does anyone have a better idea how to solve it? And if not, how to make it run on 
batch without a possibly infinite buffer? Should the input to a stateful ParDo be sorted in 
the batch case? My intuition would be that it should be, because in my understanding of 
"batch as a special case of streaming", in the batch case there is (by default) a 
single window, time advances from -inf to +inf at the end, and the data contains no 
out-of-order data in places where this might matter (which therefore enables some 
optimizations). The order would be relevant only in the stateful ParDo, I'd say.

Jan

On 5/15/19 8:34 PM, Jan Lukavský wrote:
Just to clarify, I understand that changing the semantics of 
PCollection.isBounded is probably impossible now, because it would probably 
introduce a chicken-and-egg problem. Maybe I will state it more clearly: would it be 
better to be able to run bounded pipelines using batch semantics on 
DirectRunner (including sorting before stateful ParDos), or would it be better 
to come up with some way to notify the pipeline that it will be running in a 
streaming way although it consists only of bounded inputs? And I'm not saying 
how to do it, just trying to find out if anyone else ever had such a need.

Jan

On 5/15/19 5:20 PM, Jan Lukavský wrote:
Hi,

I have come across an unexpected (at least for me) behavior: an apparent 
inconsistency between how a PCollection is processed in DirectRunner and what 
PCollection.isBounded signals. Let me explain:

  - I have a stateful ParDo, which needs to make sure that elements arrive in 
order. It accomplishes this by defining a BagState for buffering input elements 
and sorting them inside this buffer. It also keeps track of the element with the 
highest timestamp to estimate a local watermark (minus some allowed 
lateness), to know when to remove elements from the buffer, sort them by time 
and pass them to some (time-ordered) processing

  - this seems to work well for streaming (unbounded) data

  - for batch (bounded) data the semantics of stateful ParDo should be (please 
correct me if I'm wrong) that elements always arrive in order, because the 
runner can sort them by timestamp

  - this implies that for batch processed input (bounded) the allowedLateness 
can be set to zero, so that the processing is a little more efficient, because it 
doesn't have to use the BagState at all

  - now, the trouble seems to be, that DirectRunner always uses streaming 
processing, even if the input is bounded (that is by definition possible), but 
there is no way now to know when it is possible to change allowed lateness to 
zero (because input will arrive ordered)

  - so - it seems to me, that either DirectRunner should apply sorting to 
stateful ParDo, when it processes bounded data (the same way that other runners 
do), or it can apply streaming processing, but then it should change 
PCollection.isBounded to UNBOUNDED, even if the input is originally bounded

  - that way, the semantics of PCollection.isBounded would be not whether the data 
are known in advance to be finite, but *how* the data are going to be 
processed, which is much more valuable (IMO)
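
The buffering approach from the first bullet could be sketched roughly as follows, in plain Python with a heap standing in for the BagState. The names `SortingBuffer` and `run` are hypothetical, and this is not how any runner implements it. It assumes a local watermark equal to the highest timestamp seen minus the allowed lateness, releases buffered elements up to that watermark in timestamp order, and drops elements that arrive behind it.

```python
import heapq
from typing import List, Optional, Tuple

Element = Tuple[int, str]  # (timestamp, payload); hypothetical shape


class SortingBuffer:
    def __init__(self, allowed_lateness: int) -> None:
        self.allowed_lateness = allowed_lateness
        self.dropped: List[Element] = []
        self._heap: List[Element] = []       # stands in for the BagState
        self._max_ts: Optional[int] = None   # highest timestamp seen so far

    def _watermark(self) -> float:
        if self._max_ts is None:
            return float("-inf")
        return self._max_ts - self.allowed_lateness

    def _release(self, up_to: float) -> List[Element]:
        ready: List[Element] = []
        while self._heap and self._heap[0][0] <= up_to:
            ready.append(heapq.heappop(self._heap))
        return ready

    def add(self, element: Element) -> List[Element]:
        """Buffer one element and return whatever is now safe to emit."""
        timestamp = element[0]
        if timestamp < self._watermark():
            self.dropped.append(element)     # too late: behind the watermark
            return []
        heapq.heappush(self._heap, element)
        if self._max_ts is None or timestamp > self._max_ts:
            self._max_ts = timestamp
        return self._release(self._watermark())

    def flush(self) -> List[Element]:
        """End of input: the watermark jumps to +inf, so emit the rest."""
        return self._release(float("inf"))


def run(arrivals: List[Element], allowed_lateness: int):
    buffer = SortingBuffer(allowed_lateness)
    emitted: List[Element] = []
    for element in arrivals:
        emitted.extend(buffer.add(element))
    emitted.extend(buffer.flush())
    return emitted, buffer.dropped


arrivals = [(1, "a"), (3, "b"), (2, "c"), (6, "d"), (4, "e"), (0, "f"), (5, "g")]
emitted, dropped = run(arrivals, allowed_lateness=2)
print([ts for ts, _ in emitted])  # [1, 2, 3, 4, 5, 6]
print(dropped)                    # [(0, 'f')]
```

With `allowed_lateness=0` every element of an already sorted input is emitted immediately and nothing is buffered, while the same setting on unsorted input drops every element that arrives behind the highest timestamp seen so far.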

Any thoughts?

  Jan

