In a lot of cases the initial combiner can dramatically reduce the amount
of data in this last phase, making it tractable for a lot of use cases.
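
As a rough illustration of that two-phase idea, here is a minimal plain-Python
sketch (not Beam code, and untested against a real runner). The function names
and the summary layout are made up for illustration. In particular, the
"first" field is an assumption added so that window boundaries can be trued up.
Each small window is sorted and compressed to a summary, and only the
summaries are merged in order:

```python
from collections import defaultdict


def summarize_window(elements):
    """Sort one window's (timestamp, value) pairs and compress them."""
    ordered = [value for _, value in sorted(elements)]
    summary = {"first": ordered[0], "last": ordered[-1], "one": 0, "zero": 0}
    # Count only the changes that happen inside this window.
    for prev, cur in zip(ordered, ordered[1:]):
        if cur != prev:
            summary["one" if cur == 1 else "zero"] += 1
    return summary


def combine(summaries, initial_state=0):
    """Merge per-window summaries in window order, truing up boundaries."""
    totals = {"one": 0, "zero": 0}
    state = initial_state
    for s in summaries:
        # A change across the window boundary is not visible inside a window.
        if s["first"] != state:
            totals["one" if s["first"] == 1 else "zero"] += 1
        totals["one"] += s["one"]
        totals["zero"] += s["zero"]
        state = s["last"]
    return totals


def two_phase_count(events, window_size):
    """Assign events to fixed windows, summarize each, then combine."""
    windows = defaultdict(list)
    for ts, value in events:
        windows[ts // window_size].append((ts, value))
    summaries = [summarize_window(windows[w]) for w in sorted(windows)]
    return combine(summaries)
```

The final phase only sees one summary per window, so how much this helps
depends on how many elements land in each window.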

 I assume in your example the first phase would not provide enough
compression?

Cheers

Reza

On Tue, 21 May 2019, 16:47 Jan Lukavský, <[email protected]> wrote:

> Hi Reza, thanks for the reaction, comments inline.
> On 5/21/19 1:02 AM, Reza Rokni wrote:
>
> Hi,
>
> If I have understood the use case correctly, your output is an ordered
> counter of state changes.
>
> One approach which might be worth exploring is outlined below. I haven't
> had a chance to test it, so it could be missing pieces or be plain old wrong
> (I will try to come up with a test example later on to try it out).
>
> 1 - Window into a small enough Duration such that the number of
> elements in a window per key can be read into memory structure for sorting.
>
> 2 - GBK
> 3 - In a DoFn do the ordering and output Timestamped<V> elements that
> contain the state changes for just that window and the value of the last
> element {timestamp-00:00:00: (one: 1, zero: 0, lastElement : 0)}. This
> will cause memory pressure so your step 1 is important.
>
> This is just an optimization, right?
>
> 4- Window these outputs into the Global Window with a Stateful DoFn
>
> Because you finally have to do the stateful ParDo in Global window, you
> will end up with the same problem - the first three steps just might give
> you some extra time. But if you have enough data (long enough history, or
> very frequent changes, or both), then you will run into the same issues as
> without the optimization here. The BagState simply would not be able to
> hold all the data in batch case.
>
> Jan
>
> 5-  Add elements to a BagState in the stateful dofn
> 6 - In the Global Window set an EventTimer to fire at time boundaries that
> match the time window that you need. Note Timers do not have a read
> function for the time that they are set. (Here is one way to set metadata
> to emulate a read function
> <https://stackoverflow.com/questions/55912522/setting-a-timer-to-the-minimum-timestamp-seen/55912542#55912542>)
> Again this can cause memory pressure.
> 7 - At each OnTimer,
> 7a-  read and sort the elements in the BagState,
> 7b - True up the state changes with the cross-window state changes from
> the list.
> 7c - Store the last accumulator into a different State
>
> Sorry that was off the top of my head so could be missing things. For
> example LateData would need to be dealt with outside of this flow...
>
> Cheers
> Reza
>
> On Tue, 21 May 2019 at 07:00, Kenneth Knowles <[email protected]> wrote:
>
>> Thanks for the nice small example of a calculation that depends on order.
>> You are right that many state machines have this property. I agree w/ you
>> and Luke that it is convenient for batch processing to sort by event
>> timestamp before running a stateful ParDo. In streaming you could also
>> implement "sort by event timestamp" by buffering until you know all earlier
>> data will be dropped - a slack buffer up to allowed lateness.
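
A minimal plain-Python sketch of such a slack buffer, under stated
assumptions: the class and method names are hypothetical, the watermark is
passed in by hand, and anything arriving at or before the already released
cutoff is simply dropped. It is not how any particular runner does it:

```python
import heapq


class SortingBuffer:
    """Buffers elements and releases them in event-time order."""

    def __init__(self, allowed_lateness):
        self.allowed_lateness = allowed_lateness
        self._heap = []
        self._seq = 0  # tie-breaker so values themselves are never compared
        self._released_up_to = float("-inf")

    def add(self, timestamp, value):
        """Buffer an element; return False if it is too late and dropped."""
        if timestamp <= self._released_up_to:
            return False
        heapq.heappush(self._heap, (timestamp, self._seq, value))
        self._seq += 1
        return True

    def advance_watermark(self, watermark):
        """Release, in timestamp order, everything that can no longer be preceded."""
        cutoff = watermark - self.allowed_lateness
        released = []
        while self._heap and self._heap[0][0] <= cutoff:
            ts, _, value = heapq.heappop(self._heap)
            released.append((ts, value))
        self._released_up_to = max(self._released_up_to, cutoff)
        return released
```

The buffer grows with the gap between element timestamps and the watermark,
which is the memory cost discussed elsewhere in this thread.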
>>
>> I do not think that it is OK to sort in batch and not in streaming. Many
>> state machines diverge very rapidly when things are out of order. So each
>> runner if they see the "@OrderByTimestamp" annotation (or whatever) needs
>> to deliver sorted data (by some mix of buffering and dropping), or to
>> reject the pipeline as unsupported.
>>
>> I also want to say that this is not the default case - many uses of
>> state & timers in ParDo yield different results at the element level, but
>> the results are equivalent in the big picture. In examples such as
>> "assign a unique sequence number to each element" or "group into batches",
>> it doesn't matter exactly what the result is, only that it meets the spec.
>> And other cases like user funnels are monotonic enough that you also don't
>> actually need sorting.
>>
>> Kenn
>>
>> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <[email protected]> wrote:
>>
>>> Yes, the problem will probably arise mostly when you have poorly
>>> distributed keys (or too few keys). I'm really not sure if a pure GBK with
>>> a trigger can solve this - it might help to have a data-driven trigger. There
>>> would still be some doubts, though. The main question is still here -
>>> people say that sorting by timestamp before stateful ParDo would be
>>> prohibitively slow, but I don't really see why - the sorting is very
>>> probably already there. And if not (hash grouping instead of sorted
>>> grouping), then the sorting would affect only user-defined StatefulParDos.
>>>
>>> This would suggest that the best way out of this would really be to add
>>> an annotation, so that the author of the pipeline can decide.
>>>
>>> If that would be acceptable, I think I can try to prepare some basic
>>> functionality, but I'm not sure if I would be able to cover all runners /
>>> sdks.
>>> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
>>>
>>> It is read all per key and window and not just read all (this still
>>> won't scale with hot keys in the global window). The GBK preceding the
>>> StatefulParDo will guarantee that you are processing all the values for a
>>> specific key and window at any given time. Is there a specific
>>> window/trigger that is missing that you feel would remove the need for you
>>> to use StatefulParDo?
>>>
>>> On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <[email protected]> wrote:
>>>
>>>> Hi Lukasz,
>>>>
>>>> > Today, if you must have a strict order, you must guarantee that your
>>>> StatefulParDo implements the necessary "buffering & sorting" into state.
>>>>
>>>> Yes, no problem with that. But this whole discussion started, because
>>>> *this doesn't work on batch*. You simply cannot first read everything from
>>>> distributed storage and then buffer it all into memory, just to read it
>>>> again, but sorted. That will not work. And even if it would, it would be a
>>>> terrible waste of resources.
>>>>
>>>> Jan
>>>> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
>>>>
>>>>
>>>>
>>>> On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>> This discussion brings many really interesting questions for me. :-)
>>>>>
>>>>>  > I don't see batch vs. streaming as part of the model. One can have
>>>>> microbatch, or even a runner that alternates between different modes.
>>>>>
>>>>> Although I understand the motivation of this statement, this project's
>>>>> name is "Apache Beam: An advanced unified programming model". What does
>>>>> the model unify, if "streaming vs. batch" is not part of the model?
>>>>>
>>>>> Using microbatching, chaining of batch jobs, or pure streaming are
>>>>> exactly the "runtime conditions/characteristics" I refer to. All these
>>>>> define several runtime parameters, which in turn define how well/badly
>>>>> the pipeline will perform and how many resources might be needed. From
>>>>> my point of view, pure streaming should be the most resource demanding
>>>>> (if not, why bother with batch? why not run everything in streaming
>>>>> only? what will there remain to "unify"?).
>>>>>
>>>>>  > Fortunately, for batch, only the state for a single key needs to be
>>>>> preserved at a time, rather than the state for all keys across the
>>>>> range of skew. Of course if you have few or hot keys, one can still have
>>>>> issues (and this is not specific to StatefulDoFns).
>>>>>
>>>>> Yes, but here is still the presumption that my stateful DoFn can
>>>>> tolerate arbitrary shuffling of inputs. Let me explain the use case in
>>>>> more detail.
>>>>>
>>>>> Suppose you have an input stream consisting of 1s and 0s (and some key
>>>>> for each element, which is irrelevant for the demonstration). Your task
>>>>> is to calculate, in a running global window, the actual number of changes
>>>>> between state 0 and state 1 and vice versa. When the state doesn't
>>>>> change, you don't calculate anything. If the input (for a given key)
>>>>> would be (tN denotes timestamp N):
>>>>>
>>>>>   t1: 1
>>>>>
>>>>>   t2: 0
>>>>>
>>>>>   t3: 0
>>>>>
>>>>>   t4: 1
>>>>>
>>>>>   t5: 1
>>>>>
>>>>>   t6: 0
>>>>>
>>>>> then the output should yield (supposing that default state is zero):
>>>>>
>>>>>   t1: (one: 1, zero: 0)
>>>>>
>>>>>   t2: (one: 1, zero: 1)
>>>>>
>>>>>   t3: (one: 1, zero: 1)
>>>>>
>>>>>   t4: (one: 2, zero: 1)
>>>>>
>>>>>   t5: (one: 2, zero: 1)
>>>>>
>>>>>   t6: (one: 2, zero: 2)
>>>>>
>>>>> How would you implement this in current Beam semantics?
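
To make the order dependence concrete, here is a minimal plain-Python sketch
of the state machine described above (not Beam code; the function name is made
up for illustration). It assumes the elements of one key are fed in a given
order and that the default state is zero:

```python
def count_state_changes(values, initial_state=0):
    """Return the running (one, zero) change counts after each element."""
    state = initial_state
    counts = {"one": 0, "zero": 0}
    outputs = []
    for value in values:
        if value != state:
            # A change into state 1 bumps "one", a change into 0 bumps "zero".
            counts["one" if value == 1 else "zero"] += 1
            state = value
        outputs.append((counts["one"], counts["zero"]))
    return outputs


# Same multiset of elements, once in timestamp order and once reordered.
in_order = count_state_changes([1, 0, 0, 1, 1, 0])
reordered = count_state_changes([1, 1, 0, 0, 0, 1])
```

Fed in timestamp order this reproduces the table above and ends at
(one: 2, zero: 2). The reordered input ends at (one: 2, zero: 1), so the
result is wrong as soon as the elements arrive out of order.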
>>>>>
>>>>
>>>> I think you're saying here that I know that my input is ordered in a
>>>> specific way and since I assume the order when writing my pipeline I can
>>>> perform this optimization. But there is nothing preventing a runner from
>>>> noticing that you're processing in the global window with a specific type of
>>>> trigger and re-ordering your inputs/processing to get better performance
>>>> (since you can't use an AfterWatermark trigger for your pipeline in
>>>> streaming for the GlobalWindow).
>>>>
>>>> Today, if you must have a strict order, you must guarantee that your
>>>> StatefulParDo implements the necessary "buffering & sorting" into state. I
>>>> can see why you would want an annotation that says I must have timestamp
>>>> ordered elements, since it makes writing certain StatefulParDos much
>>>> easier. StatefulParDo is a low-level function, it really is the "here you
>>>> go and do whatever you need to but here be dragons" function while
>>>> windowing and triggering is meant to keep many people from writing
>>>> StatefulParDo in the first place.
>>>>
>>>>
>>>>>  > Pipelines that fail in the "worst case" batch scenario are likely
>>>>> to degrade poorly (possibly catastrophically) when the watermark falls
>>>>> behind in streaming mode as well.
>>>>>
>>>>> But the worst case is defined by input of size (available resources +
>>>>> single byte) -> pipeline fails. Although it could have finished, given
>>>>> the right conditions.
>>>>>
>>>>>  > This might be reasonable, implemented by default by buffering
>>>>> everything and releasing elements as the watermark (+lateness)
>>>>> advances, but would likely lead to inefficient (though *maybe* easier
>>>>> to reason about) code.
>>>>>
>>>>> Sure, the pipeline will be less efficient, because it would have to
>>>>> buffer and sort the inputs. But at least it will produce correct
>>>>> results in cases where updates to state are order-sensitive.
>>>>>
>>>>>  > Would it be roughly equivalent to GBK + FlatMap(lambda (key,
>>>>> values): [(key, value) for value in values])?
>>>>>
>>>>> I'd say roughly yes, but the difference would be in the trigger. The
>>>>> trigger should ideally fire as soon as the watermark (+lateness) crosses
>>>>> the element with the lowest timestamp in the buffer. Although this could
>>>>> be somehow emulated by a fixed trigger every X millis.
>>>>>
>>>>>  > Or is the underlying desire just to be able to hint to the runner
>>>>> that the code may perform better (e.g. require less resources) as skew
>>>>> is reduced (and hence to order by timestamp iff it's cheap)?
>>>>>
>>>>> No, the sorting would have to be done in the streaming case as well.
>>>>> That is an imperative of the unified model. I think it is possible to
>>>>> sort by timestamp only in the batch case (and do it for *all* batch
>>>>> stateful pardos without annotation), or introduce an annotation, but
>>>>> then make the same guarantees for the streaming case as well.
>>>>>
>>>>> Jan
>>>>>
>>>>> On 5/20/19 4:41 PM, Robert Bradshaw wrote:
>>>>> > On Mon, May 20, 2019 at 1:19 PM Jan Lukavský <[email protected]> wrote:
>>>>> >> Hi Robert,
>>>>> >>
>>>>> >> yes, I think you rephrased my point - although no *explicit* guarantees
>>>>> >> of ordering are given in either mode, there is *implicit* ordering in
>>>>> >> the streaming case that is due to the nature of the processing - the
>>>>> >> difference between watermark and timestamp of elements flowing through
>>>>> >> the pipeline is generally low (too high a difference leads to the
>>>>> >> overbuffering problem), but there is no such bound on batch.
>>>>> > Fortunately, for batch, only the state for a single key needs to be
>>>>> > preserved at a time, rather than the state for all keys across the
>>>>> > range of skew. Of course if you have few or hot keys, one can still
>>>>> > have issues (and this is not specific to StatefulDoFns).
>>>>> >
>>>>> >> As a result, I see a few possible solutions:
>>>>> >>
>>>>> >>    - the best and most natural seems to be an extension of the model,
>>>>> >> so that it defines batch as not only "streaming pipeline executed in
>>>>> >> batch fashion", but "pipeline with at least as good runtime
>>>>> >> characteristics as in streaming case, executed in batch fashion". I
>>>>> >> really don't think that there are any conflicts with the current
>>>>> >> model, or that this could affect performance, because the required
>>>>> >> sorting (as pointed out by Aljoscha) is very probably already done
>>>>> >> during translation of stateful pardos. Also note that this definition
>>>>> >> only affects user-defined stateful pardos
>>>>> > I don't see batch vs. streaming as part of the model. One can have
>>>>> > microbatch, or even a runner that alternates between different modes.
>>>>> > The model describes what the valid outputs are given a (sometimes
>>>>> > partial) set of inputs. It becomes really hard to define things like
>>>>> > "as good runtime characteristics." Once you allow any
>>>>> > out-of-orderedness, it is not very feasible to try and define (and
>>>>> > more cheaply implement) an "upper bound" of acceptable
>>>>> > out-of-orderedness.
>>>>> >
>>>>> > Pipelines that fail in the "worst case" batch scenario are likely to
>>>>> > degrade poorly (possibly catastrophically) when the watermark falls
>>>>> > behind in streaming mode as well.
>>>>> >
>>>>> >>    - another option would be to introduce an annotation for DoFns
>>>>> >> (e.g. @RequiresStableTimeCharacteristics), which would result in the
>>>>> >> sorting in batch case - but - this extension would have to ensure the
>>>>> >> sorting in streaming mode also - it would require definition of
>>>>> >> allowed lateness, and trigger (essentially similar to window)
>>>>> > This might be reasonable, implemented by default by buffering
>>>>> > everything and releasing elements as the watermark (+lateness)
>>>>> > advances, but would likely lead to inefficient (though *maybe* easier
>>>>> > to reason about) code. Not sure about the semantics of triggering
>>>>> > here, especially data-driven triggers. Would it be roughly equivalent
>>>>> > to GBK + FlatMap(lambda (key, values): [(key, value) for value in
>>>>> > values])?
>>>>> >
>>>>> > Or is the underlying desire just to be able to hint to the runner that
>>>>> > the code may perform better (e.g. require less resources) as skew is
>>>>> > reduced (and hence to order by timestamp iff it's cheap)?
>>>>> >
>>>>> >>    - the last option would be to introduce these "higher order
>>>>> >> guarantees" in some extension DSL (e.g. Euphoria), but that seems to
>>>>> >> be the worst option to me
>>>>> >>
>>>>> >> I see the first two options as quite equally good, although the latter
>>>>> >> one is probably more time consuming to implement. But it would bring
>>>>> >> an additional feature to the streaming case as well.
>>>>> >>
>>>>> >> Thanks for any thoughts.
>>>>> >>
>>>>> >>    Jan
>>>>> >>
>>>>> >> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
>>>>> >>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <[email protected]> wrote:
>>>>> >>>> Hi Reuven,
>>>>> >>>>
>>>>> >>>>> How so? AFAIK stateful DoFns work just fine in batch runners.
>>>>> >>>> Stateful ParDo works in batch as far as the logic inside the state
>>>>> >>>> works for absolutely unbounded out-of-orderness of elements. That
>>>>> >>>> basically (practically) can work only for cases where the order of
>>>>> >>>> input elements doesn't matter. But "state" can refer to "state
>>>>> >>>> machine", and any time you have a state machine involved, the
>>>>> >>>> ordering of elements would matter.
>>>>> >>> No guarantees on order are provided in *either* streaming or batch
>>>>> >>> mode by the model. However, it is the case that in order to make
>>>>> >>> forward progress most streaming runners attempt to limit the amount
>>>>> >>> of out-of-orderedness of elements (in terms of event time vs.
>>>>> >>> processing time), which in turn could help cap the amount of state
>>>>> >>> that must be held concurrently, whereas a batch runner may not allow
>>>>> >>> any state to be safely discarded until the whole timeline from
>>>>> >>> infinite past to infinite future has been observed.
>>>>> >>>
>>>>> >>> Also, as pointed out, state is not preserved "batch to batch" in
>>>>> >>> batch mode.
>>>>> >>>
>>>>> >>>
>>>>> >>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels <[email protected]> wrote:
>>>>> >>>
>>>>> >>>>>    batch semantics and streaming semantics differ only in that I
>>>>> >>>>> can have GlobalWindow with default trigger on batch and cannot on
>>>>> >>>>> stream
>>>>> >>>> You can have a GlobalWindow in streaming with a default trigger. You
>>>>> >>>> could define additional triggers that do early firings. And you could
>>>>> >>>> even trigger the global window by advancing the watermark to +inf.
>>>>> >>> IIRC, as a pragmatic note, we prohibited global window with default
>>>>> >>> trigger on unbounded PCollections in the SDK because this is more
>>>>> >>> likely to be user error than an actual desire to have no output until
>>>>> >>> drain. But it's semantically valid in the model.
>>>>>
>>>>
>
