To be clearer, I think it would be useful if we could achieve the following,
which you wrote:

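PCollection<Row> mainStream = ...;
PCollection<Row> lookupStream = ...;
// Each TupleTag id becomes the table name that the query refers to.
PCollectionTuple tuple = PCollectionTuple
    .of(new TupleTag<>("MainTable"), mainStream)
    .and(new TupleTag<>("LookupTable"), lookupStream);
tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));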
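
To make the intended behavior concrete, here is a minimal plain-Java model (no
Beam APIs) of what such a join could mean: each main-stream element is inner
joined with whichever lookup snapshot is current when it arrives. The class
SideInputJoinSketch, its Event type, and the comma-joined output are all
hypothetical names made up for illustration; this is a sketch of the semantics
under those assumptions, not how BeamJoinRel does or would implement it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (no Beam APIs) of joining a main stream with a slowly changing lookup. */
final class SideInputJoinSketch {

  /** One arrival in processing-time order: a lookup refresh or a main-stream element. */
  static final class Event {
    final Map<String, String> lookupSnapshot; // non-null only for a lookup refresh
    final String key;                         // non-null only for a main-stream element
    final String value;

    private Event(Map<String, String> lookupSnapshot, String key, String value) {
      this.lookupSnapshot = lookupSnapshot;
      this.key = key;
      this.value = value;
    }

    /** A trigger firing on the lookup side that publishes a full new snapshot. */
    static Event refresh(Map<String, String> snapshot) {
      return new Event(new HashMap<>(snapshot), null, null);
    }

    /** An element arriving on the main stream. */
    static Event element(String key, String value) {
      return new Event(null, key, value);
    }
  }

  /** Inner join of each main element with the lookup snapshot current at its arrival. */
  static List<String> join(List<Event> events) {
    Map<String, String> cache = new HashMap<>(); // empty until the first refresh
    List<String> out = new ArrayList<>();
    for (Event e : events) {
      if (e.lookupSnapshot != null) {
        cache = e.lookupSnapshot; // a new snapshot replaces the whole cached view
      } else if (cache.containsKey(e.key)) {
        out.add(e.key + "," + e.value + "," + cache.get(e.key));
      }
      // Main elements with no matching key in the current snapshot are dropped.
    }
    return out;
  }

  public static void main(String[] args) {
    List<Event> events = List.of(
        Event.refresh(Map.of("k1", "v1")),
        Event.element("k1", "a"),
        Event.element("k2", "b"), // no k2 in the lookup yet, so no output
        Event.refresh(Map.of("k1", "v2", "k2", "w")),
        Event.element("k1", "c"),
        Event.element("k2", "d"));
    System.out.println(join(events)); // prints [k1,a,v1, k1,c,v2, k2,d,w]
  }
}
```

Note that the k2 element arriving before the second refresh produces no
output, while the one arriving after it does. That dependence on arrival order
is what makes the result hard to reason about around a refresh.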

-Rui

On Thu, Jul 25, 2019 at 1:56 PM Rui Wang <ruw...@google.com> wrote:

> Hi Rahul, thanks for your detailed writeup. It pretty much summarizes the
> slowly changing table join problem.
>
> To your question: "Can we implement SideInputJoin for this case", there
> are two perspectives.
>
> In terms of implementing the slowly changing lookup cache pattern
> <https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs>
> in BeamSQL, such a side-input join can be done that way. At the very least,
> it is worth exploring until we identify blockers. I also think this pattern
> is already useful to users.
>
> In terms of join semantics, I think it's hard to reason about data
> completeness since one side of the join is changing.
>
> -Rui
>
>
> On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <rahulpatwari8...@gmail.com>
> wrote:
>
>> Hi Kenn,
>>
>> If we consider the following two *Unbounded* PCollections:
>> - PCollection1 => [*Non-Global* Window with Default Trigger]
>> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
>> coincidentally turned out to be the opposite
>>
>> Joining these two PCollections in BeamSql is currently not possible
>> because of https://jira.apache.org/jira/browse/BEAM-3345 (WindowFn
>> mismatch).
>> But in this case, PCollection1 can be joined with PCollection2 using
>> SideInputJoin (
>> https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
>> which is already done when joining an Unbounded PCollection with a Bounded
>> PCollection. I am thinking that Beam can guarantee it joins all input
>> elements once per window for this case.
>> The result of the join might be fuzzy for the window in which the Trigger
>> for PCollection2 fires and the side input is reloaded into memory.
>>
>> PCollection2 can be considered a slowly changing lookup cache, and
>> BeamSql could then support the pattern
>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
>> which is currently not possible.
>> I am working on https://jira.apache.org/jira/browse/BEAM-7758 for
>> BeamSql to natively support PCollectionView so that BeamSql supports
>> "Slowly Updating Global Window Sideinput Pattern" using SqlTransform's
>> TableProvider.
>>
>> If we can support this, a user will be able to write:
>> PCollection<Row> mainStream = ...;
>> PCollection<Row> lookupStream = ...;
>> PCollectionTuple tuple = PCollectionTuple
>>     .of(new TupleTag<>("MainTable"), mainStream)
>>     .and(new TupleTag<>("LookupTable"), lookupStream);
>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>
>> Can we implement SideInputJoin for this case?
>> I might be wrong in my understanding. Please let me know your thoughts.
>>
>> Thanks,
>> Rahul
>>
>> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> I think the best way to approach this is probably to have an example SQL
>>> statement and to discuss what the relational semantics should be.
>>>
>>> Windowing is not really part of SQL (yet) and in a way it just needs
>>> very minimal extensions. See https://arxiv.org/abs/1905.12133. In this
>>> proposal for SQL, windowed aggregation is explicitly part of the GROUP
>>> BY operation, where you GROUP BY window columns that were added. So it is
>>> more explicit than in Beam. Relations do not have a WindowFn so there is no
>>> problem of them being incompatible.
>>>
>>> With Beam SQL there are basically two ways of windowing that work
>>> totally differently:
>>>
>>> 1. SQL-style windowing, where you GROUP BY windows. This does not use the
>>> input PCollection's WindowFn.
>>> 2. PCollection windowing, where the SQL does not do any windowing. This
>>> should apply the SQL expression to each window independently.
>>>
>>> In order to support a hybrid of these, it might be:
>>>
>>> 3. SQL-style windowing where, when a PCollection has a window assigned,
>>> the window columns are added before the SQL is applied. It is a bit strange
>>> but might enable your use case.
>>>
>>> Kenn
>>>
>>> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <
>>> rahulpatwari8...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Beam currently doesn't support Join of Unbounded PCollections of
>>>> different WindowFns (
>>>> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
>>>> ).
>>>>
>>>> BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection], by
>>>> performing 'SideInputJoin' with Bounded PCollection as a SideInput.
>>>>
>>>> Can we support [Unbounded PCollection] JOIN [Unbounded PCollection],
>>>> when one of the Unbounded PCollections has [GlobalWindows applied with a
>>>> Non-Default Trigger (probably a slowly changing lookup cache,
>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)],
>>>> by performing 'SideInputJoin'?
>>>>
>>>> Regards,
>>>> Rahul
>>>>
>>>
