On Mon, Nov 25, 2019 at 1:56 PM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Rui,
>
> > Hi Kenn, you think stateful DoFn based join can emit joined rows that
> never to be retracted because in stateful DoFn case joined rows will be
> controlled by timers and emit will be only once? If so I will agree with
> it. Generally speaking, if only emit once is the factor of needing
> retraction or not.
>
> that would imply buffering elements up until watermark, then sorting and
> so reduces to the option a) again, is that true? This also has to deal with
> allowed lateness, that would mean, that with allowed lateness greater than
> zero, there can still be multiple firings and so retractions are needed.
>
Specifically, when I say "bi-temporal join" I mean unbounded-to-unbounded
join where one of the join conditions is that elements are within event
time distance d of one another. An element at time t will be saved until
time t + 2d and then garbage collected. Every matching pair can be emitted
immediately.

In the triggered CoGBK + join-product implementation, you do need
retractions as a model concept. But you don't need full support, since they
only need to be shipped as deltas and only from the CoGBK to the
join-product transform where they are all consumed to create only positive
elements. Again a delay is not required; this yields correct results with
the "always" trigger.

Neither case requires waiting or time sorting a whole buffer. The
bi-temporal join requires something more, in a way, since you need to query
by time range and GC time prefixes.

Kenn

Jan
> On 11/25/19 10:17 PM, Rui Wang wrote:
>
>
>
> On Mon, Nov 25, 2019 at 11:29 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>>
>> On 11/25/19 7:47 PM, Kenneth Knowles wrote:
>>
>>
>>
>> On Sun, Nov 24, 2019 at 12:57 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> I can put down a design document, but before that I need to clarify some
>>> things for me. I'm struggling to put all of this into a bigger picture.
>>> Sorry if the arguments are circulating, but I didn't notice any proposal of
>>> how to solve these. If anyone can disprove any of this logic it would be
>>> very much appreciated as I might be able to get from a dead end:
>>>
>>>  a) in the bi-temporal join you can either buffer until watermark, or
>>> emit false data that has to be retracted
>>>
>> This is not the case. A stateful DoFn based join can emit immediately
>> joined rows that will never need to be retracted. The need for retractions
>> has to do with CoGBK-based implementation of a join.
>>
>> I fail to see how this could work. If I emit joined rows immediately
>> without waiting for watermark to pass, I can join two elements, that don't
>> belong to each other, because later can arrive element with lower time
>> distance, that should have been joint in the place of the previously
>> emitted one. This is wrong result that has to be retracted. Or what I'm
>> missing?
>>
>
> Hi Kenn, you think stateful DoFn based join can emit joined rows that
> never to be retracted because in stateful DoFn case joined rows will be
> controlled by timers and emit will be only once? If so I will agree with
> it. Generally speaking, if only emit once is the factor of needing
> retraction or not.
>
> In the past brainstorming, even having retractions ready, streaming join
> with windowing are likely be implemented by a style of CoGBK + stateful
> DoFn.
>
>
>
> I suggest that you work out the definition of the join you are interested
>> in, with a good amount of mathematical rigor, and then consider the ways
>> you can implement it. That is where a design doc will probably clarify
>> things.
>>
>> Kenn
>>
>>  b) until retractions are 100% functional (and that is sort of holy grail
>>> for now), then the only solution is using a buffer holding data up to
>>> watermark *and then sort by event time*
>>>
>>  c) even if retractions were 100% functional, there would have to be
>>> special implementation for batch case, because otherwise this would simply
>>> blow up downstream processing with insanely many false additions and
>>> subsequent retractions
>>>
>>> Property b) means that if we want this feature now, we must sort by
>>> event time and there is no way around. Property c) shows that even in the
>>> future, we must make (in certain cases) distinction between batch and
>>> streaming code paths, which seems weird to me, but it might be an option.
>>> But still, there is no way to express this join in batch case, because it
>>> would require either buffering (up to) whole input on local worker (doesn't
>>> look like viable option) or provide a way in user code to signal the need
>>> for ordering of data inside GBK (and we are there again :)). Yes, we might
>>> shift this need from stateful dofn to GBK like
>>>
>>>  input.apply(GroupByKey.sorted())
>>>
>>> I cannot find a good reasoning why this would be better than giving this
>>> semantics to (stateful) ParDo.
>>>
>>> Maybe someone can help me out here?
>>>
>>> Jan
>>> On 11/24/19 5:05 AM, Kenneth Knowles wrote:
>>>
>>> I don't actually see how event time sorting simplifies this case much.
>>> You still need to buffer elements until they can no longer be matched in
>>> the join, and you still need to query that buffer for elements that might
>>> match. The general "bi-temporal join" (without sorting) requires one new
>>> state type and then it has identical API, does not require any novel data
>>> structures or reasoning, yields better latency (no sort buffer delay), and
>>> discards less data (no sort buffer cutoff; watermark is better). Perhaps a
>>> design document about this specific case would clarify.
>>>
>>> Kenn
>>>
>>> On Fri, Nov 22, 2019 at 10:08 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> I didn't want to go too much into detail, but to describe the idea
>>>> roughly (ignoring the problem of different window fns on both sides to keep
>>>> it as simple as possible):
>>>>
>>>> rhs -----  \
>>>>
>>>>                 flatten (on global window) ---- stateful par do (sorted
>>>> by event time)  ---- output
>>>>
>>>> lhs -----  /
>>>>
>>>> If we can guarantee event time order arrival of events into the
>>>> stateful pardo, then the whole complexity reduces to keep current value of
>>>> left and right element and just flush them out each time there is an
>>>> update. That is the "knob" is actually when watermark moves, because it is
>>>> what tells the join operation that there will be no more (not late) input.
>>>> This is very, very simplified, but depicts the solution. The "classical"
>>>> windowed join reduces to this if all data in each window is projected onto
>>>> window end boundary. Then there will be a cartesian product, because all
>>>> the elements have the same timestamp. I can put this into a design doc with
>>>> all the details, I was trying to find out if there is or was any effort
>>>> around this.
>>>>
>>>> I was in touch with Reza in the PR #9032, I think that it currently
>>>> suffers from problems with running this on batch.
>>>>
>>>> I think I can even (partly) resolve the retraction issue (for joins),
>>>> as described on the thread [1]. Shortly, there can be two copies of the
>>>> stateful dofn, one running at watermark and the other at (watermark -
>>>> allowed lateness). One would produce ON_TIME (maybe wrong) results, the
>>>> other would produce LATE but correct ones. Being able to compare them, the
>>>> outcome would be that it would be possible to retract the wrong results.
>>>>
>>>> Yes, this is also about providing more evidence of why I think
>>>> event-time sorting should be (somehow) part of the model. :-)
>>>>
>>>> Jan
>>>>
>>>> [1]
>>>> https://lists.apache.org/thread.html/a736ebd6b0913d2e06dfa54797716d022adf2c45140530d5c3bd538d@%3Cdev.beam.apache.org%3E
>>>> On 11/23/19 5:54 AM, Kenneth Knowles wrote:
>>>>
>>>> +Mikhail Gryzykhin <mig...@google.com> +Rui Wang <ruw...@google.com> +Reza
>>>> Rokni <r...@google.com> who have all done some investigations here.
>>>>
>>>>
>>>> On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>>
>>>>> On 11/22/19 7:54 PM, Reuven Lax wrote:
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> Hi Reuven,
>>>>>>
>>>>>> I didn't investigate that particular one, but looking into that now,
>>>>>> it looks that is (same as the "classic" join library) builds around 
>>>>>> CoGBK.
>>>>>> Is that correct? If yes, then it essentially means that it:
>>>>>>
>>>>>  - works only for cases where both sides have the same windowfn (that
>>>>>> is limitation of Flatten that precedes CoGBK)
>>>>>>
>>>>> Correct. Did you want to join different windows? If so what are the
>>>>> semantics? If the lhs has FixedWindows and the rhs has SessionWindows, 
>>>>> what
>>>>> do you want the join semantics to be? The only thing I could imagine would
>>>>> be for the user to provide some function telling the join how to map the
>>>>> windows together, but that could be pretty complicated.
>>>>>
>>>>> I don't want to go too far into details, but generally both lhs and
>>>>> rhs can be put onto time line and then full join can be defined as each
>>>>> pair of (lhs, first preceding rhs) and (rhs, first preceding lhs). Then 
>>>>> the
>>>>> end of window is semantically just clearing the joined value (setting it 
>>>>> to
>>>>> null, thus at the end of window there will be pair (lhs, null) or (null,
>>>>> rhs) in case of full outer join). This way any combination of windows is
>>>>> possible, because all window does is that it "scopes" validity of
>>>>> respective values (lhs, rhs).
>>>>>
>>>>
>>>> I think it is very valid to hope to do a join in the sense of a
>>>> relational join where it is row-to-row. In this case, Beam's concept of
>>>> windowing may or may not make sense. It is just a tool for the job. It is
>>>> just a grouping key that provides a time when state can be deleted. So I
>>>> would say your use case is more global window to global window join. That
>>>> is what I think of as a true stream-to-stream join anyhow. You probably
>>>> don't want to wait forever for output. So you'll need to use some knob
>>>> other than Beam windows or triggers.
>>>>
>>>>> Reza has prototyped a join like you describe here:
>>>> https://github.com/apache/beam/pull/9032
>>>>
>>>> If your join condition explicitly includes the event time distance
>>>> between elements, then it could "just work". If that isn't really part of
>>>> your join condition, then you will have to see this restriction as a "knob"
>>>> that you tweak on your results.
>>>>
>>>>>  - when using global window, there has to be trigger and (afaik) there
>>>>>> is no trigger that would guarantee firing after each data element (for
>>>>>> early panes) (because triggers are there to express cost-latency 
>>>>>> tradeoff,
>>>>>> not semantics)
>>>>>>
>>>>>
>>>>> Can you explain the use case where this matters? If you do trigger
>>>>> elementCountAtLeast(1) on the join, then the consumer will simply see a
>>>>> continuous stream of outputs. I'm not sure I understand why the consumer
>>>>> cares that some of those outputs were in a pane that really held 3 outputs
>>>>> instead of 1.
>>>>>
>>>>> What I'm trying to solve is basically this:
>>>>>
>>>>>  - lhs is event stream
>>>>>
>>>>>  - rhs is stream of a "state updates"
>>>>>
>>>>> purpose of the join is "take each event, pair it with currently valid
>>>>> state and produce output and possibly modified state". I cannot process 
>>>>> two
>>>>> events at a time, because first event can modify the state and the
>>>>> subsequent event should see this. It is not a "simple" stateful pardo
>>>>> either, because the state can be modified externally (not going into too
>>>>> much detail here, but e.g. by writing into kafka topic).
>>>>>
>>>> Reuven's explanation is missing some detail. If the CoGBK is in
>>>> discarding mode, then it will miss join results. If the CoGBK is in
>>>> accumulating mode, it will duplicate join results. This is a known problem
>>>> and the general solution is retractions.
>>>>
>>>> Basically, CoGBK-based joins just don't work with triggers until we
>>>> have retractions.
>>>>
>>>>
>>>>
>>>>> Moreover, I'd like to define the join semantics so that when there are
>>>>>> available elements from both sides, the fired pane should be ON_TIME, not
>>>>>> EARLY. That essentially means that the fully general case would not be
>>>>>> built around (Co)GBK, but stateful ParDo. There are specific options 
>>>>>> where
>>>>>> this fully general case "degrades" into forms that can be efficiently
>>>>>> expressed using (Co)GBK, that is true.
>>>>>>
>>>>>
>>>>> BTW building this around stateful DoFn might be a better fit. The main
>>>>> reason I didn't is because we would need a good distributed MapState
>>>>> (something discussed fairly recently on the list), and that is not yet
>>>>> built. Once we had that, I might be inclined to rewrite this join on
>>>>> stateful DoFn.
>>>>>
>>>>> Yes, the sorted state helps for streaming case. But I'd be careful
>>>>> about that for batch case, where this might lead to high pressure on the
>>>>> state (and InMemoryStateInternals might OOME for instance).
>>>>>
>>>>>
>>>>> However can you explain what you are expecting from the pane? An EARLY
>>>>> pane simply means that we are producing output before the end of the
>>>>> window. If you are in the global window triggering every element, then
>>>>> every output is EARLY. It might seem weird if you are interpreting EARLY 
>>>>> as
>>>>> "outputting data that isn't ready," however that's not what EARLY is
>>>>> defined to be. Any change to the pane semantics would be a major breaking
>>>>> change to very fundamental semantics.
>>>>>
>>>>> I wonder if you are really objecting to the name EARLY and ON_TIME?
>>>>> Maybe we would've been better off tagging it BEFORE_WINDOW_END instead of
>>>>> EARLY, to make it clear what is meant?
>>>>>
>>>>> Essentially I don't object anything here. I'm missing solution to the
>>>>> "event vs. state" join described above. I was thinking about how to make
>>>>> these types of problems more user friendly and it essentially leads to
>>>>> creating a somewhat more generic semantics of join, where end-of-window is
>>>>> converted into "'value-delete events" and then just joining by the
>>>>> "previous" or "valid" value (yes, this relates to validity windows
>>>>> mentioned on Beam Summit Europe). It actually turns out that with some 
>>>>> work
>>>>> we could define quite "naturally" a join on two streams with global window
>>>>> and no trigger. It would even function with lowest latency possible (but
>>>>> yes, with the highest expenses, it is actually the introduction of (same!)
>>>>> windows that enable certain optimizations). It the correctly defines
>>>>> semantics for different windows, although the result would be (probably
>>>>> unexpectedly) windowed using global window. But that doesn't seem to be 
>>>>> any
>>>>> breaking change, because it is currently not possible (any such pipeline
>>>>> will not be validated).
>>>>>
>>>>> Maybe for reference, the unwindowed join would be what is described
>>>>> here [1]
>>>>>
>>>>> [1]
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin
>>>>>
>>>>>
>>>>>
>>>>>> Jan
>>>>>> On 11/22/19 6:47 PM, Reuven Lax wrote:
>>>>>>
>>>>>> Have you seen the Join library that is part of schemas? I'm curious
>>>>>> whether this fits your needs, or there's something lacking there.
>>>>>>
>>>>>> On Fri, Nov 22, 2019 at 12:31 AM Jan Lukavský <je...@seznam.cz>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> based on roadmap [1], we would like to define and implement a full
>>>>>>> set
>>>>>>> of (unified) stream-stream joins. That would include:
>>>>>>>
>>>>>>>   - joins (left, right, full outer) on global window with "immediate
>>>>>>> trigger"
>>>>>>>
>>>>>>>   - joins with different windowing functions on left and right side
>>>>>>>
>>>>>>> The approach would be to define these operations in a natural way,
>>>>>>> so
>>>>>>> that the definition is aligned with how current joins work (same
>>>>>>> windows, cartesian product of values with same keys, output
>>>>>>> timestamp
>>>>>>> projected to the end of window, etc.). Because this should be a
>>>>>>> generic
>>>>>>> approach, this effort should probably be part of join library, that
>>>>>>> can
>>>>>>> the be reused by other components, too (e.g. SQL).
>>>>>>>
>>>>>>> The question is - is (or was) there any effort that we can build
>>>>>>> upon?
>>>>>>> Or should this be designed from scratch?
>>>>>>>
>>>>>>> Jan
>>>>>>>
>>>>>>> [1] https://beam.apache.org/roadmap/euphoria/
>>>>>>>
>>>>>>>

Reply via email to