Hi Rahul, thanks for your detailed writeup. It pretty much summarizes the
slowly changing table join problem.

To your question: "Can we implement SideInputJoin for this case", there are
two perspectives.

In terms of implementing the slowly changing lookup cache pattern
<https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs>
in
BeamSQL, such a side input join can be built on that pattern. At the very least it is worth
exploring until we identify blockers. I also think this pattern is
already useful to users.

In terms of join semantics, I think it's hard to reason about data completeness
since one side of the join is changing.
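To make the completeness concern concrete, here is a minimal plain-Java simulation (not Beam code; `LookupJoinSketch`, `refresh` and `join` are hypothetical names) of a side-input-style join. It assumes the lookup side is replaced as a whole each time its trigger fires, and every main-stream element is enriched from whichever snapshot is current when it is processed, roughly like a ParDo reading a slowly updating global-window side input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation, NOT Beam code. The lookup side is modeled as an
// immutable snapshot that is swapped out whenever its trigger "fires".
final class LookupJoinSketch {
  // Latest lookup snapshot; each join call reads exactly one snapshot.
  private volatile Map<String, String> snapshot = Collections.emptyMap();

  // Models a trigger firing on the lookup side: replace the whole table at once.
  // The defensive copy means later changes to the caller's map are not seen.
  void refresh(Map<String, String> newSnapshot) {
    snapshot = Collections.unmodifiableMap(new HashMap<>(newSnapshot));
  }

  // Left-outer-joins main-stream keys against the snapshot current right now.
  List<String> join(List<String> mainKeys) {
    Map<String, String> current = snapshot; // read once so the batch is consistent
    List<String> out = new ArrayList<>();
    for (String key : mainKeys) {
      out.add(key + "=" + current.getOrDefault(key, "<missing>"));
    }
    return out;
  }

  public static void main(String[] args) {
    LookupJoinSketch sketch = new LookupJoinSketch();
    Map<String, String> v1 = new HashMap<>();
    v1.put("a", "v1");
    sketch.refresh(v1);
    // Elements of one window processed before the lookup refresh...
    System.out.println(sketch.join(Arrays.asList("a", "b"))); // [a=v1, b=<missing>]

    Map<String, String> v2 = new HashMap<>();
    v2.put("a", "v2");
    v2.put("b", "v1");
    sketch.refresh(v2);
    // ...and elements of the same window processed after it.
    System.out.println(sketch.join(Arrays.asList("a", "b"))); // [a=v2, b=v1]
  }
}
```

If both calls belong to the same window, key "a" is joined against two different lookup versions and "b" only matches after the refresh. Every main element is still joined exactly once, but which lookup version it sees depends on processing time, which is why completeness is hard to define for this kind of join.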

-Rui


On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <rahulpatwari8...@gmail.com>
wrote:

> Hi Kenn,
>
> If we consider the following two *Unbounded* PCollections:
> - PCollection1 => [*Non-Global* Window with Default Trigger]
> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
> coincidentally turned out to be the opposite
>
> Joining these two PCollections in BeamSql is currently not possible
> because of https://jira.apache.org/jira/browse/BEAM-3345 (WindowFn
> mismatch).
> But in this case, PCollection1 can be joined with PCollection2 using
> SideInputJoin (
> https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
> which is already used to join an unbounded PCollection with a bounded
> PCollection. I think Beam can guarantee that it joins every input
> element once per window in this case.
> The result of the join might be fuzzy for the window in which the trigger for
> PCollection2 fires and the side input gets loaded into memory.
>
> PCollection2 can be considered a slowly changing lookup cache, and BeamSql
> could then support the pattern at
> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
> which is currently not possible.
> I am working on https://jira.apache.org/jira/browse/BEAM-7758 for BeamSql
> to natively support PCollectionView, so that BeamSql supports the "Slowly
> Updating Global Window Side Input" pattern via SqlTransform's
> TableProvider.
>
> If we can support this, users will be able to do:
> PCollection<Row> mainStream = ...;
> PCollection<Row> lookupStream = ...;
> PCollectionTuple tuple =
>     PCollectionTuple.of(new TupleTag<>("MainTable"), mainStream)
>         .and(new TupleTag<>("LookupTable"), lookupStream);
> // "id" is a hypothetical join key
> tuple.apply(SqlTransform.query(
>     "SELECT * FROM MainTable JOIN LookupTable ON MainTable.id = LookupTable.id"));
>
> Can we implement SideInputJoin for this case?
> I might be wrong in my understanding. Please let me know your thoughts.
>
> Thanks,
> Rahul
>
> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> I think the best way to approach this is probably to have an example SQL
>> statement and to discuss what the relational semantics should be.
>>
>> Windowing is not really part of SQL (yet) and in a way it just needs very
>> minimal extensions. See https://arxiv.org/abs/1905.12133. In this
>> proposal for SQL, windowed aggregation is explicitly part of the GROUP
>> BY operation, where you GROUP BY window columns that were added. So it is
>> more explicit than in Beam. Relations do not have a WindowFn so there is no
>> problem of them being incompatible.
>>
>> With Beam SQL there are basically two ways of windowing that work totally
>> differently:
>>
>> 1. SQL style windowing where you GROUP BY windows. This does not use the
>> input PCollection's WindowFn
>> 2. PCollection windowing where the SQL does not do any windowing - this
>> should apply the SQL expression to each window independently
>>
>> In order to support a hybrid of these, it might be:
>>
>> 3. SQL style windowing where, when a PCollection has a window assigned, the
>> window columns are added before the SQL is applied. It is a bit strange but
>> might enable your use.
>>
>> Kenn
>>
>> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <
>> rahulpatwari8...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Beam currently doesn't support Join of Unbounded PCollections of
>>> different WindowFns (
>>> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
>>> ).
>>>
>>> BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection], by
>>> performing 'SideInputJoin' with Bounded PCollection as a SideInput.
>>>
>>> Can we support [Unbounded PCollection] JOIN [Unbounded PCollection],
>>> when one of the Unbounded PCollections has [GlobalWindows applied with a
>>> non-default trigger (probably a slowly changing lookup cache
>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
>>> by performing 'SideInputJoin'?
>>>
>>> Regards,
>>> Rahul
>>>
>>
