Hi,

I think what Jan has in mind would look something like this
<https://gist.github.com/dmvk/3ea32eb36c6406fa72d70b9b1df1d878>, if
implemented in user code. Am I right?
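
For anyone who cannot open the gist, here is a rough, Beam-free sketch of the idea as I understand it from the thread (it is not the gist's code). Elements are buffered per key, and each time the watermark advances, everything older than the watermark is flushed in event-time order and paired with the most recent value from the other side. The names (SortedJoinSketch, process, advanceWatermark) are made up for illustration. The watermark is assumed to mean "no more on-time elements with a timestamp below it". Ties on timestamp and late data are ignored.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch, not Beam API: per-key join with the most recent value of the other side. */
class SortedJoinSketch {

  /** One input element; isLeft tells which side of the join it came from. */
  static final class Element {
    final boolean isLeft;
    final long timestamp;
    final String value;

    Element(boolean isLeft, long timestamp, String value) {
      this.isLeft = isLeft;
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  // Elements the watermark has not passed yet, ordered by event time.
  private final PriorityQueue<Element> buffer =
      new PriorityQueue<>(Comparator.comparingLong((Element e) -> e.timestamp));
  private String lastLeft;  // most recent LHS value, in event-time order
  private String lastRight; // most recent RHS value, in event-time order

  /** Elements may arrive in any order; they are only buffered here. */
  void process(Element e) {
    buffer.add(e);
  }

  /** Flushes all elements with timestamp < watermark in event-time order. */
  List<String> advanceWatermark(long watermark) {
    List<String> out = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek().timestamp < watermark) {
      Element e = buffer.poll();
      if (e.isLeft) {
        lastLeft = e.value;
      } else {
        lastRight = e.value;
      }
      // Emit the updated pair; a side that has no value yet prints as null.
      out.add("(" + lastLeft + ", " + lastRight + ")@" + e.timestamp);
    }
    return out;
  }
}
```

Feeding it e1@5 (left), s1@1 and s2@7 (right) and then advancing the watermark to 6 would yield (null, s1)@1 followed by (e1, s1)@5; s2 stays buffered until the watermark passes 7.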

D.
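
P.S. For comparison, the bi-temporal join Kenn describes in the quoted thread below (pairs within event-time distance d, each element kept until t + 2d, every matching pair emitted immediately) could be sketched roughly like this. Again this is plain Java and not Beam API; BiTemporalJoinSketch and its methods are hypothetical names, a linear scan stands in for the time-range query, and late data is ignored.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch, not Beam API: per-key join of elements within event-time distance d. */
class BiTemporalJoinSketch {

  static final class Element {
    final long timestamp;
    final String value;

    Element(long timestamp, String value) {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  private final long d;
  private final List<Element> left = new ArrayList<>();
  private final List<Element> right = new ArrayList<>();

  BiTemporalJoinSketch(long d) {
    this.d = d;
  }

  List<String> processLeft(Element e) {
    return process(e, left, right, true);
  }

  List<String> processRight(Element e) {
    return process(e, right, left, false);
  }

  /** Emits every pair within distance d right away, then buffers the new element. */
  private List<String> process(Element e, List<Element> own, List<Element> other, boolean isLeft) {
    List<String> out = new ArrayList<>();
    for (Element o : other) {
      if (Math.abs(e.timestamp - o.timestamp) <= d) {
        out.add(isLeft
            ? "(" + e.value + ", " + o.value + ")"
            : "(" + o.value + ", " + e.value + ")");
      }
    }
    own.add(e);
    return out;
  }

  /** Garbage collects elements whose retention time t + 2d is behind the watermark. */
  void advanceWatermark(long watermark) {
    left.removeIf(e -> e.timestamp + 2 * d < watermark);
    right.removeIf(e -> e.timestamp + 2 * d < watermark);
  }

  int bufferedCount() {
    return left.size() + right.size();
  }
}
```

Nothing is sorted and nothing waits for the watermark here; the watermark is only used to decide when buffered elements can be dropped.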


On Tue, Nov 26, 2019 at 10:23 AM Jan Lukavský <[email protected]> wrote:

>
> On 11/25/19 11:45 PM, Kenneth Knowles wrote:
>
>
>
> On Mon, Nov 25, 2019 at 1:56 PM Jan Lukavský <[email protected]> wrote:
>
>> Hi Rui,
>>
>> > Hi Kenn, do you mean that a stateful DoFn based join can emit joined rows
>> that never need to be retracted because, in the stateful DoFn case, joined
>> rows are controlled by timers and each is emitted only once? If so, I agree.
>> Generally speaking, whether rows are emitted only once is the factor that
>> decides whether retraction is needed.
>>
>> That would imply buffering elements up until the watermark and then
>> sorting, so it reduces to option a) again, is that true? This also has to
>> deal with allowed lateness: with allowed lateness greater than zero, there
>> can still be multiple firings, and so retractions are needed.
>>
> Specifically, when I say "bi-temporal join" I mean unbounded-to-unbounded
> join where one of the join conditions is that elements are within event
> time distance d of one another. An element at time t will be saved until
> time t + 2d and then garbage collected. Every matching pair can be emitted
> immediately.
>
> OK, this might simplify things a little. Is there a design doc for that?
> If there are multiple LHS elements within event time distance from RHS
> element, which one should be joined? I suppose all of them, but that is not
> "(time-varying-)relational" join semantics. In that semantics only the last
> element must be joined, because that is how a (classical) relational
> database would see the relation at time T (the old record would have been
> overwritten and not be part of the output). Because of the time distance
> constraint this is different from the join I have in mind, because that
> simply joins every LHS element(s) to most recent RHS element(s) and vice
> versa, without any additional time constraints (that is the RHS "update"
> can happen arbitrarily far in past).
>
> Jan
>
>
> In the triggered CoGBK + join-product implementation, you do need
> retractions as a model concept. But you don't need full support, since they
> only need to be shipped as deltas and only from the CoGBK to the
> join-product transform where they are all consumed to create only positive
> elements. Again a delay is not required; this yields correct results with
> the "always" trigger.
>
> Neither case requires waiting or time sorting a whole buffer. The
> bi-temporal join requires something more, in a way, since you need to query
> by time range and GC time prefixes.
>
> Kenn
>
> Jan
>> On 11/25/19 10:17 PM, Rui Wang wrote:
>>
>>
>>
>> On Mon, Nov 25, 2019 at 11:29 AM Jan Lukavský <[email protected]> wrote:
>>
>>>
>>> On 11/25/19 7:47 PM, Kenneth Knowles wrote:
>>>
>>>
>>>
>>> On Sun, Nov 24, 2019 at 12:57 AM Jan Lukavský <[email protected]> wrote:
>>>
>>>> I can put down a design document, but before that I need to clarify
>>>> some things for myself. I'm struggling to put all of this into a bigger
>>>> picture. Sorry if the arguments are going in circles, but I didn't notice
>>>> any proposal of how to solve these. If anyone can disprove any of this
>>>> logic it would be very much appreciated, as I might be able to get out of
>>>> a dead end:
>>>>
>>>>  a) in the bi-temporal join you can either buffer until watermark, or
>>>> emit false data that has to be retracted
>>>>
>>> This is not the case. A stateful DoFn based join can emit immediately
>>> joined rows that will never need to be retracted. The need for retractions
>>> has to do with CoGBK-based implementation of a join.
>>>
>>> I fail to see how this could work. If I emit joined rows immediately
>>> without waiting for the watermark to pass, I can join two elements that
>>> don't belong to each other, because an element with a lower time distance
>>> can arrive later, and it should have been joined in place of the previously
>>> emitted one. This is a wrong result that has to be retracted. Or what am I
>>> missing?
>>>
>>
>> Hi Kenn, do you mean that a stateful DoFn based join can emit joined rows
>> that never need to be retracted because, in the stateful DoFn case, joined
>> rows are controlled by timers and each is emitted only once? If so, I agree.
>> Generally speaking, whether rows are emitted only once is the factor that
>> decides whether retraction is needed.
>>
>> In past brainstorming, even with retractions ready, streaming joins
>> with windowing are likely to be implemented in a CoGBK + stateful DoFn
>> style.
>>
>>
>>
>>> I suggest that you work out the definition of the join you are interested
>>> in, with a good amount of mathematical rigor, and then consider the ways
>>> you can implement it. That is where a design doc will probably clarify
>>> things.
>>>
>>> Kenn
>>>
>>>>  b) until retractions are 100% functional (and that is sort of holy
>>>> grail for now), then the only solution is using a buffer holding data up to
>>>> watermark *and then sort by event time*
>>>>
>>>>  c) even if retractions were 100% functional, there would have to be
>>>> special implementation for batch case, because otherwise this would simply
>>>> blow up downstream processing with insanely many false additions and
>>>> subsequent retractions
>>>>
>>>> Property b) means that if we want this feature now, we must sort by
>>>> event time and there is no way around. Property c) shows that even in the
>>>> future, we must make (in certain cases) distinction between batch and
>>>> streaming code paths, which seems weird to me, but it might be an option.
>>>> But still, there is no way to express this join in batch case, because it
>>>> would require either buffering (up to) whole input on local worker (doesn't
>>>> look like viable option) or provide a way in user code to signal the need
>>>> for ordering of data inside GBK (and we are there again :)). Yes, we might
>>>> shift this need from stateful dofn to GBK like
>>>>
>>>>  input.apply(GroupByKey.sorted())
>>>>
>>>> I cannot find a good reasoning why this would be better than giving
>>>> this semantics to (stateful) ParDo.
>>>>
>>>> Maybe someone can help me out here?
>>>>
>>>> Jan
>>>> On 11/24/19 5:05 AM, Kenneth Knowles wrote:
>>>>
>>>> I don't actually see how event time sorting simplifies this case much.
>>>> You still need to buffer elements until they can no longer be matched in
>>>> the join, and you still need to query that buffer for elements that might
>>>> match. The general "bi-temporal join" (without sorting) requires one new
>>>> state type and then it has identical API, does not require any novel data
>>>> structures or reasoning, yields better latency (no sort buffer delay), and
>>>> discards less data (no sort buffer cutoff; watermark is better). Perhaps a
>>>> design document about this specific case would clarify.
>>>>
>>>> Kenn
>>>>
>>>> On Fri, Nov 22, 2019 at 10:08 PM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>> I didn't want to go too much into detail, but to describe the idea
>>>>> roughly (ignoring the problem of different window fns on both sides to
>>>>> keep it as simple as possible):
>>>>>
>>>>> rhs -----  \
>>>>>
>>>>>                 flatten (on global window) ---- stateful par do (sorted by event time) ---- output
>>>>>
>>>>> lhs -----  /
>>>>>
>>>>> If we can guarantee event time order arrival of events into the
>>>>> stateful pardo, then the whole complexity reduces to keeping the current
>>>>> value of the left and right element and just flushing them out each time
>>>>> there is an update. That is, the "knob" is actually when the watermark
>>>>> moves, because it is what tells the join operation that there will be no
>>>>> more (non-late) input. This is very, very simplified, but depicts the
>>>>> solution. The "classical" windowed join reduces to this if all data in
>>>>> each window is projected onto the window end boundary. Then there will be
>>>>> a cartesian product, because all the elements have the same timestamp. I
>>>>> can put this into a design doc with all the details; I was trying to find
>>>>> out if there is or was any effort around this.
>>>>>
>>>>> I was in touch with Reza in the PR #9032, I think that it currently
>>>>> suffers from problems with running this on batch.
>>>>>
>>>>> I think I can even (partly) resolve the retraction issue (for joins),
>>>>> as described on the thread [1]. In short, there can be two copies of the
>>>>> stateful dofn, one running at watermark and the other at (watermark -
>>>>> allowed lateness). One would produce ON_TIME (maybe wrong) results, the
>>>>> other would produce LATE but correct ones. By comparing them, it would be
>>>>> possible to retract the wrong results.
>>>>>
>>>>> Yes, this is also about providing more evidence of why I think
>>>>> event-time sorting should be (somehow) part of the model. :-)
>>>>>
>>>>> Jan
>>>>>
>>>>> [1]
>>>>> https://lists.apache.org/thread.html/a736ebd6b0913d2e06dfa54797716d022adf2c45140530d5c3bd538d@%3Cdev.beam.apache.org%3E
>>>>> On 11/23/19 5:54 AM, Kenneth Knowles wrote:
>>>>>
>>>>> +Mikhail Gryzykhin <[email protected]> +Rui Wang <[email protected]> +Reza
>>>>> Rokni <[email protected]> who have all done some investigations here.
>>>>>
>>>>>
>>>>> On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>>
>>>>>> On 11/22/19 7:54 PM, Reuven Lax wrote:
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Reuven,
>>>>>>>
>>>>>>> I didn't investigate that particular one, but looking into it now,
>>>>>>> it looks like it (same as the "classic" join library) is built around
>>>>>>> CoGBK. Is that correct? If yes, then it essentially means that it:
>>>>>>>
>>>>>>>  - works only for cases where both sides have the same windowfn (that
>>>>>>> is limitation of Flatten that precedes CoGBK)
>>>>>>>
>>>>>> Correct. Did you want to join different windows? If so what are the
>>>>>> semantics? If the lhs has FixedWindows and the rhs has SessionWindows,
>>>>>> what do you want the join semantics to be? The only thing I could imagine
>>>>>> would be for the user to provide some function telling the join how to
>>>>>> map the windows together, but that could be pretty complicated.
>>>>>>
>>>>>> I don't want to go too far into details, but generally both lhs and
>>>>>> rhs can be put onto time line and then full join can be defined as each
>>>>>> pair of (lhs, first preceding rhs) and (rhs, first preceding lhs). Then
>>>>>> the end of window is semantically just clearing the joined value (setting
>>>>>> it to null, thus at the end of window there will be pair (lhs, null) or
>>>>>> (null, rhs) in case of full outer join). This way any combination of
>>>>>> windows is possible, because all window does is that it "scopes" validity
>>>>>> of respective values (lhs, rhs).
>>>>>>
>>>>>
>>>>> I think it is very valid to hope to do a join in the sense of a
>>>>> relational join where it is row-to-row. In this case, Beam's concept of
>>>>> windowing may or may not make sense. It is just a tool for the job. It is
>>>>> just a grouping key that provides a time when state can be deleted. So I
>>>>> would say your use case is more global window to global window join. That
>>>>> is what I think of as a true stream-to-stream join anyhow. You probably
>>>>> don't want to wait forever for output. So you'll need to use some knob
>>>>> other than Beam windows or triggers.
>>>>>
>>>>> Reza has prototyped a join like you describe here:
>>>>> https://github.com/apache/beam/pull/9032
>>>>>
>>>>> If your join condition explicitly includes the event time distance
>>>>> between elements, then it could "just work". If that isn't really part of
>>>>> your join condition, then you will have to see this restriction as a
>>>>> "knob" that you tweak on your results.
>>>>>
>>>>>>>  - when using global window, there has to be trigger and (afaik)
>>>>>>> there is no trigger that would guarantee firing after each data element
>>>>>>> (for early panes) (because triggers are there to express cost-latency
>>>>>>> tradeoff, not semantics)
>>>>>>>
>>>>>>
>>>>>> Can you explain the use case where this matters? If you do trigger
>>>>>> elementCountAtLeast(1) on the join, then the consumer will simply see a
>>>>>> continuous stream of outputs. I'm not sure I understand why the consumer
>>>>>> cares that some of those outputs were in a pane that really held 3
>>>>>> outputs instead of 1.
>>>>>>
>>>>>> What I'm trying to solve is basically this:
>>>>>>
>>>>>>  - lhs is event stream
>>>>>>
>>>>>>  - rhs is stream of a "state updates"
>>>>>>
>>>>>> The purpose of the join is "take each event, pair it with the currently
>>>>>> valid state and produce output and possibly modified state". I cannot
>>>>>> process two events at a time, because the first event can modify the
>>>>>> state and the subsequent event should see this. It is not a "simple"
>>>>>> stateful pardo either, because the state can be modified externally (not
>>>>>> going into too much detail here, but e.g. by writing into a kafka topic).
>>>>>>
>>>>> Reuven's explanation is missing some detail. If the CoGBK is in
>>>>> discarding mode, then it will miss join results. If the CoGBK is in
>>>>> accumulating mode, it will duplicate join results. This is a known problem
>>>>> and the general solution is retractions.
>>>>>
>>>>> Basically, CoGBK-based joins just don't work with triggers until we
>>>>> have retractions.
>>>>>
>>>>>
>>>>>
>>>>>>> Moreover, I'd like to define the join semantics so that when there
>>>>>>> are available elements from both sides, the fired pane should be
>>>>>>> ON_TIME, not EARLY. That essentially means that the fully general case
>>>>>>> would not be built around (Co)GBK, but stateful ParDo. There are
>>>>>>> specific options where this fully general case "degrades" into forms
>>>>>>> that can be efficiently expressed using (Co)GBK, that is true.
>>>>>>>
>>>>>>
>>>>>> BTW building this around stateful DoFn might be a better fit. The
>>>>>> main reason I didn't is because we would need a good distributed MapState
>>>>>> (something discussed fairly recently on the list), and that is not yet
>>>>>> built. Once we had that, I might be inclined to rewrite this join on
>>>>>> stateful DoFn.
>>>>>>
>>>>>> Yes, the sorted state helps for streaming case. But I'd be careful
>>>>>> about that for batch case, where this might lead to high pressure on the
>>>>>> state (and InMemoryStateInternals might OOME for instance).
>>>>>>
>>>>>>
>>>>>> However can you explain what you are expecting from the pane? An
>>>>>> EARLY pane simply means that we are producing output before the end of
>>>>>> the window. If you are in the global window triggering every element, then
>>>>>> every output is EARLY. It might seem weird if you are interpreting EARLY
>>>>>> as "outputting data that isn't ready," however that's not what EARLY is
>>>>>> defined to be. Any change to the pane semantics would be a major breaking
>>>>>> change to very fundamental semantics.
>>>>>>
>>>>>> I wonder if you are really objecting to the name EARLY and ON_TIME?
>>>>>> Maybe we would've been better off tagging it BEFORE_WINDOW_END instead of
>>>>>> EARLY, to make it clear what is meant?
>>>>>>
>>>>>> Essentially I don't object to anything here. I'm missing a solution to
>>>>>> the "event vs. state" join described above. I was thinking about how to
>>>>>> make these types of problems more user friendly and it essentially leads
>>>>>> to creating a somewhat more generic semantics of join, where
>>>>>> end-of-window is converted into "value-delete events" and then just
>>>>>> joining by the "previous" or "valid" value (yes, this relates to validity
>>>>>> windows mentioned on Beam Summit Europe). It actually turns out that with
>>>>>> some work we could define quite "naturally" a join on two streams with
>>>>>> global window and no trigger. It would even function with the lowest
>>>>>> latency possible (but yes, with the highest expenses; it is actually the
>>>>>> introduction of (same!) windows that enables certain optimizations). It
>>>>>> then correctly defines semantics for different windows, although the
>>>>>> result would be (probably unexpectedly) windowed using global window. But
>>>>>> that doesn't seem to be any breaking change, because it is currently not
>>>>>> possible (any such pipeline will not be validated).
>>>>>>
>>>>>> Maybe for reference, the unwindowed join would be what is described
>>>>>> here [1]
>>>>>>
>>>>>> [1]
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin
>>>>>>
>>>>>>
>>>>>>
>>>>>>> Jan
>>>>>>> On 11/22/19 6:47 PM, Reuven Lax wrote:
>>>>>>>
>>>>>>> Have you seen the Join library that is part of schemas? I'm curious
>>>>>>> whether this fits your needs, or there's something lacking there.
>>>>>>>
>>>>>>> On Fri, Nov 22, 2019 at 12:31 AM Jan Lukavský <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> based on roadmap [1], we would like to define and implement a full set
>>>>>>>> of (unified) stream-stream joins. That would include:
>>>>>>>>
>>>>>>>>   - joins (left, right, full outer) on global window with "immediate
>>>>>>>> trigger"
>>>>>>>>
>>>>>>>>   - joins with different windowing functions on left and right side
>>>>>>>>
>>>>>>>> The approach would be to define these operations in a natural way, so
>>>>>>>> that the definition is aligned with how current joins work (same
>>>>>>>> windows, cartesian product of values with same keys, output timestamp
>>>>>>>> projected to the end of window, etc.). Because this should be a generic
>>>>>>>> approach, this effort should probably be part of join library, that can
>>>>>>>> then be reused by other components, too (e.g. SQL).
>>>>>>>>
>>>>>>>> The question is - is (or was) there any effort that we can build upon?
>>>>>>>> Or should this be designed from scratch?
>>>>>>>>
>>>>>>>> Jan
>>>>>>>>
>>>>>>>> [1] https://beam.apache.org/roadmap/euphoria/
>>>>>>>>
>>>>>>>>
