"*In terms of Join semantics, I think it's hard to reason about data
completeness since one side of the join is changing*"
- Since [Global Windows with a Non-Default Trigger] can also be applied to an
arbitrary Unbounded Data Source such as Kafka, that condition alone does not
distinguish a plain Kafka PCollection from a "Slowly Changing Lookup Cache"
Unbounded PCollection. If we instead check that one of the PCollections being
joined has [Global Windows with Trigger
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())], is that
sufficient to perform the Join of the "MainStream" and this "LookupStream"?

In other words, instead of directly throwing an Exception
<https://github.com/apache/beam/blob/f03b6ba12e7c0a1005504612cc6067eebec9ffe8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L359>
when joining two Unbounded PCollections with different WindowFns, if we can
ensure that
- MainStream: one side of the Join is Unbounded with [Non-Global Windows
  with the Default Trigger], and
- LookupStream: the other side of the Join is a "Slowly Changing Lookup
  Cache" [Global Windows with the
  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()) Trigger],
then we can directly perform a SideInputJoin.
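A minimal sketch of the decision proposed above, written as a stdlib-only Java model. `JoinInput`, `JoinStrategy` and `JoinPlanner` are hypothetical names; this is not how BeamJoinRel is actually written, and comparing WindowFns by a descriptive string is an assumption standing in for Beam's real compatibility checks.

```java
/** Hypothetical, simplified description of one join input (not Beam's API). */
final class JoinInput {
  /** REPEATED_PROCESSING_TIME stands for Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()). */
  enum Trigger { DEFAULT, REPEATED_PROCESSING_TIME }

  final boolean unbounded;
  final String windowFn; // assumed descriptive label, e.g. "GlobalWindows" or "FixedWindows(1m)"
  final Trigger trigger;

  JoinInput(boolean unbounded, String windowFn, Trigger trigger) {
    this.unbounded = unbounded;
    this.windowFn = windowFn;
    this.trigger = trigger;
  }

  /** "Slowly changing lookup cache": unbounded, global window, repeated processing-time trigger. */
  boolean isSlowlyChangingLookup() {
    return unbounded && windowFn.equals("GlobalWindows")
        && trigger == Trigger.REPEATED_PROCESSING_TIME;
  }

  /** Ordinary main stream: unbounded, non-global windows, default trigger. */
  boolean isWindowedMainStream() {
    return unbounded && !windowFn.equals("GlobalWindows") && trigger == Trigger.DEFAULT;
  }
}

enum JoinStrategy { COGBK_JOIN, SIDE_INPUT_JOIN }

final class JoinPlanner {
  static JoinStrategy choose(JoinInput left, JoinInput right) {
    if (!left.unbounded && !right.unbounded) {
      return JoinStrategy.COGBK_JOIN;
    }
    if (left.unbounded != right.unbounded) {
      // Existing behaviour: the bounded side becomes the side input.
      return JoinStrategy.SIDE_INPUT_JOIN;
    }
    // Both unbounded with identical windowing: a grouping join (simplified; Beam rejects more cases).
    if (left.windowFn.equals(right.windowFn) && left.trigger == right.trigger) {
      return JoinStrategy.COGBK_JOIN;
    }
    // Proposed relaxation: windowed main stream + slowly changing lookup, in either order.
    if ((left.isWindowedMainStream() && right.isSlowlyChangingLookup())
        || (right.isWindowedMainStream() && left.isSlowlyChangingLookup())) {
      return JoinStrategy.SIDE_INPUT_JOIN;
    }
    // Any other WindowFn mismatch is still rejected, as today.
    throw new UnsupportedOperationException("WindowFns of the two unbounded inputs differ");
  }
}
```

The design choice here is to keep rejecting every mismatch except the one narrowly defined shape, so the relaxation cannot accidentally accept an arbitrary globally windowed Kafka stream with some other trigger.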

Would we still have a "data completeness" problem with the "Slowly Changing
Lookup Cache" pattern?

On Fri, Jul 26, 2019 at 2:51 AM Rui Wang <ruw...@google.com> wrote:

> To be clearer, I think it would be useful if we could achieve the following,
> which you wrote:
>
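> PCollection<Row> mainStream = ...;
> PCollection<Row> lookupStream = ...;
> PCollectionTuple tuple =
>     PCollectionTuple.of(new TupleTag<>("MainTable"), mainStream)
>         .and(new TupleTag<>("LookupTable"), lookupStream);
> // "id" is a placeholder join key; the tag ids become the SQL table names.
> tuple.apply(SqlTransform.query(
>     "SELECT * FROM MainTable JOIN LookupTable ON MainTable.id = LookupTable.id"));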
>
> -Rui
>
> On Thu, Jul 25, 2019 at 1:56 PM Rui Wang <ruw...@google.com> wrote:
>
>> Hi Rahul, thanks for your detailed write-up. It pretty much summarizes the
>> slowly changing table join problem.
>>
>> To your question: "Can we implement SideInputJoin for this case", there
>> are two perspectives.
>>
>> In terms of implementing the slowly changing lookup cache pattern
>> <https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs>
>> in BeamSQL, such a side input join can be done that way. At least it is
>> worth exploring until we identify blockers. I also think this pattern is
>> already useful to users.
>>
>> In terms of Join semantics, I think it's hard to reason about data
>> completeness since one side of the join is changing.
>>
>> -Rui
>>
>>
>> On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <rahulpatwari8...@gmail.com> wrote:
>>
>>> Hi Kenn,
>>>
>>> If we consider the following two *Unbounded* PCollections:
>>> - PCollection1 => [*Non-Global* Window with Default Trigger]
>>> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
>>> (coincidentally, this turned out to be the opposite)
>>>
>>> Joining these two PCollections in BeamSql is currently not possible
>>> because of https://jira.apache.org/jira/browse/BEAM-3345 (WindowFn
>>> mismatch).
>>> But in this case, PCollection1 can be joined with PCollection2 using
>>> SideInputJoin (
>>> https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
>>> which is what is already done when joining an Unbounded PCollection with
>>> a Bounded PCollection. I think Beam can guarantee that it joins all input
>>> elements once per window in this case.
>>> The result of the join might be fuzzy for a window that is in progress
>>> when the Trigger for PCollection2 fires and the updated side input is
>>> loaded into memory: some elements of that window are joined with the old
>>> lookup data and others with the new.
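To make that fuzziness concrete, here is a toy Java model (not Beam code) of a main-stream element being joined against the newest lookup snapshot that fired at or before the element's processing time. `LookupSnapshots` and `FuzzyJoinDemo` are hypothetical names, and processing times are assumed to be plain `long` values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy model: each trigger firing of the lookup side publishes a full snapshot of the table. */
final class LookupSnapshots {
  // processing time of a firing -> lookup table emitted by that firing
  private final NavigableMap<Long, Map<String, String>> firings = new TreeMap<>();

  void fire(long processingTime, Map<String, String> table) {
    firings.put(processingTime, new HashMap<>(table));
  }

  /**
   * Value for key in the newest snapshot fired at or before processingTime.
   * Beam would hold a main-input element back until the first firing; this model returns null instead.
   */
  String lookup(String key, long processingTime) {
    Map.Entry<Long, Map<String, String>> latest = firings.floorEntry(processingTime);
    return latest == null ? null : latest.getValue().get(key);
  }
}

final class FuzzyJoinDemo {
  /** Joins main elements (keys.get(i) processed at times.get(i)) against the visible snapshot. */
  static List<String> join(LookupSnapshots lookup, List<String> keys, List<Long> times) {
    List<String> out = new ArrayList<>();
    for (int i = 0; i < keys.size(); i++) {
      String key = keys.get(i);
      out.add(key + "=" + lookup.lookup(key, times.get(i)));
    }
    return out;
  }
}
```

With snapshots fired at processing times 0 and 100, two elements with key `a` from the same event-time window, processed at times 50 and 150, join to `a=v1` and `a=v2` respectively, which is the per-window fuzziness described above.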
>>>
>>> PCollection2 can be considered a Slowly Changing Lookup Cache, and
>>> BeamSql could then support the pattern
>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
>>> which it currently cannot.
>>> I am working on https://jira.apache.org/jira/browse/BEAM-7758 so that
>>> BeamSql natively supports PCollectionView, which would let BeamSql support
>>> the "Slowly Updating Global Window Side Input Pattern" through
>>> SqlTransform's TableProvider.
>>>
>>> If we can support this, users will be able to do:
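>>> PCollection<Row> mainStream = ...;
>>> PCollection<Row> lookupStream = ...;
>>> PCollectionTuple tuple =
>>>     PCollectionTuple.of(new TupleTag<>("MainTable"), mainStream)
>>>         .and(new TupleTag<>("LookupTable"), lookupStream);
>>> // "id" is a placeholder join key; the tag ids become the SQL table names.
>>> tuple.apply(SqlTransform.query(
>>>     "SELECT * FROM MainTable JOIN LookupTable ON MainTable.id = LookupTable.id"));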
>>>
>>> Can we implement SideInputJoin for this case?
>>> I might be wrong in my understanding. Please let me know your thoughts.
>>>
>>> Thanks,
>>> Rahul
>>>
>>> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> I think the best way to approach this is probably to have an example
>>>> SQL statement and to discuss what the relational semantics should be.
>>>>
>>>> Windowing is not really part of SQL (yet), and in a way it needs only
>>>> very minimal extensions. See https://arxiv.org/abs/1905.12133. In this
>>>> proposal for SQL, windowed aggregation is explicitly part of the GROUP
>>>> BY operation, where you GROUP BY window columns that were added. So it is
>>>> more explicit than in Beam. Relations do not have a WindowFn, so there is
>>>> no problem of them being incompatible.
>>>>
>>>> With Beam SQL there are basically two ways of windowing that work
>>>> totally differently:
>>>>
>>>> 1. SQL style windowing, where you GROUP BY windows. This does not use
>>>> the input PCollection's WindowFn.
>>>> 2. PCollection windowing, where the SQL does not do any windowing; this
>>>> should apply the SQL expression to each window independently.
>>>>
>>>> In order to support a hybrid of these, it might be:
>>>>
>>>> 3. SQL style windowing, where, when a PCollection has windows assigned,
>>>> the window columns are added before the SQL is applied. It is a bit
>>>> strange but might enable your use case.
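As an illustration of options 1 and 3 under simplifying assumptions, the toy Java model below appends hypothetical `window_start`/`window_end` columns to each row using a fixed (tumbling) window and then groups by the key plus those columns, the relational equivalent of a windowed GROUP BY. `WindowColumnsDemo` and the column names are hypothetical; this is not Beam SQL's implementation.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model of "SQL style windowing": add window columns to each row, then GROUP BY them. */
final class WindowColumnsDemo {
  /** A row after the hypothetical window_start/window_end columns have been appended. */
  static final class WindowedRow {
    final String key;
    final long windowStart;
    final long windowEnd;

    WindowedRow(String key, long eventTime, long windowSize) {
      this.key = key;
      // Tumbling window assignment; floorMod keeps windows aligned for negative timestamps too.
      this.windowStart = eventTime - Math.floorMod(eventTime, windowSize);
      this.windowEnd = windowStart + windowSize;
    }
  }

  /** Like: SELECT key, window_start, window_end, COUNT(*) ... GROUP BY key, window_start, window_end. */
  static Map<String, Long> countPerWindow(String[] keys, long[] eventTimes, long windowSize) {
    Map<String, Long> counts = new TreeMap<>();
    for (int i = 0; i < keys.length; i++) {
      WindowedRow row = new WindowedRow(keys[i], eventTimes[i], windowSize);
      String groupKey = row.key + "@[" + row.windowStart + "," + row.windowEnd + ")";
      counts.merge(groupKey, 1L, Long::sum);
    }
    return counts;
  }
}
```

Because the window is just another pair of columns, the relation itself carries no WindowFn, which is why two such relations cannot be "incompatible" in the way two windowed PCollections can.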
>>>>
>>>> Kenn
>>>>
>>>> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <rahulpatwari8...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Beam currently doesn't support joining Unbounded PCollections with
>>>>> different WindowFns (
>>>>> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
>>>>> ).
>>>>>
>>>>> BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection]
>>>>> by performing a 'SideInputJoin' with the Bounded PCollection as a side input.
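A toy model (not Beam's implementation) of why a side-input join sidesteps the WindowFn mismatch: the bounded side is materialized once as a multimap, and each streamed element probes it independently, so the two inputs never have to be grouped together. `SideInputJoinModel` is a hypothetical name, and rows are assumed to be plain `String[]` key/value pairs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a side-input join: materialize the small side, probe it per streamed element. */
final class SideInputJoinModel {
  /** Inner join of streamed {key, value} pairs against the bounded {key, value} pairs. */
  static List<String> innerJoin(String[][] streamed, String[][] bounded) {
    // "Side input": the whole bounded side, grouped by key, visible to every streamed element.
    Map<String, List<String>> side = new HashMap<>();
    for (String[] kv : bounded) {
      side.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
    }
    List<String> out = new ArrayList<>();
    // Each streamed element is joined on its own; the stream is never grouped or re-windowed.
    for (String[] kv : streamed) {
      for (String match : side.getOrDefault(kv[0], List.of())) {
        out.add(kv[0] + ":" + kv[1] + "|" + match);
      }
    }
    return out;
  }
}
```

Streaming `{k1,s1}, {k2,s2}, {k1,s3}` against a bounded side `{k1,b1}, {k3,b3}` yields `k1:s1|b1` and `k1:s3|b1`; `k2` has no match and is dropped, as in an inner join.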
>>>>>
>>>>> Can we support [Unbounded PCollection] JOIN [Unbounded PCollection]
>>>>> by performing a 'SideInputJoin' when one of the Unbounded PCollections
>>>>> has [GlobalWindows applied with a Non-Default Trigger (probably a slowly
>>>>> changing lookup cache
>>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]?
>>>>>
>>>>> Regards,
>>>>> Rahul
>>>>>
>>>>
