Yes, I am suggesting adding more intelligent state data structures for just that sort of join. I tagged Reza because his work basically does it, but explicitly pulls a BagState into memory and sorts it. We just need to avoid that. It is the sort of thing that already exists in some engines, so there's proof of concept :-). Jan makes the good point that when executing the same join in batch you wouldn't use the same algorithm, because the disorder will be unbounded. In Beam you'd want a PTransform that expands differently based on whether the inputs are bounded or unbounded.
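Below is a minimal sketch of the kind of time-indexed state Kenn has in mind. It is written in plain Python, not against the Beam state API, and the names `TimeSortedBuffer`, `read_range`, `clear_before` and `BiTemporalJoin` are hypothetical. Entries are kept ordered by event time as they are inserted. The join can therefore query by time range and garbage-collect time prefixes without pulling a whole bag into memory and sorting it.

The join logic follows the bi-temporal join described further down the thread:

- two elements match when their event times are within distance d;
- every match is emitted immediately;
- an element at time t is retained until the watermark passes t + 2d.

```python
import bisect
from typing import Any, List, Tuple


class TimeSortedBuffer:
    """Hypothetical time-indexed state cell, kept sorted by event time.

    Each insert finds its slot by binary search, so the buffer is never
    re-sorted as a whole. (list.insert still shifts elements; a real state
    backend would presumably use an ordered tree or similar storage.)
    """

    def __init__(self) -> None:
        self._times: List[int] = []
        self._values: List[Any] = []

    def add(self, ts: int, value: Any) -> None:
        i = bisect.bisect_right(self._times, ts)  # stable for equal timestamps
        self._times.insert(i, ts)
        self._values.insert(i, value)

    def read_range(self, lo: int, hi: int) -> List[Tuple[int, Any]]:
        """Return entries with lo <= ts <= hi, in event-time order."""
        i = bisect.bisect_left(self._times, lo)
        j = bisect.bisect_right(self._times, hi)
        return list(zip(self._times[i:j], self._values[i:j]))

    def clear_before(self, ts: int) -> None:
        """Garbage-collect the time prefix: drop entries with timestamp < ts."""
        i = bisect.bisect_left(self._times, ts)
        del self._times[:i]
        del self._values[:i]

    def __len__(self) -> int:
        return len(self._times)


class BiTemporalJoin:
    """Emits every (left, right) pair whose event times differ by at most d."""

    def __init__(self, d: int) -> None:
        self.d = d
        self.left = TimeSortedBuffer()
        self.right = TimeSortedBuffer()

    def process(self, side: str, ts: int, value: Any) -> List[Tuple[Any, Any]]:
        """Buffer the element and immediately emit all matches seen so far."""
        if side == "L":
            self.left.add(ts, value)
            matches = self.right.read_range(ts - self.d, ts + self.d)
            return [(value, rv) for _, rv in matches]
        self.right.add(ts, value)
        matches = self.left.read_range(ts - self.d, ts + self.d)
        return [(lv, value) for _, lv in matches]

    def on_watermark(self, watermark: int) -> None:
        """GC rule from the thread: an element at time t is kept until t + 2d."""
        cutoff = watermark - 2 * self.d
        self.left.clear_before(cutoff)
        self.right.clear_before(cutoff)
```

Each matching pair is emitted once, when the later of its two elements arrives. For this join condition nothing has to be retracted, provided no element arrives after its partner has already been garbage-collected.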
Kenn

On Tue, Nov 26, 2019 at 4:16 AM David Morávek <[email protected]> wrote:
> Yes, in the batch case with long-term historical data, this would be O(n^2), as it is basically a bubble sort. If you have a large number of updates for a single key, this would be super expensive.
>
> Kenn, can this be re-implemented with your solution?
>
> On Tue, Nov 26, 2019 at 1:10 PM Jan Lukavský <[email protected]> wrote:
>
>> Functionally yes. But this straightforward solution is not working for me for two main reasons:
>>
>> - it either blows up state in the batch case, or the time complexity of the sort would be O(n^2) (and reprocessing several years of dense time-series data makes it a no-go)
>>
>> - it is not reusable for different time-ordering needs, because the logic implemented purely in user space cannot be transferred to a different problem (there are two states needed, one for the buffer, the other for user state) and extending DoFns does not work (you cannot create an abstract SortedDoFn, because of the state annotation definitions)
>>
>> Jan
>> On 11/26/19 12:56 PM, David Morávek wrote:
>>
>> Hi,
>>
>> I think what Jan has in mind would look something like this <https://gist.github.com/dmvk/3ea32eb36c6406fa72d70b9b1df1d878>, if implemented in user code. Am I right?
>>
>> D.
>>
>> On Tue, Nov 26, 2019 at 10:23 AM Jan Lukavský <[email protected]> wrote:
>>
>>> On 11/25/19 11:45 PM, Kenneth Knowles wrote:
>>>
>>> On Mon, Nov 25, 2019 at 1:56 PM Jan Lukavský <[email protected]> wrote:
>>>
>>>> Hi Rui,
>>>>
>>>> > Hi Kenn, you think a stateful DoFn based join can emit joined rows that never need to be retracted, because in the stateful DoFn case joined rows will be controlled by timers and emitted only once? If so, I will agree with it. Generally speaking, that holds if emitting only once is the factor that decides whether retraction is needed.
>>>>
>>>> That would imply buffering elements up until the watermark, then sorting, and so it reduces to option a) again, is that true? This also has to deal with allowed lateness: with allowed lateness greater than zero, there can still be multiple firings, and so retractions are needed.
>>>>
>>> Specifically, when I say "bi-temporal join" I mean an unbounded-to-unbounded join where one of the join conditions is that elements are within event time distance d of one another. An element at time t will be saved until time t + 2d and then garbage collected. Every matching pair can be emitted immediately.
>>>
>>> OK, this might simplify things a little. Is there a design doc for that? If there are multiple LHS elements within event time distance of an RHS element, which one should be joined? I suppose all of them, but that is not "(time-varying-)relational" join semantics. In those semantics only the last element must be joined, because that is how a (classical) relational database would see the relation at time T (the old record would have been overwritten and would not be part of the output). Because of the time distance constraint this is different from the join I have in mind, which simply joins every LHS element to the most recent RHS element(s) and vice versa, without any additional time constraints (that is, the RHS "update" can happen arbitrarily far in the past).
>>>
>>> Jan
>>>
>>> In the triggered CoGBK + join-product implementation, you do need retractions as a model concept. But you don't need full support, since they only need to be shipped as deltas and only from the CoGBK to the join-product transform, where they are all consumed to create only positive elements. Again, a delay is not required; this yields correct results with the "always" trigger.
>>>
>>> Neither case requires waiting or time sorting a whole buffer.
>>> The bi-temporal join requires something more, in a way, since you need to query by time range and GC time prefixes.
>>>
>>> Kenn
>>>
>>> Jan
>>>> On 11/25/19 10:17 PM, Rui Wang wrote:
>>>>
>>>> On Mon, Nov 25, 2019 at 11:29 AM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>> On 11/25/19 7:47 PM, Kenneth Knowles wrote:
>>>>>
>>>>> On Sun, Nov 24, 2019 at 12:57 AM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>> I can put down a design document, but before that I need to clarify some things for myself. I'm struggling to put all of this into a bigger picture. Sorry if the arguments are going in circles, but I didn't notice any proposal of how to solve these. If anyone can disprove any of this logic it would be very much appreciated, as it might help me get out of a dead end:
>>>>>>
>>>>>> a) in the bi-temporal join you can either buffer until the watermark, or emit false data that has to be retracted
>>>>>>
>>>>> This is not the case. A stateful DoFn based join can immediately emit joined rows that will never need to be retracted. The need for retractions has to do with the CoGBK-based implementation of a join.
>>>>>
>>>>> I fail to see how this could work. If I emit joined rows immediately without waiting for the watermark to pass, I can join two elements that don't belong to each other, because an element with a lower time distance can arrive later, and it should have been joined in place of the previously emitted one. This is a wrong result that has to be retracted. Or what am I missing?
>>>>>
>>>>
>>>> Hi Kenn, you think a stateful DoFn based join can emit joined rows that never need to be retracted, because in the stateful DoFn case joined rows will be controlled by timers and emitted only once? If so, I will agree with it. Generally speaking, that holds if emitting only once is the factor that decides whether retraction is needed.
>>>>
>>>> In past brainstorming, even with retractions ready, streaming joins with windowing are likely to be implemented in a CoGBK + stateful DoFn style.
>>>>
>>>>> I suggest that you work out the definition of the join you are interested in, with a good amount of mathematical rigor, and then consider the ways you can implement it. That is where a design doc will probably clarify things.
>>>>>
>>>>> Kenn
>>>>>
>>>>>> b) until retractions are 100% functional (and that is sort of a holy grail for now), the only solution is using a buffer holding data up to the watermark *and then sorting by event time*
>>>>>>
>>>>>> c) even if retractions were 100% functional, there would have to be a special implementation for the batch case, because otherwise this would simply blow up downstream processing with insanely many false additions and subsequent retractions
>>>>>>
>>>>>> Property b) means that if we want this feature now, we must sort by event time and there is no way around it. Property c) shows that even in the future, we must (in certain cases) distinguish between batch and streaming code paths, which seems weird to me, but it might be an option. But still, there is no way to express this join in the batch case, because it would require either buffering (up to) the whole input on a local worker (which doesn't look like a viable option) or providing a way in user code to signal the need for ordering of data inside GBK (and we are there again :)). Yes, we might shift this need from stateful DoFn to GBK like
>>>>>>
>>>>>> input.apply(GroupByKey.sorted())
>>>>>>
>>>>>> I cannot find a good reason why this would be better than giving these semantics to (stateful) ParDo.
>>>>>>
>>>>>> Maybe someone can help me out here?
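Jan's option b) can be sketched as a watermark-driven sort buffer. This is plain Python with a hypothetical `WatermarkSortBuffer` class, not a Beam API. Elements wait in a min-heap keyed by event time and are released in order once the watermark passes them. As a simplifying assumption, the sketch drops anything that arrives at or behind the current watermark. Handling allowed lateness, and the multiple firings it implies, is left out.

```python
import heapq
import itertools
from typing import Any, List, Tuple


class WatermarkSortBuffer:
    """Hypothetical sort buffer: holds elements until the watermark passes
    them, then releases them in event-time order."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, Any]] = []
        # Sequence numbers break ties, so equal timestamps keep arrival order
        # and values themselves never need to be comparable.
        self._seq = itertools.count()
        self.watermark: float = float("-inf")
        self.dropped: List[Tuple[int, Any]] = []

    def add(self, ts: int, value: Any) -> None:
        if ts <= self.watermark:
            # Already behind the watermark: the sort-buffer cutoff discards it.
            self.dropped.append((ts, value))
            return
        heapq.heappush(self._heap, (ts, next(self._seq), value))

    def advance_watermark(self, watermark: int) -> List[Tuple[int, Any]]:
        """Release, in event-time order, everything with ts <= watermark."""
        self.watermark = max(self.watermark, watermark)
        out = []
        while self._heap and self._heap[0][0] <= self.watermark:
            ts, _, value = heapq.heappop(self._heap)
            out.append((ts, value))
        return out
```

Each insert and release costs O(log n) on the heap, so the buffer is never bubble-sorted. Output still waits for the watermark, and late data hits the cutoff, which is the latency and data-loss trade-off of a sort buffer discussed in this thread.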
>>>>>>
>>>>>> Jan
>>>>>> On 11/24/19 5:05 AM, Kenneth Knowles wrote:
>>>>>>
>>>>>> I don't actually see how event time sorting simplifies this case much. You still need to buffer elements until they can no longer be matched in the join, and you still need to query that buffer for elements that might match. The general "bi-temporal join" (without sorting) requires one new state type, and then it has an identical API, does not require any novel data structures or reasoning, yields better latency (no sort buffer delay), and discards less data (no sort buffer cutoff; the watermark is better). Perhaps a design document about this specific case would clarify.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Fri, Nov 22, 2019 at 10:08 PM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>>> I didn't want to go too much into detail, but to describe the idea roughly (ignoring the problem of different window fns on both sides to keep it as simple as possible):
>>>>>>>
>>>>>>> rhs ----- \
>>>>>>>
>>>>>>>            flatten (on global window) ---- stateful ParDo (sorted by event time) ---- output
>>>>>>>
>>>>>>> lhs ----- /
>>>>>>>
>>>>>>> If we can guarantee that events arrive in the stateful ParDo in event-time order, then the whole complexity reduces to keeping the current value of the left and right element and just flushing them out each time there is an update. That is, the "knob" is actually when the watermark moves, because it is what tells the join operation that there will be no more (non-late) input. This is very, very simplified, but depicts the solution. The "classical" windowed join reduces to this if all data in each window is projected onto the window end boundary. Then there will be a cartesian product, because all the elements have the same timestamp.
>>>>>>> I can put this into a design doc with all the details; I was trying to find out if there is or was any effort around this.
>>>>>>>
>>>>>>> I was in touch with Reza in PR #9032; I think that it currently suffers from problems with running this on batch.
>>>>>>>
>>>>>>> I think I can even (partly) resolve the retraction issue (for joins), as described in the thread [1]. In short, there can be two copies of the stateful DoFn, one running at the watermark and the other at (watermark - allowed lateness). One would produce ON_TIME (maybe wrong) results, the other would produce LATE but correct ones. By comparing them, it would be possible to retract the wrong results.
>>>>>>>
>>>>>>> Yes, this is also about providing more evidence of why I think event-time sorting should be (somehow) part of the model. :-)
>>>>>>>
>>>>>>> Jan
>>>>>>>
>>>>>>> [1] https://lists.apache.org/thread.html/a736ebd6b0913d2e06dfa54797716d022adf2c45140530d5c3bd538d@%3Cdev.beam.apache.org%3E
>>>>>>> On 11/23/19 5:54 AM, Kenneth Knowles wrote:
>>>>>>>
>>>>>>> +Mikhail Gryzykhin <[email protected]> +Rui Wang <[email protected]> +Reza Rokni <[email protected]> who have all done some investigations here.
>>>>>>>
>>>>>>> On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský <[email protected]> wrote:
>>>>>>>
>>>>>>>> On 11/22/19 7:54 PM, Reuven Lax wrote:
>>>>>>>>
>>>>>>>> On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Reuven,
>>>>>>>>>
>>>>>>>>> I didn't investigate that particular one, but looking into it now, it looks like it (same as the "classic" join library) is built around CoGBK. Is that correct? If yes, then it essentially means that it:
If yes, then it essentially means that it: >>>>>>>>> >>>>>>>> - works only for cases where both sides have the same windowfn >>>>>>>>> (that is limitation of Flatten that precedes CoGBK) >>>>>>>>> >>>>>>>> Correct. Did you want to join different windows? If so what are the >>>>>>>> semantics? If the lhs has FixedWindows and the rhs has SessionWindows, >>>>>>>> what >>>>>>>> do you want the join semantics to be? The only thing I could imagine >>>>>>>> would >>>>>>>> be for the user to provide some function telling the join how to map >>>>>>>> the >>>>>>>> windows together, but that could be pretty complicated. >>>>>>>> >>>>>>>> I don't want to go too far into details, but generally both lhs and >>>>>>>> rhs can be put onto time line and then full join can be defined as each >>>>>>>> pair of (lhs, first preceding rhs) and (rhs, first preceding lhs). >>>>>>>> Then the >>>>>>>> end of window is semantically just clearing the joined value (setting >>>>>>>> it to >>>>>>>> null, thus at the end of window there will be pair (lhs, null) or >>>>>>>> (null, >>>>>>>> rhs) in case of full outer join). This way any combination of windows >>>>>>>> is >>>>>>>> possible, because all window does is that it "scopes" validity of >>>>>>>> respective values (lhs, rhs). >>>>>>>> >>>>>>> >>>>>>> I think it is very valid to hope to do a join in the sense of a >>>>>>> relational join where it is row-to-row. In this case, Beam's concept of >>>>>>> windowing may or may not make sense. It is just a tool for the job. It >>>>>>> is >>>>>>> just a grouping key that provides a time when state can be deleted. So I >>>>>>> would say your use case is more global window to global window join. >>>>>>> That >>>>>>> is what I think of as a true stream-to-stream join anyhow. You probably >>>>>>> don't want to wait forever for output. So you'll need to use some knob >>>>>>> other than Beam windows or triggers. 
>>>>>>>
>>>>>>> Reza has prototyped a join like you describe here: https://github.com/apache/beam/pull/9032
>>>>>>>
>>>>>>> If your join condition explicitly includes the event time distance between elements, then it could "just work". If that isn't really part of your join condition, then you will have to see this restriction as a "knob" that you tweak on your results.
>>>>>>>
>>>>>>>>> - when using the global window, there has to be a trigger, and (afaik) there is no trigger that would guarantee firing after each data element (for early panes) (because triggers are there to express a cost-latency tradeoff, not semantics)
>>>>>>>>>
>>>>>>>>
>>>>>>>> Can you explain the use case where this matters? If you do trigger elementCountAtLeast(1) on the join, then the consumer will simply see a continuous stream of outputs. I'm not sure I understand why the consumer cares that some of those outputs were in a pane that really held 3 outputs instead of 1.
>>>>>>>>
>>>>>>>> What I'm trying to solve is basically this:
>>>>>>>>
>>>>>>>> - lhs is an event stream
>>>>>>>>
>>>>>>>> - rhs is a stream of "state updates"
>>>>>>>>
>>>>>>>> The purpose of the join is "take each event, pair it with the currently valid state and produce output and possibly modified state". I cannot process two events at a time, because the first event can modify the state and the subsequent event should see this. It is not a "simple" stateful ParDo either, because the state can be modified externally (not going into too much detail here, but e.g. by writing into a Kafka topic).
>>>>>>>>
>>>>>>> Reuven's explanation is missing some detail. If the CoGBK is in discarding mode, then it will miss join results. If the CoGBK is in accumulating mode, it will duplicate join results.
>>>>>>> This is a known problem and the general solution is retractions.
>>>>>>>
>>>>>>> Basically, CoGBK-based joins just don't work with triggers until we have retractions.
>>>>>>>
>>>>>>>>> Moreover, I'd like to define the join semantics so that when there are available elements from both sides, the fired pane should be ON_TIME, not EARLY. That essentially means that the fully general case would not be built around (Co)GBK, but around stateful ParDo. There are specific options where this fully general case "degrades" into forms that can be efficiently expressed using (Co)GBK, that is true.
>>>>>>>>>
>>>>>>>>
>>>>>>>> BTW building this around stateful DoFn might be a better fit. The main reason I didn't is that we would need a good distributed MapState (something discussed fairly recently on the list), and that is not yet built. Once we had that, I might be inclined to rewrite this join on stateful DoFn.
>>>>>>>>
>>>>>>>> Yes, the sorted state helps for the streaming case. But I'd be careful about that for the batch case, where this might lead to high pressure on the state (and InMemoryStateInternals might OOME, for instance).
>>>>>>>>
>>>>>>>> However, can you explain what you are expecting from the pane? An EARLY pane simply means that we are producing output before the end of the window. If you are in the global window triggering every element, then every output is EARLY. It might seem weird if you are interpreting EARLY as "outputting data that isn't ready," however that's not what EARLY is defined to be. Any change to the pane semantics would be a major breaking change to very fundamental semantics.
>>>>>>>>
>>>>>>>> I wonder if you are really objecting to the names EARLY and ON_TIME?
>>>>>>>> Maybe we would've been better off tagging it BEFORE_WINDOW_END instead of EARLY, to make it clear what is meant?
>>>>>>>>
>>>>>>>> Essentially I don't object to anything here. I'm missing a solution to the "event vs. state" join described above. I was thinking about how to make these types of problems more user friendly, and it essentially leads to creating somewhat more generic join semantics, where end-of-window is converted into "value-delete events" and then just joining by the "previous" or "valid" value (yes, this relates to the validity windows mentioned at Beam Summit Europe). It actually turns out that with some work we could quite "naturally" define a join on two streams with the global window and no trigger. It would even function with the lowest latency possible (but yes, with the highest expense; it is actually the introduction of (same!) windows that enables certain optimizations). It then correctly defines semantics for different windows, although the result would (probably unexpectedly) be windowed using the global window. But that doesn't seem to be any breaking change, because it is currently not possible (any such pipeline will not be validated).
>>>>>>>>
>>>>>>>> Maybe for reference, the unwindowed join would be what is described here [1]
>>>>>>>>
>>>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin
>>>>>>>>
>>>>>>>>> Jan
>>>>>>>>> On 11/22/19 6:47 PM, Reuven Lax wrote:
>>>>>>>>>
>>>>>>>>> Have you seen the Join library that is part of schemas? I'm curious whether this fits your needs, or there's something lacking there.
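The "event vs. state" join Jan describes can be sketched as a fold over an event-time-ordered timeline. It is close in spirit to the Kafka Streams KStream-KTable join linked above, where only stream-side records produce output and table updates just replace the current value.

This is plain Python with hypothetical names: `event_state_join`, the `"event"`/`"state"` tags and the `handle` callback. One part is an assumption about how Jan's "possibly modified state" would be threaded through: `handle` returns a modified state, and the next event sees it.

```python
from typing import Any, Callable, Iterable, List, Optional, Tuple

EVENT, STATE_UPDATE = "event", "state"  # hypothetical tags for lhs / rhs


def event_state_join(
    timeline: Iterable[Tuple[int, str, Any]],
    handle: Callable[[Any, Optional[Any]], Tuple[Any, Optional[Any]]],
) -> List[Tuple[int, Any]]:
    """Join each event with the state valid at its event time.

    `timeline` must be sorted by event time. State updates (rhs, e.g. written
    externally) only replace the current state and emit nothing. Each event
    (lhs) is passed to `handle` with the current state; the state returned
    by `handle` is what the next event sees.
    """
    state: Optional[Any] = None
    out: List[Tuple[int, Any]] = []
    for ts, kind, payload in timeline:
        if kind == STATE_UPDATE:
            state = payload
        elif kind == EVENT:
            result, state = handle(payload, state)
            out.append((ts, result))
        else:
            raise ValueError(f"unknown kind: {kind}")
    return out


# Usage: events are debits against a balance that can also be reset externally.
def apply_debit(amount: int, balance: Optional[int]) -> Tuple[Tuple[int, int], int]:
    new_balance = (balance or 0) - amount
    return (amount, new_balance), new_balance


timeline = [(1, "state", 100), (2, "event", 30), (3, "event", 50),
            (4, "state", 10), (5, "event", 5)]
result = event_state_join(timeline, apply_debit)
# → [(2, (30, 70)), (3, (50, 20)), (5, (5, 5))]
```

Processing strictly one event at a time is what lets the second debit see the balance left by the first, which is the ordering requirement Jan raises.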
>>>>>>>>>
>>>>>>>>> On Fri, Nov 22, 2019 at 12:31 AM Jan Lukavský <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> based on the roadmap [1], we would like to define and implement a full set of (unified) stream-stream joins. That would include:
>>>>>>>>>>
>>>>>>>>>> - joins (left, right, full outer) on the global window with an "immediate trigger"
>>>>>>>>>>
>>>>>>>>>> - joins with different windowing functions on the left and right side
>>>>>>>>>>
>>>>>>>>>> The approach would be to define these operations in a natural way, so that the definition is aligned with how current joins work (same windows, cartesian product of values with the same keys, output timestamp projected to the end of the window, etc.). Because this should be a generic approach, this effort should probably be part of the join library, which can then be reused by other components, too (e.g. SQL).
>>>>>>>>>>
>>>>>>>>>> The question is: is (or was) there any effort that we can build upon? Or should this be designed from scratch?
>>>>>>>>>>
>>>>>>>>>> Jan
>>>>>>>>>>
>>>>>>>>>> [1] https://beam.apache.org/roadmap/euphoria/
