Yes, I am suggesting adding more intelligent state data structures for just
that sort of join. I tagged Reza because his work basically does it, but it
explicitly pulls a BagState into memory and sorts it. We just need to avoid
that. It is the sort of thing that already exists in some engines, so
there's a proof of concept :-). Jan makes the good point that when executing
the same join in batch you wouldn't use the same algorithm, because the
disorder will be unbounded. In Beam you'd want a PTransform that expands
differently based on whether the inputs are bounded or unbounded.

Kenn

On Tue, Nov 26, 2019 at 4:16 AM David Morávek <[email protected]>
wrote:

> Yes, in the batch case with long-term historical data, this would be O(n^2),
> as it is basically a bubble sort. If you have a large number of updates for
> a single key, this would be super expensive.
>
> Kenn, can this be re-implemented with your solution?
>
> On Tue, Nov 26, 2019 at 1:10 PM Jan Lukavský <[email protected]> wrote:
>
>> Functionally yes. But this straightforward solution does not work for me,
>> for two main reasons:
>>
>>  - it either blows up the state in the batch case, or the time complexity
>> of the sort would be O(n^2) (and reprocessing several years of dense
>> time-series data makes it a no-go)
>>
>>  - it is not reusable for different time-ordering needs, because logic
>> implemented purely in user space cannot be transferred to a different
>> problem (two states are needed, one for the buffer and the other for the
>> user state) and extending DoFns does not work (one cannot create an
>> abstract SortedDoFn, because of the state annotation definitions)
>>
>> Jan
>> On 11/26/19 12:56 PM, David Morávek wrote:
>>
>> Hi,
>>
>> I think what Jan has in mind would look something like this
>> <https://gist.github.com/dmvk/3ea32eb36c6406fa72d70b9b1df1d878>, if
>> implemented in user code. Am I right?
>>
>> D.
>>
>>
>> On Tue, Nov 26, 2019 at 10:23 AM Jan Lukavský <[email protected]> wrote:
>>
>>>
>>> On 11/25/19 11:45 PM, Kenneth Knowles wrote:
>>>
>>>
>>>
>>> On Mon, Nov 25, 2019 at 1:56 PM Jan Lukavský <[email protected]> wrote:
>>>
>>>> Hi Rui,
>>>>
>>>> > Hi Kenn, you think a stateful DoFn based join can emit joined rows that
>>>> never need to be retracted because in the stateful DoFn case the joined
>>>> rows will be controlled by timers and emitted only once? If so, I agree.
>>>> Generally speaking, whether rows are emitted only once is what decides
>>>> whether retractions are needed.
>>>>
>>>> That would imply buffering elements up until the watermark and then
>>>> sorting, which reduces to option a) again, is that true? This also has to
>>>> deal with allowed lateness: with allowed lateness greater than zero, there
>>>> can still be multiple firings, and so retractions are needed.
>>>>
>>> Specifically, when I say "bi-temporal join" I mean
>>> unbounded-to-unbounded join where one of the join conditions is that
>>> elements are within event time distance d of one another. An element at
>>> time t will be saved until time t + 2d and then garbage collected. Every
>>> matching pair can be emitted immediately.
>>>
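To make the definition above concrete, here is a minimal plain-Python sketch (no Beam dependency). The class name `BiTemporalJoin` and its methods are hypothetical, and the per-key lists stand in for whatever state the runner would provide. It only illustrates the two rules stated above: every pair within event time distance d is emitted as soon as both elements are present, and an element at time t is dropped once the watermark reaches t + 2d.

```python
from collections import defaultdict


class BiTemporalJoin:
    """Hypothetical sketch: per-key join of elements within distance d."""

    def __init__(self, d):
        self.d = d
        # buffers[key][side] holds (timestamp, value) pairs seen so far.
        self.buffers = defaultdict(lambda: {"lhs": [], "rhs": []})

    def on_element(self, key, side, timestamp, value):
        """Buffer the element and return all matching pairs immediately."""
        other = "rhs" if side == "lhs" else "lhs"
        state = self.buffers[key]
        matches = []
        for other_ts, other_value in state[other]:
            if abs(timestamp - other_ts) <= self.d:
                # Always emit pairs as (lhs_value, rhs_value).
                if side == "lhs":
                    matches.append((value, other_value))
                else:
                    matches.append((other_value, value))
        state[side].append((timestamp, value))
        return matches

    def on_watermark(self, watermark):
        """Garbage collect elements whose retention time t + 2d has passed."""
        for state in self.buffers.values():
            for side in ("lhs", "rhs"):
                state[side] = [
                    (ts, v) for ts, v in state[side]
                    if ts + 2 * self.d > watermark
                ]
```

Because every pair within distance d is part of the result, a later, closer element only adds pairs and never invalidates one that was already emitted. The linear scan over the other side's buffer is a simplification of this sketch.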
>>> OK, this might simplify things a little. Is there a design doc for that?
>>> If there are multiple LHS elements within the event time distance of an
>>> RHS element, which one should be joined? I suppose all of them, but that is
>>> not "(time-varying-)relational" join semantics. In those semantics only the
>>> last element must be joined, because that is how a (classical) relational
>>> database would see the relation at time T (the old record would have been
>>> overwritten and would not be part of the output). Because of the time
>>> distance constraint this is different from the join I have in mind, which
>>> simply joins every LHS element to the most recent RHS element and vice
>>> versa, without any additional time constraints (that is, the RHS "update"
>>> can have happened arbitrarily far in the past).
>>>
>>> Jan
>>>
>>>
>>> In the triggered CoGBK + join-product implementation, you do need
>>> retractions as a model concept. But you don't need full support, since they
>>> only need to be shipped as deltas and only from the CoGBK to the
>>> join-product transform where they are all consumed to create only positive
>>> elements. Again a delay is not required; this yields correct results with
>>> the "always" trigger.
>>>
>>> Neither case requires waiting or time sorting a whole buffer. The
>>> bi-temporal join requires something more, in a way, since you need to query
>>> by time range and GC time prefixes.
>>>
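The two operations named above (query by time range, GC of a time prefix) can be sketched in plain Python as follows. `TimeIndexedBuffer` and its method names are hypothetical and not an existing Beam state type. The in-memory sorted list is only a stand-in: a runner-backed state would presumably keep the ordering in its own store instead of loading everything into memory.

```python
import bisect


class TimeIndexedBuffer:
    """Hypothetical sketch of a state cell ordered by event timestamp."""

    def __init__(self):
        # Two parallel lists, both kept sorted by timestamp.
        self._timestamps = []
        self._values = []

    def add(self, timestamp, value):
        """Insert at the sorted position, so no full re-sort is ever needed."""
        i = bisect.bisect_right(self._timestamps, timestamp)
        self._timestamps.insert(i, timestamp)
        self._values.insert(i, value)

    def range(self, start, end):
        """Return values with start <= timestamp <= end, in time order."""
        lo = bisect.bisect_left(self._timestamps, start)
        hi = bisect.bisect_right(self._timestamps, end)
        return self._values[lo:hi]

    def clear_before(self, timestamp):
        """Garbage collect the prefix of elements older than timestamp."""
        cut = bisect.bisect_left(self._timestamps, timestamp)
        del self._timestamps[:cut]
        del self._values[:cut]

    def __len__(self):
        return len(self._timestamps)
```

For a join with distance d, an arriving element at time t would call `range(t - d, t + d)` on the other side's buffer, and a timer would call `clear_before` as the watermark advances.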
>>> Kenn
>>>
>>> Jan
>>>> On 11/25/19 10:17 PM, Rui Wang wrote:
>>>>
>>>>
>>>>
>>>> On Mon, Nov 25, 2019 at 11:29 AM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>>
>>>>> On 11/25/19 7:47 PM, Kenneth Knowles wrote:
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Nov 24, 2019 at 12:57 AM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>> I can put down a design document, but before that I need to clarify
>>>>>> some things for myself. I'm struggling to put all of this into a bigger
>>>>>> picture. Sorry if the arguments are going in circles, but I didn't notice
>>>>>> any proposal of how to solve these. If anyone can disprove any of this
>>>>>> logic it would be very much appreciated, as it might get me out of a dead
>>>>>> end:
>>>>>>
>>>>>>  a) in the bi-temporal join you can either buffer until watermark, or
>>>>>> emit false data that has to be retracted
>>>>>>
>>>>> This is not the case. A stateful DoFn based join can emit immediately
>>>>> joined rows that will never need to be retracted. The need for retractions
>>>>> has to do with CoGBK-based implementation of a join.
>>>>>
>>>>> I fail to see how this could work. If I emit joined rows immediately
>>>>> without waiting for the watermark to pass, I can join two elements that
>>>>> don't belong to each other, because an element with a lower time distance
>>>>> can arrive later, and it should have been joined in place of the
>>>>> previously emitted one. This is a wrong result that has to be retracted.
>>>>> Or what am I missing?
>>>>>
>>>>
>>>> Hi Kenn, you think a stateful DoFn based join can emit joined rows that
>>>> never need to be retracted because in the stateful DoFn case the joined
>>>> rows will be controlled by timers and emitted only once? If so, I agree.
>>>> Generally speaking, whether rows are emitted only once is what decides
>>>> whether retractions are needed.
>>>>
>>>> In past brainstorming, even with retractions ready, streaming joins
>>>> with windowing were likely to be implemented in a CoGBK + stateful DoFn
>>>> style.
>>>>
>>>>
>>>>
>>>> I suggest that you work out the definition of the join you are
>>>>> interested in, with a good amount of mathematical rigor, and then consider
>>>>> the ways you can implement it. That is where a design doc will probably
>>>>> clarify things.
>>>>>
>>>>> Kenn
>>>>>
>>>>>  b) until retractions are 100% functional (and that is sort of a holy
>>>>>> grail for now), the only solution is using a buffer holding data up to
>>>>>> the watermark *and then sorting by event time*
>>>>>>
>>>>>  c) even if retractions were 100% functional, there would have to be a
>>>>>> special implementation for the batch case, because otherwise this would
>>>>>> simply blow up downstream processing with insanely many false additions
>>>>>> and subsequent retractions
>>>>>>
>>>>>> Property b) means that if we want this feature now, we must sort by
>>>>>> event time and there is no way around it. Property c) shows that even in
>>>>>> the future, we must (in certain cases) distinguish between batch and
>>>>>> streaming code paths, which seems weird to me, but it might be an option.
>>>>>> But still, there is no way to express this join in the batch case, because
>>>>>> it would require either buffering (up to) the whole input on a local
>>>>>> worker (doesn't look like a viable option) or providing a way in user code
>>>>>> to signal the need for ordering of data inside GBK (and we are there
>>>>>> again :)). Yes, we might shift this need from stateful DoFn to GBK, like
>>>>>>
>>>>>>  input.apply(GroupByKey.sorted())
>>>>>>
>>>>>> I cannot find good reasoning for why this would be better than giving
>>>>>> these semantics to (stateful) ParDo.
>>>>>>
>>>>>> Maybe someone can help me out here?
>>>>>>
>>>>>> Jan
>>>>>> On 11/24/19 5:05 AM, Kenneth Knowles wrote:
>>>>>>
>>>>>> I don't actually see how event time sorting simplifies this case
>>>>>> much. You still need to buffer elements until they can no longer be
>>>>>> matched in the join, and you still need to query that buffer for elements
>>>>>> that might match. The general "bi-temporal join" (without sorting)
>>>>>> requires one new state type and then it has identical API, does not
>>>>>> require any novel data structures or reasoning, yields better latency (no
>>>>>> sort buffer delay), and discards less data (no sort buffer cutoff;
>>>>>> watermark is better). Perhaps a design document about this specific case
>>>>>> would clarify.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Fri, Nov 22, 2019 at 10:08 PM Jan Lukavský <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> I didn't want to go too much into detail, but to describe the idea
>>>>>>> roughly (ignoring the problem of different window fns on both sides to
>>>>>>> keep it as simple as possible):
>>>>>>>
>>>>>>> rhs -----  \
>>>>>>>
>>>>>>>                 flatten (on global window) ---- stateful par do
>>>>>>> (sorted by event time)  ---- output
>>>>>>>
>>>>>>> lhs -----  /
>>>>>>>
>>>>>>> If we can guarantee event-time-ordered arrival of events into the
>>>>>>> stateful ParDo, then the whole complexity reduces to keeping the current
>>>>>>> value of the left and right element and just flushing them out each time
>>>>>>> there is an update. That is, the "knob" is actually when the watermark
>>>>>>> moves, because it is what tells the join operation that there will be no
>>>>>>> more (non-late) input. This is very, very simplified, but depicts the
>>>>>>> solution. The "classical" windowed join reduces to this if all data in
>>>>>>> each window is projected onto the window end boundary. Then there will be
>>>>>>> a cartesian product, because all the elements have the same timestamp. I
>>>>>>> can put this into a design doc with all the details; I was trying to find
>>>>>>> out if there is or was any effort around this.
>>>>>>>
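As a rough illustration of the reduction described above, here is a plain-Python sketch for a single key. The function name `latest_value_join` and the tuple layout are assumptions made for the example. The call to `sorted` stands in for the event-time ordering guarantee that the runner would have to provide, and it ignores watermarks and lateness entirely.

```python
def latest_value_join(events):
    """Join each update with the most recent value from the other side.

    events: iterable of (timestamp, side, value), side is "lhs" or "rhs".
    Returns a list of (timestamp, lhs_value, rhs_value) tuples.
    """
    # The only state needed once input is ordered: one value per side.
    current = {"lhs": None, "rhs": None}
    # Stand-in for the ordering guarantee; sorted() is stable, so
    # elements with equal timestamps keep their arrival order.
    ordered = sorted(events, key=lambda e: e[0])
    output = []
    for timestamp, side, value in ordered:
        current[side] = value
        # Flush the current pair on every update.
        output.append((timestamp, current["lhs"], current["rhs"]))
    return output
```

A side that has not produced any value yet shows up as `None` in the output, which corresponds to the (lhs, null) and (null, rhs) pairs of a full outer join.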
>>>>>>> I was in touch with Reza in the PR #9032, I think that it currently
>>>>>>> suffers from problems with running this on batch.
>>>>>>>
>>>>>>> I think I can even (partly) resolve the retraction issue (for
>>>>>>> joins), as described in the thread [1]. In short, there can be two copies
>>>>>>> of the stateful DoFn, one running at the watermark and the other at
>>>>>>> (watermark - allowed lateness). One would produce ON_TIME (maybe wrong)
>>>>>>> results, the other would produce LATE but correct ones. By comparing
>>>>>>> them, it would be possible to retract the wrong results.
>>>>>>>
>>>>>>> Yes, this is also about providing more evidence of why I think
>>>>>>> event-time sorting should be (somehow) part of the model. :-)
>>>>>>>
>>>>>>> Jan
>>>>>>>
>>>>>>> [1]
>>>>>>> https://lists.apache.org/thread.html/a736ebd6b0913d2e06dfa54797716d022adf2c45140530d5c3bd538d@%3Cdev.beam.apache.org%3E
>>>>>>> On 11/23/19 5:54 AM, Kenneth Knowles wrote:
>>>>>>>
>>>>>>> +Mikhail Gryzykhin <[email protected]> +Rui Wang <[email protected]>
>>>>>>>  +Reza Rokni <[email protected]> who have all done some investigations
>>>>>>> here.
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> On 11/22/19 7:54 PM, Reuven Lax wrote:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Reuven,
>>>>>>>>>
>>>>>>>>> I didn't investigate that particular one, but looking into it
>>>>>>>>> now, it looks like it (same as the "classic" join library) is built
>>>>>>>>> around CoGBK. Is that correct? If yes, then it essentially means that it:
>>>>>>>>>
>>>>>>>>  - works only for cases where both sides have the same windowfn
>>>>>>>>> (that is a limitation of the Flatten that precedes CoGBK)
>>>>>>>>>
>>>>>>>> Correct. Did you want to join different windows? If so what are the
>>>>>>>> semantics? If the lhs has FixedWindows and the rhs has SessionWindows,
>>>>>>>> what do you want the join semantics to be? The only thing I could
>>>>>>>> imagine would be for the user to provide some function telling the join
>>>>>>>> how to map the windows together, but that could be pretty complicated.
>>>>>>>>
>>>>>>>> I don't want to go too far into details, but generally both lhs and
>>>>>>>> rhs can be put onto a time line, and then a full join can be defined as
>>>>>>>> each pair of (lhs, first preceding rhs) and (rhs, first preceding lhs).
>>>>>>>> Then the end of a window semantically just clears the joined value
>>>>>>>> (setting it to null, thus at the end of the window there will be a pair
>>>>>>>> (lhs, null) or (null, rhs) in the case of a full outer join). This way
>>>>>>>> any combination of windows is possible, because all a window does is
>>>>>>>> "scope" the validity of the respective values (lhs, rhs).
>>>>>>>>
>>>>>>>
>>>>>>> I think it is very valid to hope to do a join in the sense of a
>>>>>>> relational join where it is row-to-row. In this case, Beam's concept of
>>>>>>> windowing may or may not make sense. It is just a tool for the job. It
>>>>>>> is just a grouping key that provides a time when state can be deleted.
>>>>>>> So I would say your use case is more global window to global window
>>>>>>> join. That is what I think of as a true stream-to-stream join anyhow.
>>>>>>> You probably don't want to wait forever for output. So you'll need to
>>>>>>> use some knob other than Beam windows or triggers.
>>>>>>>
>>>>>>>> Reza has prototyped a join like you describe here:
>>>>>>> https://github.com/apache/beam/pull/9032
>>>>>>>
>>>>>>> If your join condition explicitly includes the event time distance
>>>>>>> between elements, then it could "just work". If that isn't really part
>>>>>>> of your join condition, then you will have to see this restriction as a
>>>>>>> "knob" that you tweak on your results.
>>>>>>>
>>>>>>>>  - when using global window, there has to be a trigger and (afaik)
>>>>>>>>> there is no trigger that would guarantee firing after each data element
>>>>>>>>> (for early panes) (because triggers are there to express cost-latency
>>>>>>>>> tradeoff, not semantics)
>>>>>>>>>
>>>>>>>>
>>>>>>>> Can you explain the use case where this matters? If you do trigger
>>>>>>>> elementCountAtLeast(1) on the join, then the consumer will simply see a
>>>>>>>> continuous stream of outputs. I'm not sure I understand why the
>>>>>>>> consumer cares that some of those outputs were in a pane that really
>>>>>>>> held 3 outputs instead of 1.
>>>>>>>>
>>>>>>>> What I'm trying to solve is basically this:
>>>>>>>>
>>>>>>>>  - lhs is event stream
>>>>>>>>
>>>>>>>>  - rhs is stream of a "state updates"
>>>>>>>>
>>>>>>>> The purpose of the join is "take each event, pair it with the currently
>>>>>>>> valid state and produce output and possibly modified state". I cannot
>>>>>>>> process two events at a time, because the first event can modify the
>>>>>>>> state and the subsequent event should see this. It is not a "simple"
>>>>>>>> stateful ParDo either, because the state can be modified externally (not
>>>>>>>> going into too much detail here, but e.g. by writing into a Kafka
>>>>>>>> topic).
>>>>>>>>
>>>>>>> Reuven's explanation is missing some detail. If the CoGBK is in
>>>>>>> discarding mode, then it will miss join results. If the CoGBK is in
>>>>>>> accumulating mode, it will duplicate join results. This is a known
>>>>>>> problem and the general solution is retractions.
>>>>>>>
>>>>>>> Basically, CoGBK-based joins just don't work with triggers until we
>>>>>>> have retractions.
>>>>>>>
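A toy plain-Python model of the problem just described, for one key in one window. The function `join_panes` and the shape of `firings` are invented for the illustration and do not model real trigger mechanics. Each firing takes the join product of whatever the pane contains: only the new elements in discarding mode, everything seen so far in accumulating mode.

```python
from itertools import product


def join_panes(firings, accumulating):
    """Return all join outputs emitted across a sequence of trigger firings.

    firings: list of (new_lhs, new_rhs) pairs, each a list of the elements
    that arrived since the previous firing.
    """
    lhs_seen, rhs_seen = [], []
    emitted = []
    for new_lhs, new_rhs in firings:
        if accumulating:
            # The pane contains everything seen so far for the key.
            lhs_seen += new_lhs
            rhs_seen += new_rhs
            pane_lhs, pane_rhs = lhs_seen, rhs_seen
        else:
            # The pane contains only the elements new since the last firing.
            pane_lhs, pane_rhs = new_lhs, new_rhs
        emitted.extend(product(pane_lhs, pane_rhs))
    return emitted
```

With `firings = [(["a"], []), ([], ["x"]), ([], ["y"])]`, discarding mode emits nothing, because "a" is never in the same pane as "x" or "y". Accumulating mode emits ("a", "x") twice, once at the second firing and again at the third.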
>>>>>>>
>>>>>>>
>>>>>>>> Moreover, I'd like to define the join semantics so that when there
>>>>>>>>> are available elements from both sides, the fired pane should be
>>>>>>>>> ON_TIME, not EARLY. That essentially means that the fully general case
>>>>>>>>> would not be built around (Co)GBK, but stateful ParDo. There are
>>>>>>>>> specific options where this fully general case "degrades" into forms
>>>>>>>>> that can be efficiently expressed using (Co)GBK, that is true.
>>>>>>>>>
>>>>>>>>
>>>>>>>> BTW building this around stateful DoFn might be a better fit. The
>>>>>>>> main reason I didn't is because we would need a good distributed
>>>>>>>> MapState (something discussed fairly recently on the list), and that is
>>>>>>>> not yet built. Once we had that, I might be inclined to rewrite this
>>>>>>>> join on stateful DoFn.
>>>>>>>>
>>>>>>>> Yes, the sorted state helps for the streaming case. But I'd be careful
>>>>>>>> about that for the batch case, where this might lead to high pressure on
>>>>>>>> the state (and InMemoryStateInternals might OOME, for instance).
>>>>>>>>
>>>>>>>>
>>>>>>>> However can you explain what you are expecting from the pane? An
>>>>>>>> EARLY pane simply means that we are producing output before the end of
>>>>>>>> the window. If you are in the global window triggering every element,
>>>>>>>> then every output is EARLY. It might seem weird if you are interpreting
>>>>>>>> EARLY as "outputting data that isn't ready," however that's not what
>>>>>>>> EARLY is defined to be. Any change to the pane semantics would be a
>>>>>>>> major breaking change to very fundamental semantics.
>>>>>>>>
>>>>>>>> I wonder if you are really objecting to the name EARLY and ON_TIME?
>>>>>>>> Maybe we would've been better off tagging it BEFORE_WINDOW_END instead
>>>>>>>> of EARLY, to make it clear what is meant?
>>>>>>>>
>>>>>>>> Essentially I don't object to anything here. I'm missing a solution to
>>>>>>>> the "event vs. state" join described above. I was thinking about how to
>>>>>>>> make these types of problems more user friendly, and it essentially
>>>>>>>> leads to creating somewhat more generic join semantics, where
>>>>>>>> end-of-window is converted into "value-delete" events and the join then
>>>>>>>> just uses the "previous" or "valid" value (yes, this relates to validity
>>>>>>>> windows mentioned at Beam Summit Europe). It actually turns out that
>>>>>>>> with some work we could define quite "naturally" a join on two streams
>>>>>>>> with global window and no trigger. It would even function with the
>>>>>>>> lowest latency possible (but yes, with the highest expenses; it is
>>>>>>>> actually the introduction of (same!) windows that enables certain
>>>>>>>> optimizations). It then correctly defines semantics for different
>>>>>>>> windows, although the result would be (probably unexpectedly) windowed
>>>>>>>> using the global window. But that doesn't seem to be a breaking change,
>>>>>>>> because it is currently not possible (any such pipeline will not be
>>>>>>>> validated).
>>>>>>>>
>>>>>>>> Maybe for reference, the unwindowed join would be what is described
>>>>>>>> here [1]
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> Jan
>>>>>>>>> On 11/22/19 6:47 PM, Reuven Lax wrote:
>>>>>>>>>
>>>>>>>>> Have you seen the Join library that is part of schemas? I'm
>>>>>>>>> curious whether this fits your needs, or there's something lacking
>>>>>>>>> there.
>>>>>>>>>
>>>>>>>>> On Fri, Nov 22, 2019 at 12:31 AM Jan Lukavský <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> based on the roadmap [1], we would like to define and implement a
>>>>>>>>>> full set of (unified) stream-stream joins. That would include:
>>>>>>>>>>
>>>>>>>>>>   - joins (left, right, full outer) on global window with
>>>>>>>>>> "immediate trigger"
>>>>>>>>>>
>>>>>>>>>>   - joins with different windowing functions on left and right
>>>>>>>>>> side
>>>>>>>>>>
>>>>>>>>>> The approach would be to define these operations in a natural
>>>>>>>>>> way, so that the definition is aligned with how current joins work
>>>>>>>>>> (same windows, cartesian product of values with same keys, output
>>>>>>>>>> timestamp projected to the end of window, etc.). Because this should
>>>>>>>>>> be a generic approach, this effort should probably be part of the
>>>>>>>>>> join library, which can then be reused by other components, too
>>>>>>>>>> (e.g. SQL).
>>>>>>>>>>
>>>>>>>>>> The question is - is (or was) there any effort that we can build
>>>>>>>>>> upon? Or should this be designed from scratch?
>>>>>>>>>>
>>>>>>>>>> Jan
>>>>>>>>>>
>>>>>>>>>> [1] https://beam.apache.org/roadmap/euphoria/
>>>>>>>>>>
>>>>>>>>>>
