Hmm, looking into the code of FlinkRunner (and also observing the
results of the stateful ParDo), it seems that I got it wrong from the
beginning. The data is not sorted before the stateful ParDo, which
surprises me a little. How should the operator work in this case? It
would mean that in the batch case I have to buffer elements inside the
BagState for an arbitrarily long allowedLateness, which seems
suboptimal. Or am I missing something obvious here? I'll describe the
use case in more detail. Suppose I have a series of ones and zeros
and I want to emit, at each time point, 1 if the value changes from 0 to
1, -1 if it changes from 1 to 0, and 0 otherwise. So:
0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1
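A minimal sketch of this computation in plain Python (not Beam code), assuming the values are already ordered by time and that the value before the first element is treated as 0. The function name `transitions` and the `initial` parameter are hypothetical, introduced only for illustration:

```python
def transitions(values, initial=0):
    """Emit +1 on a 0->1 change, -1 on a 1->0 change, 0 otherwise.

    Assumes `values` contains only 0s and 1s, already ordered by time.
    `initial` is the assumed value before the first element.
    """
    out = []
    previous = initial
    for value in values:
        # For 0/1 inputs the difference is exactly the desired signal.
        out.append(value - previous)
        previous = value
    return out


print(transitions([0, 1, 1, 0, 0, 1]))  # [0, 1, 0, -1, 0, 1]
```

Only one value of state (the previous element) is needed, but the result is correct only if the elements are seen in timestamp order, which is why the ordering of the input to the stateful ParDo matters here.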
Does anyone have a better idea how to solve this? And if not, how can I
make it run in batch without a possibly unbounded buffer? Should the
input to a stateful ParDo be sorted in the batch case? My intuition is
that it should be, because in my understanding of "batch as a special
case of streaming", in the batch case there is (by default) a single
window, time advances from -inf to +inf at the end, and the data
contains no out-of-order elements in places where this might matter
(which in turn enables some optimizations). The order would be relevant
only in the stateful ParDo, I'd say.
Jan
On 5/15/19 8:34 PM, Jan Lukavský wrote:
Just to clarify: I understand that changing the semantics of
PCollection.isBounded is probably impossible now, because it would
probably introduce a chicken-and-egg problem. Let me state it more
clearly: would it be better to be able to run bounded pipelines using
batch semantics on DirectRunner (including sorting before stateful
ParDos), or would it be better to come up with some way to notify the
pipeline that it will be running in a streaming way although it
consists only of bounded inputs? I'm not saying how to do it, just
trying to find out whether anyone else has ever had such a need.
Jan
On 5/15/19 5:20 PM, Jan Lukavský wrote:
Hi,
I have come across behavior that is unexpected (at least for me): an
apparent inconsistency between how a PCollection is processed in
DirectRunner and what PCollection.isBounded signals. Let me explain:
- I have a stateful ParDo which needs to make sure that elements
arrive in order. It accomplishes this by defining a BagState for
buffering input elements and sorting them inside this buffer. It also
keeps track of the element with the highest timestamp to estimate a
local watermark (that timestamp minus some allowed lateness), so that
it knows when to remove elements from the buffer, sort them by time
and pass them on to some (time-ordered) processing
- this seems to work well for streaming (unbounded) data
- for batch (bounded) data the semantics of a stateful ParDo should be
(please correct me if I'm wrong) that elements always arrive in
order, because the runner can sort them by timestamp
- this implies that for batch-processed (bounded) input the
allowedLateness can be set to zero, so that the processing is a little
more efficient, because it doesn't have to use the BagState at all
- now, the trouble seems to be that DirectRunner always uses
streaming processing, even if the input is bounded (which is by
definition possible), but there is currently no way to know when it is
safe to change the allowed lateness to zero (because the input will
arrive ordered)
- so it seems to me that either DirectRunner should apply sorting
before a stateful ParDo when it processes bounded data (the same way
that other runners do), or it can apply streaming processing, but then
it should change PCollection.isBounded to UNBOUNDED, even if the input
is originally bounded
- that way, the semantics of PCollection.isBounded would be not whether
the data are known in advance to be finite, but *how* the data are
going to be processed, which is much more valuable (IMO)
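
The buffering scheme described in the first bullet can be sketched as follows. This is a plain-Python stand-in, not Beam API: the class `SortingBuffer`, its methods and the helper `run` are hypothetical names, a heap plays the role of the BagState, and elements arriving later than the allowed lateness are not handled specially (they would simply be emitted out of order).

```python
import heapq


class SortingBuffer:
    """Buffers (timestamp, value) pairs and releases them in time order."""

    def __init__(self, allowed_lateness):
        self.allowed_lateness = allowed_lateness
        self.heap = []  # entries: (timestamp, sequence number, value)
        self.max_timestamp = float("-inf")
        self.seq = 0  # tie-breaker so equal timestamps keep arrival order

    def add(self, timestamp, value):
        """Buffer one element; return the elements that are now safe to emit."""
        heapq.heappush(self.heap, (timestamp, self.seq, value))
        self.seq += 1
        self.max_timestamp = max(self.max_timestamp, timestamp)
        # Local watermark estimate: highest timestamp seen minus lateness.
        return self._drain(self.max_timestamp - self.allowed_lateness)

    def flush(self):
        """End of input: release everything still buffered, in time order."""
        return self._drain(float("inf"))

    def _drain(self, watermark):
        ready = []
        while self.heap and self.heap[0][0] <= watermark:
            timestamp, _, value = heapq.heappop(self.heap)
            ready.append((timestamp, value))
        return ready


def run(elements, allowed_lateness):
    """Feed all elements through the buffer and collect the emitted order."""
    buffer = SortingBuffer(allowed_lateness)
    out = []
    for timestamp, value in elements:
        out.extend(buffer.add(timestamp, value))
    out.extend(buffer.flush())
    return out


shuffled = [(2, 1), (1, 1), (0, 0), (3, 0), (5, 1), (4, 0)]
print(run(shuffled, allowed_lateness=2))          # time-ordered output
print(run(sorted(shuffled), allowed_lateness=0))  # same output, no buffering
```

With input that is already sorted, a lateness of zero releases every element as soon as it arrives, so the buffer never holds more than one element. With out-of-order input and zero lateness the output order is wrong, which is why the ParDo needs to know whether the runner sorts its input.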
Any thoughts?
Jan