In a lot of cases the initial combiner can dramatically reduce the amount of data in this last phase, making it tractable for many use cases.
I assume in your example the first phase would not provide enough compression?

Cheers
Reza

On Tue, 21 May 2019, 16:47 Jan Lukavský, <[email protected]> wrote:

> Hi Reza, thanks for the reaction, comments inline.
>
> On 5/21/19 1:02 AM, Reza Rokni wrote:
>
> > Hi,
> >
> > If I have understood the use case correctly, your output is an ordered counter of state changes.
> >
> > One approach which might be worth exploring is outlined below. I haven't had a chance to test it, so it could be missing pieces or be plain old wrong (will try and come up with a test example later on to try it out).
> >
> > 1 - Window into a small enough Duration such that the number of elements in a window per key can be read into a memory structure for sorting.
> > 2 - GBK
> > 3 - In a DoFn do the ordering and output Timestamped<V> elements that contain the state changes for just that window and the value of the last element {timestamp-00:00:00: (one: 1, zero: 0, lastElement : 0)}. This will cause memory pressure, so your step 1 is important.
>
> This is just an optimization, right?
>
> > 4 - Window these outputs into the Global Window with a Stateful DoFn
>
> Because you finally have to do the stateful ParDo in the Global window, you will end up with the same problem - the first three steps just might give you some extra time. But if you have enough data (long enough history, or very frequent changes, or both), then you will run into the same issues as without the optimization here. The BagState simply would not be able to hold all the data in the batch case.
>
> Jan
>
> > 5 - Add elements to a BagState in the stateful DoFn
> > 6 - In the Global Window set an EventTimer to fire at time boundaries that match the time window that you need. Note Timers do not have a read function for the time that they are set. (Here is one way to set metadata to emulate a read function <https://stackoverflow.com/questions/55912522/setting-a-timer-to-the-minimum-timestamp-seen/55912542#55912542>) Again this can cause memory pressure.
> > 7 - At each OnTimer,
> > 7a - read and sort the elements in the BagState,
> > 7b - true up the state changes with the cross-window state changes from the list,
> > 7c - store the last accumulator into a different State.
> >
> > Sorry, that was off the top of my head so it could be missing things. For example, LateData would need to be dealt with outside of this flow...
> >
> > Cheers
> > Reza
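To make steps 4-7 above concrete, here is a rough, untested sketch using the Java SDK's state and timer API. Everything in it is illustrative: the class and state names are made up, the one-minute boundary is a placeholder, the element's timestamp is carried explicitly inside the value for simplicity, the "true up" is reduced to counting 0/1 transitions against a carried-over last value, and - as Jan points out above - the BagState can still grow without bound in the batch case.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Illustrative only: steps 4-7 as a stateful DoFn applied in the Global Window.
class TrueUpStateChangesFn extends DoFn<KV<String, TimestampedValue<Integer>>, KV<String, String>> {

  private static final Duration BOUNDARY = Duration.standardMinutes(1); // placeholder interval

  @StateId("buffer") // step 5: buffered elements waiting to be sorted
  private final StateSpec<BagState<TimestampedValue<Integer>>> bufferSpec =
      StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of()));

  @StateId("key") // kept so @OnTimer can emit a keyed result
  private final StateSpec<ValueState<String>> keySpec = StateSpecs.value(StringUtf8Coder.of());

  @StateId("lastValue") // step 7c: accumulator carried across timer firings
  private final StateSpec<ValueState<Integer>> lastValueSpec = StateSpecs.value(VarIntCoder.of());

  @TimerId("flush") // step 6: event-time timer at fixed boundaries
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, TimestampedValue<Integer>> element,
      @StateId("buffer") BagState<TimestampedValue<Integer>> buffer,
      @StateId("key") ValueState<String> key,
      @TimerId("flush") Timer flush) {
    key.write(element.getKey());
    buffer.add(element.getValue());
    // Set the timer to the next boundary after this element. A timer's deadline
    // cannot be read back, so a real version would track the pending deadline in
    // another ValueState (the StackOverflow link above shows that trick).
    long millis = element.getValue().getTimestamp().getMillis();
    flush.set(new Instant((millis / BOUNDARY.getMillis() + 1) * BOUNDARY.getMillis()));
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext context,
      @StateId("buffer") BagState<TimestampedValue<Integer>> buffer,
      @StateId("key") ValueState<String> key,
      @StateId("lastValue") ValueState<Integer> lastValue) {
    // Step 7a: read and sort the buffered elements by event timestamp.
    List<TimestampedValue<Integer>> sorted = new ArrayList<>();
    buffer.read().forEach(sorted::add);
    sorted.sort((a, b) -> a.getTimestamp().compareTo(b.getTimestamp()));
    // Step 7b: true up transitions against the carried-over last value (default 0).
    Integer carried = lastValue.read();
    int current = carried == null ? 0 : carried;
    int ones = 0;
    int zeros = 0;
    for (TimestampedValue<Integer> tv : sorted) {
      if (tv.getValue() != current) {
        if (tv.getValue() == 1) { ones++; } else { zeros++; }
        current = tv.getValue();
      }
    }
    // Step 7c: store the accumulator, clear the buffer, emit a per-boundary summary.
    lastValue.write(current);
    buffer.clear();
    context.output(KV.of(key.read(), "(one: " + ones + ", zero: " + zeros + ")"));
  }
}

The fixed-boundary timer mirrors Reza's step 6; an alternative, closer to what Jan and Robert discuss further down, is to keep the timer at the minimum buffered timestamp and release elements as the watermark (+lateness) advances.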
> On Tue, 21 May 2019 at 07:00, Kenneth Knowles <[email protected]> wrote:
>
>> Thanks for the nice small example of a calculation that depends on order. You are right that many state machines have this property. I agree w/ you and Luke that it is convenient for batch processing to sort by event timestamp before running a stateful ParDo. In streaming you could also implement "sort by event timestamp" by buffering until you know all earlier data will be dropped - a slack buffer up to allowed lateness.
>>
>> I do not think that it is OK to sort in batch and not in streaming. Many state machines diverge very rapidly when things are out of order. So each runner, if they see the "@OrderByTimestamp" annotation (or whatever), needs to deliver sorted data (by some mix of buffering and dropping), or to reject the pipeline as unsupported.
>>
>> And also want to say that this is not the default case - many uses of state & timers in ParDo yield different results at the element level, but the results are equivalent in the big picture. In examples such as "assign a unique sequence number to each element" or "group into batches", it doesn't matter exactly what the result is, only that it meets the spec. And other cases like user funnels are monotonic enough that you also don't actually need sorting.
>>
>> Kenn
>>
>> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <[email protected]> wrote:
>>
>>> Yes, the problem will arise probably mostly when you have not well distributed keys (or too few keys). I'm really not sure if a pure GBK with a trigger can solve this - it might help to have a data-driven trigger. There would still be some doubts, though. The main question is still here - people say that sorting by timestamp before a stateful ParDo would be prohibitively slow, but I don't really see why - the sorting is very probably already there. And if not (hash grouping instead of sorted grouping), then the sorting would affect only user-defined StatefulParDos.
>>>
>>> This would suggest that the best way out of this would be really to add the annotation, so that the author of the pipeline can decide.
>>>
>>> If that would be acceptable, I think I can try to prepare some basic functionality, but I'm not sure if I would be able to cover all runners / SDKs.
>>>
>>> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
>>>
>>> It is read all per key and window and not just read all (this still won't scale with hot keys in the global window). The GBK preceding the StatefulParDo will guarantee that you are processing all the values for a specific key and window at any given time. Is there a specific window/trigger that is missing that you feel would remove the need for you to use StatefulParDo?
>>>
>>> On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <[email protected]> wrote:
>>>
>>>> Hi Lukasz,
>>>>
>>>> > Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state.
>>>>
>>>> Yes, no problem with that. But this whole discussion started because *this doesn't work on batch*. You simply cannot first read everything from distributed storage and then buffer it all into memory, just to read it again, but sorted. That will not work. And even if it would, it would be a terrible waste of resources.
>>>>
>>>> Jan
>>>>
>>>> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
>>>>
>>>> On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>> This discussion brings many really interesting questions for me. :-)
>>>>>
>>>>> > I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes.
>>>>>
>>>>> Although I understand the motivation of this statement, this project's name is "Apache Beam: An advanced unified programming model". What does the model unify, if "streaming vs. batch" is not part of the model?
>>>>>
>>>>> Using microbatching, chaining of batch jobs, or pure streaming are exactly the "runtime conditions/characteristics" I refer to. All of these define several runtime parameters, which in turn define how well or badly the pipeline will perform and how many resources might be needed. From my point of view, pure streaming should be the most resource demanding (if not, why bother with batch? why not run everything in streaming only? what will there remain to "unify"?).
>>>>>
>>>>> > Fortunately, for batch, only the state for a single key needs to be preserved at a time, rather than the state for all keys across the range of skew. Of course if you have few or hot keys, one can still have issues (and this is not specific to StatefulDoFns).
>>>>>
>>>>> Yes, but here is still the presumption that my stateful DoFn can tolerate arbitrary shuffling of inputs. Let me explain the use case in more detail.
>>>>>
>>>>> Suppose you have an input stream consisting of 1s and 0s (and some key for each element, which is irrelevant for the demonstration). Your task is to calculate, in a running global window, the actual number of changes between state 0 and state 1 and vice versa. When the state doesn't change, you don't calculate anything. If the input (for a given key) were (tN denotes timestamp N):
>>>>>
>>>>> t1: 1
>>>>> t2: 0
>>>>> t3: 0
>>>>> t4: 1
>>>>> t5: 1
>>>>> t6: 0
>>>>>
>>>>> then the output should yield (supposing that the default state is zero):
>>>>>
>>>>> t1: (one: 1, zero: 0)
>>>>> t2: (one: 1, zero: 1)
>>>>> t3: (one: 1, zero: 1)
>>>>> t4: (one: 2, zero: 1)
>>>>> t5: (one: 2, zero: 1)
>>>>> t6: (one: 2, zero: 2)
>>>>>
>>>>> How would you implement this in current Beam semantics?
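For what it's worth, the crux of the example can be made concrete: if elements were guaranteed to arrive in timestamp order per key, the state machine is only a few lines of per-key state, which is exactly why the ordering question matters. A rough, untested sketch (Java SDK; class and state names are made up) that reproduces the t1..t6 output above only under that ordering assumption, and silently produces different counts if the input is shuffled:

import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Counts 0->1 and 1->0 transitions per key and emits the running totals,
// assuming elements arrive in timestamp order. Under arbitrary shuffling
// (e.g. a batch shuffle) the same code produces different, incorrect counts.
class CountStateChangesFn extends DoFn<KV<String, Integer>, KV<String, String>> {

  @StateId("last")
  private final StateSpec<ValueState<Integer>> lastSpec = StateSpecs.value(VarIntCoder.of());

  @StateId("ones")
  private final StateSpec<ValueState<Integer>> onesSpec = StateSpecs.value(VarIntCoder.of());

  @StateId("zeros")
  private final StateSpec<ValueState<Integer>> zerosSpec = StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("last") ValueState<Integer> last,
      @StateId("ones") ValueState<Integer> ones,
      @StateId("zeros") ValueState<Integer> zeros) {
    int current = c.element().getValue();
    int previous = orZero(last.read());   // default state is zero
    int oneCount = orZero(ones.read());
    int zeroCount = orZero(zeros.read());
    if (current != previous) {
      if (current == 1) {
        oneCount++;
        ones.write(oneCount);
      } else {
        zeroCount++;
        zeros.write(zeroCount);
      }
      last.write(current);
    }
    // Emits e.g. "(one: 1, zero: 0)" at t1 for the example input above.
    c.output(KV.of(c.element().getKey(), "(one: " + oneCount + ", zero: " + zeroCount + ")"));
  }

  private static int orZero(Integer value) {
    return value == null ? 0 : value;
  }
}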
>>>> I think you're saying here that I know that my input is ordered in a specific way and since I assume the order when writing my pipeline I can perform this optimization. But there is nothing preventing a runner from noticing that you're processing in the global window with a specific type of trigger and re-ordering your inputs/processing to get better performance (since you can't use an AfterWatermark trigger for your pipeline in streaming for the GlobalWindow).
>>>>
>>>> Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state. I can see why you would want an annotation that says I must have timestamp ordered elements, since it makes writing certain StatefulParDos much easier. StatefulParDo is a low-level function, it really is the "here you go and do whatever you need to but here be dragons" function, while windowing and triggering is meant to keep many people from writing StatefulParDo in the first place.
>>>>
>>>>> > Pipelines that fail in the "worst case" batch scenario are likely to degrade poorly (possibly catastrophically) when the watermark falls behind in streaming mode as well.
>>>>>
>>>>> But the worst case is defined by an input of size (available resources + single byte) -> pipeline fail. Although it could have finished, given the right conditions.
>>>>>
>>>>> > This might be reasonable, implemented by default by buffering everything and releasing elements as the watermark (+lateness) advances, but would likely lead to inefficient (though *maybe* easier to reason about) code.
>>>>>
>>>>> Sure, the pipeline will be less efficient, because it would have to buffer and sort the inputs. But at least it will produce correct results in cases where updates to state are order-sensitive.
>>>>>
>>>>> > Would it be roughly equivalent to GBK + FlatMap(lambda (key, values): [(key, value) for value in values])?
>>>>>
>>>>> I'd say roughly yes, but the difference would be in the trigger. The trigger should ideally fire as soon as the watermark (+lateness) crosses the element with the lowest timestamp in the buffer, although this could be somehow emulated by a fixed trigger every X millis.
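For reference, the GBK + FlatMap construction mentioned above would look roughly like this in the Java SDK. It is illustrative only: it just regroups and re-emits the values, it does not by itself address the ordering or trigger questions discussed here, and the re-emitted elements carry the group's timestamp rather than the original per-element timestamps (those would have to be reified into the values beforehand).

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class RegroupExample {
  // Java rendering of GBK + FlatMap(lambda (key, values): [(key, value) for value in values]).
  static PCollection<KV<String, Integer>> regroup(PCollection<KV<String, Integer>> input) {
    return input
        .apply(GroupByKey.<String, Integer>create())
        .apply(ParDo.of(
            new DoFn<KV<String, Iterable<Integer>>, KV<String, Integer>>() {
              @ProcessElement
              public void process(ProcessContext c) {
                // Re-emit each buffered value under its key, one element per value.
                for (Integer value : c.element().getValue()) {
                  c.output(KV.of(c.element().getKey(), value));
                }
              }
            }));
  }
}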
>>>>> > Or is the underlying desire just to be able to hint to the runner that the code may perform better (e.g. require less resources) as skew is reduced (and hence to order by timestamp iff it's cheap)?
>>>>>
>>>>> No, the sorting would have to be done in the streaming case as well. That is an imperative of the unified model. I think it is possible to sort by timestamp only in the batch case (and do it for *all* batch stateful pardos without annotation), or to introduce the annotation, but then make the same guarantees for the streaming case as well.
>>>>>
>>>>> Jan
>>>>>
>>>>> On 5/20/19 4:41 PM, Robert Bradshaw wrote:
>>>>> > On Mon, May 20, 2019 at 1:19 PM Jan Lukavský <[email protected]> wrote:
>>>>> >> Hi Robert,
>>>>> >>
>>>>> >> yes, I think you rephrased my point - although no *explicit* guarantees of ordering are given in either mode, there is *implicit* ordering in the streaming case that is due to the nature of the processing - the difference between the watermark and the timestamps of elements flowing through the pipeline is generally low (too high a difference leads to the overbuffering problem), but there is no such bound on batch.
>>>>> > Fortunately, for batch, only the state for a single key needs to be preserved at a time, rather than the state for all keys across the range of skew. Of course if you have few or hot keys, one can still have issues (and this is not specific to StatefulDoFns).
>>>>> >
>>>>> >> As a result, I see a few possible solutions:
>>>>> >>
>>>>> >> - the best and most natural seems to be an extension of the model, so that it defines batch as not only "streaming pipeline executed in batch fashion", but "pipeline with at least as good runtime characteristics as in the streaming case, executed in batch fashion". I really don't think that there are any conflicts with the current model, or that this could affect performance, because the required sorting (as pointed out by Aljoscha) is very probably already done during translation of stateful pardos. Also note that this definition only affects user-defined stateful pardos.
>>>>> > I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes. The model describes what the valid outputs are given a (sometimes partial) set of inputs. It becomes really hard to define things like "as good runtime characteristics." Once you allow any out-of-orderedness, it is not very feasible to try and define (and more cheaply implement) an "upper bound" of acceptable out-of-orderedness.
>>>>> >
>>>>> > Pipelines that fail in the "worst case" batch scenario are likely to degrade poorly (possibly catastrophically) when the watermark falls behind in streaming mode as well.
>>>>> >> - another option would be to introduce an annotation for DoFns (e.g. @RequiresStableTimeCharacteristics), which would result in the sorting in the batch case - but - this extension would have to ensure the sorting in streaming mode also - it would require a definition of allowed lateness, and trigger (essentially similar to window)
>>>>> > This might be reasonable, implemented by default by buffering everything and releasing elements as the watermark (+lateness) advances, but would likely lead to inefficient (though *maybe* easier to reason about) code. Not sure about the semantics of triggering here, especially data-driven triggers. Would it be roughly equivalent to GBK + FlatMap(lambda (key, values): [(key, value) for value in values])?
>>>>> >
>>>>> > Or is the underlying desire just to be able to hint to the runner that the code may perform better (e.g. require less resources) as skew is reduced (and hence to order by timestamp iff it's cheap)?
>>>>> >
>>>>> >> - the last option would be to introduce these "higher order guarantees" in some extension DSL (e.g. Euphoria), but that seems to be the worst option to me
>>>>> >>
>>>>> >> I see the first two options as quite equally good, although the latter one is probably more time consuming to implement. But it would bring an additional feature to the streaming case as well.
>>>>> >>
>>>>> >> Thanks for any thoughts.
>>>>> >>
>>>>> >> Jan
>>>>> >>
>>>>> >> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
>>>>> >>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <[email protected]> wrote:
>>>>> >>>> Hi Reuven,
>>>>> >>>>
>>>>> >>>>> How so? AFAIK stateful DoFns work just fine in batch runners.
>>>>> >>>> Stateful ParDo works in batch as far as the logic inside the state works for absolutely unbounded out-of-orderness of elements. That basically (practically) can work only for cases where the order of input elements doesn't matter. But "state" can refer to "state machine", and any time you have a state machine involved, the ordering of elements would matter.
>>>>> >>> No guarantees on order are provided in *either* streaming or batch mode by the model. However, it is the case that in order to make forward progress most streaming runners attempt to limit the amount of out-of-orderedness of elements (in terms of event time vs. processing time), which in turn could help cap the amount of state that must be held concurrently, whereas a batch runner may not allow any state to be safely discarded until the whole timeline from the infinite past to the infinite future has been observed.
>>>>> >>>
>>>>> >>> Also, as pointed out, state is not preserved "batch to batch" in batch mode.
>>>>> >>>
>>>>> >>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels <[email protected]> wrote:
>>>>> >>>
>>>>> >>>>> batch semantics and streaming semantics differs only in that I can have GlobalWindow with default trigger on batch and cannot on stream
>>>>> >>>> You can have a GlobalWindow in streaming with a default trigger. You could define additional triggers that do early firings. And you could even trigger the global window by advancing the watermark to +inf.
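To illustrate the point about the GlobalWindow in streaming, a global window with early firings could be declared roughly as follows (illustrative; the processing-time delay and the discarding mode are arbitrary choices, not recommendations):

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class GlobalWindowExample {
  // Global window over an unbounded input with periodic early firings; the
  // default trigger alone would fire only when the watermark reaches the end
  // of the global window (+inf).
  static PCollection<KV<String, Integer>> earlyFiringGlobalWindow(
      PCollection<KV<String, Integer>> input) {
    return input.apply(
        Window.<KV<String, Integer>>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(1))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes());
  }
}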
>>>>> >>> IIRC, as a pragmatic note, we prohibited global window with default trigger on unbounded PCollections in the SDK because this is more likely to be user error than an actual desire to have no output until drain. But it's semantically valid in the model.
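Finally, purely as a strawman for the annotation idea that runs through this thread - the name and the exact semantics are precisely what is being debated, and nothing like this exists in the SDK - the user-facing surface could be as small as a marker annotation, with each runner either delivering elements sorted by event timestamp per key or rejecting the pipeline as unsupported, as Kenn describes above.

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical marker annotation - NOT part of Beam; shown only to make the
// proposal concrete. The name echoes the @OrderByTimestamp /
// @RequiresStableTimeCharacteristics suggestions above.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface RequiresTimestampOrderedInput {}

// A runner seeing the annotation would have to deliver elements to this DoFn
// sorted by event timestamp per key (buffer-and-sort up to allowed lateness in
// streaming, a sorting shuffle in batch), or reject the pipeline as unsupported.
@RequiresTimestampOrderedInput
class CountStateChangesOrderedFn extends DoFn<KV<String, Integer>, KV<String, String>> {
  // ... same per-key transition-counting logic as the earlier CountStateChangesFn sketch ...
}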
