Please take this with a grain of salt, because I might be a bit rusty on this.

I think the Beam model does not prescribe any ordering (by time or otherwise) 
on inputs, mostly because always requiring it would be prohibitively expensive 
on most runners, global sorting in particular.

If you want the elements sorted per key, you could do a GroupByKey and then 
sort each group in memory. This only works, of course, if your groups are not 
too large.
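
For illustration, a rough sketch in the Java SDK (MyEvent and its timestamp 
getter are just placeholders here):

    // Group by key, then sort each group's elements in memory by timestamp.
    PCollection<KV<String, List<MyEvent>>> sortedPerKey = input
        .apply(GroupByKey.<String, MyEvent>create())
        .apply(ParDo.of(
            new DoFn<KV<String, Iterable<MyEvent>>, KV<String, List<MyEvent>>>() {
              @ProcessElement
              public void process(ProcessContext c) {
                List<MyEvent> events = new ArrayList<>();
                c.element().getValue().forEach(events::add);
                // In-memory sort, so a single group has to fit into memory.
                events.sort(Comparator.comparing(MyEvent::getTimestamp));
                c.output(KV.of(c.element().getKey(), events));
              }
            }));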

> On 15. May 2019, at 21:01, Jan Lukavský <je...@seznam.cz> wrote:
> 
> Hmmm, looking into the code of the FlinkRunner (and also by observing results 
> from the stateful ParDo), it seems that I got it wrong from the beginning. 
> The data is not sorted before the stateful ParDo, which surprises me a 
> little. How should the operator work in this case? It would mean that in the 
> batch case I have to hold data for an arbitrarily long allowedLateness inside 
> the BagState, which seems rather suboptimal. Or am I missing something 
> obvious here? Let me describe the use case in more detail: suppose I have a 
> series of ones and zeros and I want to emit, at each point in time, a value 
> of 1 if the value changes from 0 to 1, a value of -1 if it changes from 1 to 
> 0, and 0 otherwise. So:
> 
>  0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1
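> 
> If the input arrived ordered by timestamp, the logic itself would be trivial, 
> roughly something like this (just a sketch, the names are made up):
> 
>   // Emits 1 on a 0->1 change, -1 on 1->0, 0 otherwise.
>   // Only correct if elements arrive in timestamp order.
>   class TransitionFn extends DoFn<KV<String, Integer>, Integer> {
> 
>     @StateId("last")
>     private final StateSpec<ValueState<Integer>> lastSpec = StateSpecs.value();
> 
>     @ProcessElement
>     public void process(
>         ProcessContext c, @StateId("last") ValueState<Integer> last) {
>       Integer stored = last.read();
>       int prev = stored == null ? 0 : stored;
>       int cur = c.element().getValue();
>       c.output(cur - prev);  // 1 - 0 = 1, 0 - 1 = -1, same value = 0
>       last.write(cur);
>     }
>   }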
> 
> Does anyone have a better idea how to solve this? And if not, how can I make 
> it run on batch without a potentially infinite buffer? Should the input to a 
> stateful ParDo be sorted in the batch case? My intuition is that it should 
> be, because in my understanding of "batch as a special case of streaming", in 
> the batch case there is (by default) a single window, time advances from -inf 
> to +inf at the end, and the data contains no out-of-order data in the places 
> where this might matter (which therefore enables some optimizations). The 
> order would be relevant only in the stateful ParDo, I'd say.
> 
> Jan
> 
> On 5/15/19 8:34 PM, Jan Lukavský wrote:
>> Just to clarify, I understand that changing the semantics of 
>> PCollection.isBounded is probably impossible now, because it would introduce 
>> a chicken-and-egg problem. Maybe I should state it more clearly: would it be 
>> better to be able to run bounded pipelines with batch semantics on the 
>> DirectRunner (including sorting before stateful ParDos), or would it be 
>> better to come up with some way to tell the pipeline that it will be run in 
>> a streaming way even though it consists only of bounded inputs? I'm not 
>> saying how to do it, just trying to find out if anyone else has ever had 
>> such a need.
>> 
>> Jan
>> 
>> On 5/15/19 5:20 PM, Jan Lukavský wrote:
>>> Hi,
>>> 
>>> I have come across some (at least for me) unexpected behavior: an apparent 
>>> inconsistency between how a PCollection is processed by the DirectRunner 
>>> and what PCollection.isBounded signals. Let me explain:
>>> 
>>>  - I have a stateful ParDo which needs to make sure that elements arrive 
>>> in order. It accomplishes this by defining a BagState for buffering input 
>>> elements and sorting them inside this buffer; it also keeps track of the 
>>> element with the highest timestamp to estimate a local watermark (minus 
>>> some allowed lateness), so it knows when to remove elements from the 
>>> buffer, sort them by time and pass them to the actual (time-ordered) 
>>> processing (see the sketch after this list)
>>> 
>>>  - this seems to work well for streaming (unbounded) data
>>> 
>>>  - for batch (bounded) data the semantics of a stateful ParDo should be 
>>> (please correct me if I'm wrong) that elements always arrive in order, 
>>> because the runner can sort them by timestamp
>>> 
>>>  - this implies that for batch-processed (bounded) input the 
>>> allowedLateness can be set to zero, so that the processing is a little more 
>>> efficient, because it doesn't have to use the BagState at all
>>> 
>>>  - now, the trouble seems to be that the DirectRunner always uses streaming 
>>> processing, even if the input is bounded (which is by definition possible), 
>>> and there is currently no way to know when it is safe to set the allowed 
>>> lateness to zero (because the input will arrive ordered)
>>> 
>>>  - so it seems to me that either the DirectRunner should apply sorting 
>>> before a stateful ParDo when it processes bounded data (the same way that 
>>> other runners do), or it can apply streaming processing, but then it should 
>>> change PCollection.isBounded to UNBOUNDED, even if the input is originally 
>>> bounded
>>> 
>>>  - that way, the semantics of PCollection.isBounded would be not whether 
>>> the data is known in advance to be finite, but *how* the data is going to 
>>> be processed, which is much more valuable (IMO)
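>>> 
>>> Just to give an idea, the buffering from the first point looks roughly like 
>>> this (a very simplified sketch, names made up, without the max-timestamp 
>>> tracking I mentioned):
>>> 
>>>   class SortingFn extends DoFn<KV<String, Integer>, Integer> {
>>> 
>>>     private static final Duration ALLOWED_LATENESS = Duration.standardMinutes(1);
>>> 
>>>     @StateId("buffer")
>>>     private final StateSpec<BagState<TimestampedValue<Integer>>> bufferSpec =
>>>         StateSpecs.bag();
>>> 
>>>     @TimerId("flush")
>>>     private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>> 
>>>     @ProcessElement
>>>     public void process(
>>>         ProcessContext c,
>>>         @StateId("buffer") BagState<TimestampedValue<Integer>> buffer,
>>>         @TimerId("flush") Timer flush) {
>>>       // Buffer the element and ask to be woken up once the watermark has
>>>       // passed its timestamp plus the allowed lateness.
>>>       buffer.add(TimestampedValue.of(c.element().getValue(), c.timestamp()));
>>>       flush.set(c.timestamp().plus(ALLOWED_LATENESS));
>>>     }
>>> 
>>>     @OnTimer("flush")
>>>     public void onFlush(
>>>         OnTimerContext c,
>>>         @StateId("buffer") BagState<TimestampedValue<Integer>> buffer) {
>>>       List<TimestampedValue<Integer>> elements = new ArrayList<>();
>>>       buffer.read().forEach(elements::add);
>>>       elements.sort(Comparator.comparing(TimestampedValue::getTimestamp));
>>>       buffer.clear();
>>>       for (TimestampedValue<Integer> tv : elements) {
>>>         if (tv.getTimestamp().isAfter(c.timestamp())) {
>>>           buffer.add(tv);  // not yet safe to emit, keep it buffered
>>>         } else {
>>>           c.output(tv.getValue());  // hand over in timestamp order
>>>         }
>>>       }
>>>     }
>>>   }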
>>> 
>>> Any thoughts?
>>> 
>>>  Jan
>>> 
