+Mikhail Gryzykhin +Rui Wang +Reza Rokni, who have all done some investigations here.
On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský wrote:
On 11/22/19 7:54 PM, Reuven Lax wrote:
On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský wrote:
Hi Reuven,
I didn't investigate that particular one, but looking
into it now, it looks like it is (same as the "classic"
join library) built around CoGBK. Is that correct? If
yes, then it essentially means that it:
- works only for cases where both sides have the same
windowfn (that is a limitation of the Flatten that precedes CoGBK)
Correct. Did you want to join different windows? If so, what
are the semantics? If the lhs has FixedWindows and the rhs
has SessionWindows, what do you want the join semantics to
be? The only thing I could imagine would be for the user to
provide some function telling the join how to map the
windows together, but that could be pretty complicated.
I don't want to go too far into details, but generally both
lhs and rhs can be put onto a timeline, and then a full join can
be defined as each pair of (lhs, first preceding rhs) and
(rhs, first preceding lhs). Then the end of a window
semantically just clears the joined value (sets it to
null, so at the end of a window there will be a pair (lhs,
null) or (null, rhs) in the case of a full outer join). This way
any combination of windows is possible, because all a window
does is "scope" the validity of the respective values (lhs,
rhs).
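A minimal sketch of the timeline semantics described above, for a single key, in plain Java rather than Beam (all names such as TimelineJoin, Element and Pair are hypothetical). It assumes each value is valid from its timestamp until the end of its window (exclusive), treats that window end as a value-delete event, and pairs each new value with the latest still-valid value from the other side.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch (not a Beam API): full outer "timeline" join for one key.
// A value is valid from its timestamp until its window end (exclusive); the
// window end acts as a value-delete event for that side.
final class TimelineJoin {
  enum Side { LEFT, RIGHT }

  record Element(Side side, long timestamp, long windowEnd, String value) {}

  // One join result; either side may be null (full outer join).
  record Pair(long timestamp, String left, String right) {}

  // A point on the timeline: "set" at the element's timestamp, "clear" at its window end.
  private record Event(long time, boolean isClear, Element element) {}

  static List<Pair> fullOuterJoin(List<Element> input) {
    List<Event> timeline = new ArrayList<>();
    for (Element e : input) {
      timeline.add(new Event(e.timestamp(), false, e));
      timeline.add(new Event(e.windowEnd(), true, e));
    }
    // Window ends are exclusive, so at equal times clears run before sets;
    // List.sort is stable, so remaining ties keep input order.
    timeline.sort(Comparator.comparingLong(Event::time)
        .thenComparingInt(ev -> ev.isClear() ? 0 : 1));

    String left = null;
    String right = null;
    long leftEnd = 0;
    long rightEnd = 0;
    List<Pair> out = new ArrayList<>();
    for (Event ev : timeline) {
      Element e = ev.element();
      boolean isLeft = e.side() == Side.LEFT;
      if (ev.isClear()) {
        // Only expire the value that is still current for that side.
        if (isLeft && left != null && leftEnd == ev.time()) {
          left = null;
          if (right != null) out.add(new Pair(ev.time(), null, right));
        } else if (!isLeft && right != null && rightEnd == ev.time()) {
          right = null;
          if (left != null) out.add(new Pair(ev.time(), left, null));
        }
      } else {
        if (isLeft) {
          left = e.value();
          leftEnd = e.windowEnd();
        } else {
          right = e.value();
          rightEnd = e.windowEnd();
        }
        // Pair the new value with the latest still-valid value of the other side.
        out.add(new Pair(ev.time(), left, right));
      }
    }
    return out;
  }
}
```

For example, left values in 10-unit windows (a at 1, b at 12) and a right value in a 20-unit window (x at 3) produce (1, a, null), (3, a, x), (10, null, x), (12, b, x) and (20, b, null), even though the two sides use different window sizes.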
I think it is very valid to hope to do a join in the sense of a
relational join where it is row-to-row. In this case, Beam's
concept of windowing may or may not make sense. It is just a tool
for the job: a grouping key that provides a time when
state can be deleted. So I would say your use case is more of a
global-window-to-global-window join. That is what I think of as a true
stream-to-stream join anyhow. You probably don't want to wait
forever for output, so you'll need to use some knob other than
Beam windows or triggers.
Reza has prototyped a join like you describe here:
https://github.com/apache/beam/pull/9032
If your join condition explicitly includes the event time
distance between elements, then it could "just work". If that
isn't really part of your join condition, then you will have to
see this restriction as a "knob" that you tweak on your results.
- when using the global window, there has to be a trigger, and
(afaik) there is no trigger that would guarantee firing
after each data element (for early panes), because
triggers are there to express a cost-latency tradeoff, not
semantics
Can you explain the use case where this matters? If you
trigger the join with elementCountAtLeast(1), then the
consumer will simply see a continuous stream of outputs. I'm
not sure I understand why the consumer cares that some of
those outputs were in a pane that really held 3 outputs
instead of 1.
What I'm trying to solve is basically this:
- lhs is an event stream
- rhs is a stream of "state updates"
The purpose of the join is to "take each event, pair it with the
currently valid state, and produce output and possibly a
modified state". I cannot process two events at a time,
because the first event can modify the state and the subsequent
event should see this. It is not a "simple" stateful ParDo
either, because the state can be modified externally (not
going into too much detail here, but e.g. by writing into a
Kafka topic).
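The "event vs. state" requirement can be sketched as a strictly sequential loop, again in plain Java with hypothetical names (EventStateJoin, Input, Result). It assumes inputs for all keys can be merged into a single event-time order and that, at equal timestamps, an external state update is applied before an event; both are simplifying assumptions, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch (not a Beam API): join events (lhs) with per-key state
// that both the events and external updates (rhs) can modify.
final class EventStateJoin {
  // Either an event or an external state update for a key.
  record Input(long timestamp, String key, boolean isStateUpdate, long value) {}

  // What processing one event yields: an output and the (possibly) new state.
  record Result(String output, long newState) {}

  static List<String> run(List<Input> inputs, long initialState,
      BiFunction<Long, Long, Result> processEvent) {
    List<Input> sorted = new ArrayList<>(inputs);
    // Strict event-time order; at equal timestamps state updates go first (assumption).
    sorted.sort(Comparator.comparingLong(Input::timestamp)
        .thenComparingInt(x -> x.isStateUpdate() ? 0 : 1));
    Map<String, Long> state = new HashMap<>();
    List<String> outputs = new ArrayList<>();
    for (Input in : sorted) {
      if (in.isStateUpdate()) {
        // External write (e.g. coming from a Kafka topic) replaces the state.
        state.put(in.key(), in.value());
      } else {
        long current = state.getOrDefault(in.key(), initialState);
        Result r = processEvent.apply(in.value(), current);
        // Store before the next input, so the next event for this key sees it.
        state.put(in.key(), r.newState());
        outputs.add(r.output());
      }
    }
    return outputs;
  }
}
```

With events 5 and 3, then an external update to 100, then an event 1, and processEvent `(ev, st) -> new Result(st + "+" + ev, st + ev)`, the outputs are 0+5, 5+3 and 100+1: the second event sees the state modified by the first, and the last one sees the external write.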
Reuven's explanation is missing some detail. If the CoGBK is in
discarding mode, then it will miss join results. If the CoGBK is
in accumulating mode, it will duplicate join results. This is a
known problem and the general solution is retractions.
Basically, CoGBK-based joins just don't work with triggers until
we have retractions.
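The discarding/accumulating problem can be reproduced with a small simulation in plain Java (hypothetical names). It models a single key and window where the trigger fires after every arrival and each pane emits the cartesian product of its left and right values; this is a simplification of what a triggered CoGBK join produces, not Beam code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation (not a Beam API) of a triggered CoGBK join for one
// key and window, with the trigger firing after every arrival.
final class TriggeredCoGbkJoin {
  enum Mode { DISCARDING, ACCUMULATING }

  record Arrival(boolean isLeft, String value) {}

  static List<String> join(List<Arrival> arrivals, Mode mode) {
    List<String> left = new ArrayList<>();
    List<String> right = new ArrayList<>();
    List<String> emitted = new ArrayList<>();
    for (Arrival a : arrivals) {
      List<String> side = a.isLeft() ? left : right;
      side.add(a.value());
      // Trigger fires: the pane joins whatever is currently buffered.
      for (String l : left) {
        for (String r : right) {
          emitted.add("(" + l + "," + r + ")");
        }
      }
      if (mode == Mode.DISCARDING) {
        // Discarding mode drops the pane contents after each firing.
        left.clear();
        right.clear();
      }
    }
    return emitted;
  }
}
```

For arrivals a (left), x (right), b (left), discarding mode emits nothing, missing (a,x) and (b,x), while accumulating mode emits (a,x), (a,x), (b,x), duplicating (a,x).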
Moreover, I'd like to define the join semantics so that
when elements are available from both sides, the
fired pane is ON_TIME, not EARLY. That
essentially means that the fully general case would not
be built around (Co)GBK, but around a stateful ParDo. There are
specific cases where this fully general case
"degrades" into forms that can be efficiently expressed
using (Co)GBK, that is true.
BTW building this around a stateful DoFn might be a better
fit. The main reason I didn't is that we would need a
good distributed MapState (something discussed fairly
recently on the list), and that is not yet built. Once we
have that, I might be inclined to rewrite this join on a
stateful DoFn.
Yes, the sorted state helps for the streaming case. But I'd be
careful about it for the batch case, where it might put
high pressure on the state (and InMemoryStateInternals might
throw an OOME, for instance).
However, can you explain what you are expecting from the
pane? An EARLY pane simply means that we are producing
output before the end of the window. If you are in the
global window triggering every element, then every output is
EARLY. It might seem weird if you are interpreting EARLY as
"outputting data that isn't ready," however that's not what
EARLY is defined to be. Any change to the pane semantics
would be a major breaking change to very fundamental semantics.
I wonder if you are really objecting to the name EARLY and
ON_TIME? Maybe we would've been better off tagging it
BEFORE_WINDOW_END instead of EARLY, to make it clear what is
meant?
Essentially I don't object to anything here. I'm missing a
solution to the "event vs. state" join described above. I was
thinking about how to make these types of problems more
user-friendly, and it essentially leads to creating somewhat more
generic join semantics, where end-of-window is converted
into "value-delete events" and then we just join by the
"previous" or "valid" value (yes, this relates to the validity
windows mentioned at Beam Summit Europe). It actually turns
out that with some work we could quite "naturally" define a
join on two streams with the global window and no trigger. It
would even function with the lowest latency possible (but yes,
with the highest expense; it is actually the introduction of
(same!) windows that enables certain optimizations). It then
correctly defines semantics for different windows, although
the result would be (probably unexpectedly) windowed into the
global window. But that doesn't seem to be a breaking
change, because it is currently not possible (any such
pipeline fails validation).
Maybe for reference, the unwindowed join would be what is
described here [1]
[1] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin
Jan
On 11/22/19 6:47 PM, Reuven Lax wrote:
Have you seen the Join library that is part of schemas?
I'm curious whether this fits your needs, or there's
something lacking there.
On Fri, Nov 22, 2019 at 12:31 AM Jan Lukavský wrote:
Hi,
based on the roadmap [1], we would like to define and
implement a full set of (unified) stream-stream joins. That would include:
- joins (left, right, full outer) on the global window with an "immediate trigger"
- joins with different windowing functions on the left and right side
The approach would be to define these operations in
a natural way, so that the definition is aligned with how current
joins work (same windows, cartesian product of values with the same
keys, output timestamp projected to the end of the window, etc.). Because this
should be a generic approach, this effort should probably be part of the
join library, which can then be reused by other components, too (e.g. SQL).
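For comparison, the "current" semantics listed above (same windows on both sides, cartesian product of values with the same key, output timestamp at the end of the window) can be sketched as a windowed inner join. This is plain Java with hypothetical names; it assumes fixed windows of the same size on both sides and stamps each result with the last instant inside the window (end - 1), which is our assumption about how "end of window" is represented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (not a Beam API): inner join of two inputs that use the
// same fixed windows, as a CoGBK-style join over (window, key) groups.
final class WindowedInnerJoin {
  record Element(String key, long timestamp, String value) {}

  record Joined(String key, long outputTimestamp, String left, String right) {}

  // Groups values by window start, then key; TreeMaps keep the output order deterministic.
  private static Map<Long, Map<String, List<String>>> group(List<Element> in, long size) {
    Map<Long, Map<String, List<String>>> byWindow = new TreeMap<>();
    for (Element e : in) {
      long start = e.timestamp() - Math.floorMod(e.timestamp(), size);
      byWindow.computeIfAbsent(start, s -> new TreeMap<>())
          .computeIfAbsent(e.key(), k -> new ArrayList<>())
          .add(e.value());
    }
    return byWindow;
  }

  static List<Joined> join(List<Element> lhs, List<Element> rhs, long size) {
    Map<Long, Map<String, List<String>>> l = group(lhs, size);
    Map<Long, Map<String, List<String>>> r = group(rhs, size);
    List<Joined> out = new ArrayList<>();
    for (Map.Entry<Long, Map<String, List<String>>> w : l.entrySet()) {
      Map<String, List<String>> rightByKey = r.getOrDefault(w.getKey(), Map.of());
      long outputTs = w.getKey() + size - 1; // last instant of [start, start + size)
      for (Map.Entry<String, List<String>> k : w.getValue().entrySet()) {
        List<String> rightValues = rightByKey.getOrDefault(k.getKey(), List.of());
        // Cartesian product of left and right values for this key and window.
        for (String lv : k.getValue()) {
          for (String rv : rightValues) {
            out.add(new Joined(k.getKey(), outputTs, lv, rv));
          }
        }
      }
    }
    return out;
  }
}
```

With a window size of 10, left values a@1, b@5, c@12 and right values x@3, z@15 for key k (plus y@4 for another key), the join yields (k, 9, a, x), (k, 9, b, x) and (k, 19, c, z).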
The question is - is (or was) there any effort that
we can build upon?
Or should this be designed from scratch?
Jan
[1] https://beam.apache.org/roadmap/euphoria/