Hi, I think what Jan has in mind would look something like this <https://gist.github.com/dmvk/3ea32eb36c6406fa72d70b9b1df1d878>, if implemented in user code. Am I right?
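I haven't cross-checked the gist line by line, but as I understand it, the user-code version would be roughly a stateful DoFn that buffers elements per key and releases them in event-time order as the watermark advances - something like the sketch below (all names are mine, not the gist's, and allowed lateness is ignored):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

// Buffers elements per key and emits them in event-time order once the
// watermark has passed their timestamps. Late data would still come out of
// order; allowed lateness is ignored here.
class SortByEventTimeFn<K, V> extends DoFn<KV<K, V>, V> {

  @StateId("buffer")
  private final StateSpec<BagState<TimestampedValue<V>>> bufferSpec = StateSpecs.bag();

  @StateId("minPending")
  private final StateSpec<ValueState<Instant>> minPendingSpec = StateSpecs.value();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("buffer") BagState<TimestampedValue<V>> buffer,
      @StateId("minPending") ValueState<Instant> minPending,
      @TimerId("flush") Timer flush) {
    buffer.add(TimestampedValue.of(ctx.element().getValue(), ctx.timestamp()));
    // Keep the flush timer at the earliest buffered timestamp.
    Instant min = minPending.read();
    if (min == null || ctx.timestamp().isBefore(min)) {
      minPending.write(ctx.timestamp());
      flush.set(ctx.timestamp());
    }
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext ctx,
      @StateId("buffer") BagState<TimestampedValue<V>> buffer,
      @StateId("minPending") ValueState<Instant> minPending,
      @TimerId("flush") Timer flush) {
    List<TimestampedValue<V>> ready = new ArrayList<>();
    List<TimestampedValue<V>> pending = new ArrayList<>();
    for (TimestampedValue<V> tv : buffer.read()) {
      if (tv.getTimestamp().isAfter(ctx.timestamp())) {
        pending.add(tv);
      } else {
        ready.add(tv);
      }
    }
    // Everything at or before the watermark can now be emitted, sorted by event time.
    ready.sort((a, b) -> a.getTimestamp().compareTo(b.getTimestamp()));
    for (TimestampedValue<V> tv : ready) {
      ctx.output(tv.getValue());
    }
    buffer.clear();
    minPending.clear();
    if (!pending.isEmpty()) {
      Instant next = pending.get(0).getTimestamp();
      for (TimestampedValue<V> tv : pending) {
        buffer.add(tv);
        if (tv.getTimestamp().isBefore(next)) {
          next = tv.getTimestamp();
        }
      }
      minPending.write(next);
      flush.set(next);  // re-arm for the earliest still-buffered element
    }
  }
}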
D.

On Tue, Nov 26, 2019 at 10:23 AM Jan Lukavský <[email protected]> wrote:

> On 11/25/19 11:45 PM, Kenneth Knowles wrote:
>
> On Mon, Nov 25, 2019 at 1:56 PM Jan Lukavský <[email protected]> wrote:
>
>> Hi Rui,
>>
>> > Hi Kenn, do you think a stateful DoFn based join can emit joined rows that never need to be retracted, because in the stateful DoFn case joined rows are controlled by timers and emitted only once? If so I will agree with it. Generally speaking, emitting only once is what decides whether retractions are needed or not.
>>
>> that would imply buffering elements up until the watermark, then sorting, and so it reduces to option a) again, is that true? This also has to deal with allowed lateness; with allowed lateness greater than zero there can still be multiple firings, and so retractions are needed.
>
> Specifically, when I say "bi-temporal join" I mean an unbounded-to-unbounded join where one of the join conditions is that elements are within event time distance d of one another. An element at time t will be saved until time t + 2d and then garbage collected. Every matching pair can be emitted immediately.
>
> OK, this might simplify things a little. Is there a design doc for that? If there are multiple LHS elements within event time distance from an RHS element, which one should be joined? I suppose all of them, but that is not "(time-varying-)relational" join semantics. In that semantics only the last element must be joined, because that is how a (classical) relational database would see the relation at time T (the old record would have been overwritten and would not be part of the output). Because of the time distance constraint this is different from the join I have in mind, which simply joins every LHS element to the most recent RHS element(s) and vice versa, without any additional time constraint (that is, the RHS "update" can happen arbitrarily far in the past).
>
> Jan
>
> In the triggered CoGBK + join-product implementation, you do need retractions as a model concept. But you don't need full support, since they only need to be shipped as deltas, and only from the CoGBK to the join-product transform, where they are all consumed to create only positive elements. Again, a delay is not required; this yields correct results with the "always" trigger.
>
> Neither case requires waiting or time-sorting a whole buffer. The bi-temporal join requires something more, in a way, since you need to query by time range and GC time prefixes.
>
> Kenn
>
> Jan
>
>> On 11/25/19 10:17 PM, Rui Wang wrote:
>>
>> On Mon, Nov 25, 2019 at 11:29 AM Jan Lukavský <[email protected]> wrote:
>>
>>> On 11/25/19 7:47 PM, Kenneth Knowles wrote:
>>>
>>> On Sun, Nov 24, 2019 at 12:57 AM Jan Lukavský <[email protected]> wrote:
>>>
>>>> I can put down a design document, but before that I need to clarify some things for myself. I'm struggling to put all of this into a bigger picture. Sorry if the arguments are circulating, but I didn't notice any proposal for how to solve these. If anyone can disprove any of this logic it would be very much appreciated, as I might be able to get out of a dead end:
>>>>
>>>> a) in the bi-temporal join you can either buffer until the watermark, or emit false data that has to be retracted
>>>
>>> This is not the case. A stateful DoFn based join can emit immediately joined rows that will never need to be retracted. The need for retractions has to do with the CoGBK-based implementation of a join.
>>>
>>> I fail to see how this could work. If I emit joined rows immediately, without waiting for the watermark to pass, I can join two elements that don't belong to each other, because an element with a lower time distance can arrive later and should have been joined in place of the previously emitted one. That is a wrong result that has to be retracted. Or what am I missing?
>>
>> Hi Kenn, do you think a stateful DoFn based join can emit joined rows that never need to be retracted, because in the stateful DoFn case joined rows are controlled by timers and emitted only once? If so I will agree with it. Generally speaking, emitting only once is what decides whether retractions are needed or not.
>>
>> In past brainstorming, even with retractions ready, streaming joins with windowing are likely to be implemented in the style of CoGBK + stateful DoFn.
>>
>>> I suggest that you work out the definition of the join you are interested in, with a good amount of mathematical rigor, and then consider the ways you can implement it. That is where a design doc will probably clarify things.
>>>
>>> Kenn
>>>
>>>> b) until retractions are 100% functional (and that is sort of the holy grail for now), the only solution is using a buffer holding data up to the watermark *and then sorting by event time*
>>>>
>>>> c) even if retractions were 100% functional, there would have to be a special implementation for the batch case, because otherwise this would simply blow up downstream processing with insanely many false additions and subsequent retractions
>>>>
>>>> Property b) means that if we want this feature now, we must sort by event time and there is no way around it. Property c) shows that even in the future, we must (in certain cases) make a distinction between batch and streaming code paths, which seems weird to me, but it might be an option. But still, there is no way to express this join in the batch case, because it would require either buffering (up to) the whole input on a local worker (doesn't look like a viable option) or providing a way in user code to signal the need for ordering of data inside GBK (and we are there again :)). Yes, we might shift this need from the stateful DoFn to GBK, like
>>>>
>>>> input.apply(GroupByKey.sorted())
>>>>
>>>> I cannot find a good reason why this would be better than giving this semantics to (stateful) ParDo.
>>>>
>>>> Maybe someone can help me out here?
>>>>
>>>> Jan
>>>>
>>>> On 11/24/19 5:05 AM, Kenneth Knowles wrote:
>>>>
>>>> I don't actually see how event time sorting simplifies this case much. You still need to buffer elements until they can no longer be matched in the join, and you still need to query that buffer for elements that might match. The general "bi-temporal join" (without sorting) requires one new state type and then it has an identical API, does not require any novel data structures or reasoning, yields better latency (no sort buffer delay), and discards less data (no sort buffer cutoff; the watermark is better). Perhaps a design document about this specific case would clarify.
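To make Kenn's description concrete, a minimal sketch of such a bi-temporal join as a stateful DoFn might look like this, assuming both sides are tagged and flattened into a single keyed stream; TaggedValue (with isLeft()) and Joined are hypothetical helper types, and the single GC timer per key is a simplification of the time-range state he mentions:

// Imports as in the SortByEventTimeFn sketch above, plus org.joda.time.Duration.
class BiTemporalJoinFn extends DoFn<KV<String, TaggedValue>, Joined> {

  // Join condition: elements within event time distance d of one another.
  private static final Duration D = Duration.standardMinutes(10);

  @StateId("left")
  private final StateSpec<BagState<TimestampedValue<TaggedValue>>> leftSpec = StateSpecs.bag();

  @StateId("right")
  private final StateSpec<BagState<TimestampedValue<TaggedValue>>> rightSpec = StateSpecs.bag();

  @TimerId("gc")
  private final TimerSpec gcSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("left") BagState<TimestampedValue<TaggedValue>> left,
      @StateId("right") BagState<TimestampedValue<TaggedValue>> right,
      @TimerId("gc") Timer gc) {
    TaggedValue e = ctx.element().getValue();
    BagState<TimestampedValue<TaggedValue>> mine = e.isLeft() ? left : right;
    BagState<TimestampedValue<TaggedValue>> theirs = e.isLeft() ? right : left;

    // Every matching pair can be emitted immediately; nothing ever needs retracting.
    for (TimestampedValue<TaggedValue> other : theirs.read()) {
      long distMillis = Math.abs(other.getTimestamp().getMillis() - ctx.timestamp().getMillis());
      if (distMillis <= D.getMillis()) {
        ctx.output(Joined.of(e, other.getValue()));
      }
    }

    // Keep the element until t + 2d, then garbage collect. A single per-key timer
    // is a simplification of the time-range/GC-prefix state Kenn mentions.
    mine.add(TimestampedValue.of(e, ctx.timestamp()));
    gc.set(ctx.timestamp().plus(D).plus(D));
  }

  @OnTimer("gc")
  public void onGc(
      OnTimerContext ctx,
      @StateId("left") BagState<TimestampedValue<TaggedValue>> left,
      @StateId("right") BagState<TimestampedValue<TaggedValue>> right,
      @TimerId("gc") Timer gc) {
    Instant next = earliest(expire(left, ctx.timestamp()), expire(right, ctx.timestamp()));
    if (next != null) {
      gc.set(next);  // re-arm for the earliest remaining expiry
    }
  }

  // Drops expired elements and returns the earliest remaining expiry (or null).
  private static Instant expire(BagState<TimestampedValue<TaggedValue>> bag, Instant now) {
    List<TimestampedValue<TaggedValue>> keep = new ArrayList<>();
    Instant min = null;
    for (TimestampedValue<TaggedValue> tv : bag.read()) {
      Instant expiry = tv.getTimestamp().plus(D).plus(D);
      if (expiry.isAfter(now)) {
        keep.add(tv);
        min = (min == null || expiry.isBefore(min)) ? expiry : min;
      }
    }
    bag.clear();
    keep.forEach(bag::add);
    return min;
  }

  private static Instant earliest(Instant a, Instant b) {
    if (a == null) return b;
    if (b == null) return a;
    return a.isBefore(b) ? a : b;
  }
}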
>>>> Kenn
>>>>
>>>> On Fri, Nov 22, 2019 at 10:08 PM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>> I didn't want to go too much into detail, but to describe the idea roughly (ignoring the problem of different window fns on both sides, to keep it as simple as possible):
>>>>>
>>>>>   rhs ----- \
>>>>>              flatten (on global window) ---- stateful par do (sorted by event time) ---- output
>>>>>   lhs ----- /
>>>>>
>>>>> If we can guarantee event-time-ordered arrival of events into the stateful ParDo, then the whole complexity reduces to keeping the current value of the left and right element and just flushing them out each time there is an update. That is, the "knob" is actually when the watermark moves, because that is what tells the join operation that there will be no more (non-late) input. This is very, very simplified, but it depicts the solution. The "classical" windowed join reduces to this if all data in each window is projected onto the window end boundary; then there will be a cartesian product, because all the elements have the same timestamp. I can put this into a design doc with all the details; I was trying to find out if there is or was any effort around this.
>>>>>
>>>>> I was in touch with Reza in PR #9032, I think that it currently suffers from problems with running this on batch.
>>>>>
>>>>> I think I can even (partly) resolve the retraction issue (for joins), as described on the thread [1]. In short, there can be two copies of the stateful DoFn, one running at the watermark and the other at (watermark - allowed lateness). One would produce ON_TIME (maybe wrong) results, the other would produce LATE but correct ones. Being able to compare them, it would be possible to retract the wrong results.
>>>>>
>>>>> Yes, this is also about providing more evidence of why I think event-time sorting should be (somehow) part of the model. :-)
>>>>>
>>>>> Jan
>>>>>
>>>>> [1] https://lists.apache.org/thread.html/a736ebd6b0913d2e06dfa54797716d022adf2c45140530d5c3bd538d@%3Cdev.beam.apache.org%3E
>>>>>
>>>>> On 11/23/19 5:54 AM, Kenneth Knowles wrote:
>>>>>
>>>>> +Mikhail Gryzykhin <[email protected]> +Rui Wang <[email protected]> +Reza Rokni <[email protected]> who have all done some investigations here.
>>>>>
>>>>> On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>> On 11/22/19 7:54 PM, Reuven Lax wrote:
>>>>>>
>>>>>> On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Reuven,
>>>>>>>
>>>>>>> I didn't investigate that particular one, but looking into it now, it seems that it (same as the "classic" join library) builds around CoGBK. Is that correct? If yes, then it essentially means that it:
>>>>>>>
>>>>>>> - works only for cases where both sides have the same windowfn (that is a limitation of the Flatten that precedes the CoGBK)
>>>>>>
>>>>>> Correct. Did you want to join different windows? If so, what are the semantics? If the lhs has FixedWindows and the rhs has SessionWindows, what do you want the join semantics to be? The only thing I could imagine would be for the user to provide some function telling the join how to map the windows together, but that could be pretty complicated.
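(As a side note on the pipeline sketched a few messages up: a rough Java fragment of that topology could look like the following, under the assumption that elements really do reach the stateful ParDo in event-time order - which is exactly the missing guarantee. lhsTagged and rhsTagged are assumed to be PCollection<KV<String, UnionValue>>, and UnionValue / Joined are hypothetical helper types.)

// Both sides are put onto the global window and flattened into one keyed stream.
PCollection<KV<String, UnionValue>> unioned =
    PCollectionList
        .of(lhsTagged.apply("LhsGlobal", Window.into(new GlobalWindows())))
        .and(rhsTagged.apply("RhsGlobal", Window.into(new GlobalWindows())))
        .apply(Flatten.pCollections());

PCollection<Joined> joined =
    unioned.apply("Join", ParDo.of(new DoFn<KV<String, UnionValue>, Joined>() {

      @StateId("lastLeft")
      private final StateSpec<ValueState<UnionValue>> lastLeft = StateSpecs.value();

      @StateId("lastRight")
      private final StateSpec<ValueState<UnionValue>> lastRight = StateSpecs.value();

      @ProcessElement
      public void process(
          ProcessContext ctx,
          @StateId("lastLeft") ValueState<UnionValue> lastLeft,
          @StateId("lastRight") ValueState<UnionValue> lastRight) {
        // With event-time-ordered input, it is enough to remember the current value
        // of each side and flush a joined pair on every update; a missing side gives
        // the (lhs, null) / (null, rhs) rows of the full outer join.
        UnionValue e = ctx.element().getValue();
        if (e.isLeft()) {
          lastLeft.write(e);
        } else {
          lastRight.write(e);
        }
        ctx.output(Joined.of(lastLeft.read(), lastRight.read()));
      }
    }));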
>>>>>> I don't want to go too far into details, but generally both lhs and rhs can be put onto a time line, and then a full join can be defined as each pair of (lhs, first preceding rhs) and (rhs, first preceding lhs). The end of a window is then semantically just clearing the joined value (setting it to null, so that at the end of the window there will be a pair (lhs, null) or (null, rhs) in the case of a full outer join). This way any combination of windows is possible, because all a window does is "scope" the validity of the respective values (lhs, rhs).
>>>>>
>>>>> I think it is very valid to hope to do a join in the sense of a relational join where it is row-to-row. In this case, Beam's concept of windowing may or may not make sense. It is just a tool for the job. It is just a grouping key that provides a time when state can be deleted. So I would say your use case is more a global window to global window join. That is what I think of as a true stream-to-stream join anyhow. You probably don't want to wait forever for output, so you'll need to use some knob other than Beam windows or triggers.
>>>>>
>>>>> Reza has prototyped a join like you describe here: https://github.com/apache/beam/pull/9032
>>>>>
>>>>> If your join condition explicitly includes the event time distance between elements, then it could "just work". If that isn't really part of your join condition, then you will have to see this restriction as a "knob" that you tweak on your results.
>>>>>
>>>>>>> - when using the global window, there has to be a trigger, and (afaik) there is no trigger that would guarantee firing after each data element (for early panes), because triggers are there to express a cost-latency tradeoff, not semantics
>>>>>>
>>>>>> Can you explain the use case where this matters? If you do trigger elementCountAtLeast(1) on the join, then the consumer will simply see a continuous stream of outputs. I'm not sure I understand why the consumer cares that some of those outputs were in a pane that really held 3 outputs instead of 1.
>>>>>>
>>>>>> What I'm trying to solve is basically this:
>>>>>>
>>>>>> - lhs is an event stream
>>>>>>
>>>>>> - rhs is a stream of "state updates"
>>>>>>
>>>>>> The purpose of the join is "take each event, pair it with the currently valid state, and produce output and possibly a modified state". I cannot process two events at a time, because the first event can modify the state and the subsequent event should see this. It is not a "simple" stateful ParDo either, because the state can be modified externally (not going into too much detail here, but e.g. by writing into a Kafka topic).
>>>>>
>>>>> Reuven's explanation is missing some detail. If the CoGBK is in discarding mode, then it will miss join results. If the CoGBK is in accumulating mode, it will duplicate join results. This is a known problem and the general solution is retractions.
>>>>>
>>>>> Basically, CoGBK-based joins just don't work with triggers until we have retractions.
>>>>>
>>>>>>> Moreover, I'd like to define the join semantics so that when there are available elements from both sides, the fired pane should be ON_TIME, not EARLY. That essentially means that the fully general case would not be built around (Co)GBK, but stateful ParDo. There are specific options where this fully general case "degrades" into forms that can be efficiently expressed using (Co)GBK, that is true.
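A minimal sketch of how this "event vs. state" case could look as a stateful DoFn (EitherValue, Event, StateUpdate and Result are hypothetical types); note it is only correct if events and state updates actually reach the DoFn in event-time order, which is exactly the guarantee under discussion:

// EitherValue wraps either an Event or a StateUpdate; Result carries the
// (event, state) pair plus the possibly modified state. All hypothetical types.
class EventStateJoinFn extends DoFn<KV<String, EitherValue>, Result> {

  @StateId("current")
  private final StateSpec<ValueState<StateUpdate>> currentSpec = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext ctx, @StateId("current") ValueState<StateUpdate> current) {
    EitherValue e = ctx.element().getValue();
    if (e.isStateUpdate()) {
      // External state update (e.g. arriving via a Kafka topic).
      current.write(e.getStateUpdate());
      return;
    }
    // Pair the event with the currently valid state (may be null)...
    StateUpdate state = current.read();
    Result result = Result.of(e.getEvent(), state);
    // ...and let the event modify the state, so the next event sees the change.
    current.write(result.getNewState());
    ctx.output(result);
    // Correct only if elements arrive in event-time order; otherwise an event can
    // be paired with a state version that was not yet (or no longer) valid.
  }
}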
>>>>>> BTW building this around stateful DoFn might be a better fit. The main reason I didn't is that we would need a good distributed MapState (something discussed fairly recently on the list), and that is not yet built. Once we had that, I might be inclined to rewrite this join on stateful DoFn.
>>>>>>
>>>>>> Yes, the sorted state helps for the streaming case. But I'd be careful about that for the batch case, where this might lead to high pressure on the state (and InMemoryStateInternals might OOME, for instance).
>>>>>>
>>>>>> However, can you explain what you are expecting from the pane? An EARLY pane simply means that we are producing output before the end of the window. If you are in the global window triggering on every element, then every output is EARLY. It might seem weird if you are interpreting EARLY as "outputting data that isn't ready," however that's not what EARLY is defined to be. Any change to the pane semantics would be a major breaking change to very fundamental semantics.
>>>>>>
>>>>>> I wonder if you are really objecting to the names EARLY and ON_TIME? Maybe we would've been better off tagging it BEFORE_WINDOW_END instead of EARLY, to make it clear what is meant?
>>>>>>
>>>>>> Essentially I don't object to anything here. I'm missing a solution to the "event vs. state" join described above. I was thinking about how to make these types of problems more user friendly, and it essentially leads to creating a somewhat more generic semantics of join, where end-of-window is converted into "value-delete" events and then the join is just by the "previous" or "valid" value (yes, this relates to the validity windows mentioned at Beam Summit Europe). It actually turns out that with some work we could quite "naturally" define a join on two streams with global window and no trigger. It would even function with the lowest latency possible (but yes, with the highest expense; it is actually the introduction of (same!) windows that enables certain optimizations). It then correctly defines the semantics for different windows, although the result would be (probably unexpectedly) windowed using the global window. But that doesn't seem to be any breaking change, because it is currently not possible (any such pipeline will not be validated).
>>>>>>
>>>>>> Maybe for reference, the unwindowed join would be what is described here [1]
>>>>>>
>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin
>>>>>>
>>>>>>> Jan
>>>>>>>
>>>>>>> On 11/22/19 6:47 PM, Reuven Lax wrote:
>>>>>>>
>>>>>>> Have you seen the Join library that is part of schemas? I'm curious whether this fits your needs, or there's something lacking there.
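(For anyone following along: usage of the schema-based Join library looks roughly like the fragment below. This is from memory, so the exact method names should be checked against org.apache.beam.sdk.schemas.transforms.Join; Transaction and Review are hypothetical schema-annotated classes sharing a "userId" field.)

// transactions and reviews are assumed to be PCollections of schema-annotated
// classes (hypothetical Transaction / Review) sharing a "userId" field.
PCollection<Row> joined =
    transactions.apply(
        "JoinOnUserId",
        Join.<Transaction, Review>innerJoin(reviews).using("userId"));
// Each output Row nests the matching lhs and rhs rows; leftOuterJoin /
// rightOuterJoin / fullOuterJoin variants exist as well.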
>>>>>>> On Fri, Nov 22, 2019 at 12:31 AM Jan Lukavský <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> based on the roadmap [1], we would like to define and implement a full set of (unified) stream-stream joins. That would include:
>>>>>>>>
>>>>>>>> - joins (left, right, full outer) on the global window with an "immediate trigger"
>>>>>>>>
>>>>>>>> - joins with different windowing functions on the left and right side
>>>>>>>>
>>>>>>>> The approach would be to define these operations in a natural way, so that the definition is aligned with how current joins work (same windows, cartesian product of values with the same keys, output timestamp projected to the end of the window, etc.). Because this should be a generic approach, this effort should probably be part of the join library, which can then be reused by other components, too (e.g. SQL).
>>>>>>>>
>>>>>>>> The question is - is (or was) there any effort that we can build upon? Or should this be designed from scratch?
>>>>>>>>
>>>>>>>> Jan
>>>>>>>>
>>>>>>>> [1] https://beam.apache.org/roadmap/euphoria/
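(For reference, the "how current joins work" behaviour mentioned above is roughly the classic CoGBK-based join sketched below: both sides in the same fixed windows, cartesian product of values per key, output timestamps falling at the end of the window by the default timestamp combiner. lhs and rhs are assumed to be PCollection<KV<String, String>>.)

final TupleTag<String> lhsTag = new TupleTag<String>() {};
final TupleTag<String> rhsTag = new TupleTag<String>() {};
Window<KV<String, String>> intoFixed =
    Window.into(FixedWindows.of(Duration.standardMinutes(1)));

PCollection<KV<String, KV<String, String>>> joined =
    KeyedPCollectionTuple
        .of(lhsTag, lhs.apply("WindowLhs", intoFixed))
        .and(rhsTag, rhs.apply("WindowRhs", intoFixed))
        .apply(CoGroupByKey.create())
        .apply(ParDo.of(
            new DoFn<KV<String, CoGbkResult>, KV<String, KV<String, String>>>() {
              @ProcessElement
              public void process(ProcessContext ctx) {
                CoGbkResult result = ctx.element().getValue();
                // Cartesian product of values with the same key in the same window.
                for (String l : result.getAll(lhsTag)) {
                  for (String r : result.getAll(rhsTag)) {
                    ctx.output(KV.of(ctx.element().getKey(), KV.of(l, r)));
                  }
                }
              }
            }));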
