To be clearer, I think it would be useful if we could achieve the following,
which you wrote:

PCollection<Row> mainStream = ...;
PCollection<Row> lookupStream = ...;
PCollectionTuple tuple =
    PCollectionTuple.of(new TupleTag<>("MainTable"), mainStream)
        .and(new TupleTag<>("LookupTable"), lookupStream);
tuple.apply(SqlTransform.query("SELECT * FROM MainTable JOIN LookupTable ON ..."));


On Thu, Jul 25, 2019 at 1:56 PM Rui Wang wrote:

> Hi Rahul, thanks for your detailed writeup. It pretty much summarizes the
> slowly changing table join problem.
> As for your question, "Can we implement SideInputJoin for this case?",
> there are two perspectives.
> In terms of implementing the slowly changing lookup cache pattern in
> BeamSQL, such a side input join can be done that way. At the least, it is
> worth exploring until we identify blockers. I also think this pattern is
> already useful to users.
> In terms of join semantics, I think it's hard to reason about data
> completeness, since one side of the join is changing.
> -Rui
> On Thu, Jul 25, 2019 at 12:55 PM rahul patwari wrote:
>> Hi Kenn,
>> If we consider the following two *Unbounded* PCollections:
>> - PCollection1 => [*Non-Global* Window with Default Trigger]
>> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
>> coincidentally turned out to be the opposite
>> Joining these two PCollections in BeamSql is currently not possible
>> because of the WindowFn mismatch between them.
>> But in this case, PCollection1 can be joined with PCollection2 using
>> SideInputJoin, which is already done for joining an Unbounded PCollection
>> with a Bounded PCollection. I think Beam can guarantee that it joins all
>> input elements once per window in this case.
>> The result of the join might be fuzzy for the window in which the trigger
>> for PCollection2 fires and the side input is reloaded into memory.
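To make that fuzziness concrete, here is a small Beam-free Java simulation. It is a hypothetical sketch, not BeamSql's actual SideInputJoin: `SlowlyChangingLookup`, `refresh`, `joinOne`, and `joinAll` are made-up names. `refresh` stands in for a trigger firing on PCollection2, and `joinAll` stands in for joining the elements of one PCollection1 window against whatever side input is currently loaded.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simulation of a slowly changing lookup side input (not a Beam API):
// each "trigger firing" swaps in a new immutable snapshot, and every main-stream
// element is joined against whichever snapshot is current at that moment.
final class SlowlyChangingLookup {
  private final AtomicReference<Map<String, String>> snapshot =
      new AtomicReference<>(Collections.emptyMap());

  // Models a trigger firing on the global-window lookup stream:
  // the whole side input is replaced atomically by a fresh copy.
  void refresh(Map<String, String> newContents) {
    snapshot.set(Collections.unmodifiableMap(new HashMap<>(newContents)));
  }

  // Inner join of one main-stream key against the current snapshot.
  // Returns null when the key is not (yet) present on the lookup side.
  String joinOne(String key) {
    String value = snapshot.get().get(key);
    return value == null ? null : key + "=" + value;
  }

  // Joins a batch of main-stream keys (e.g. the elements of one window),
  // dropping keys that have no match, as an inner join would.
  List<String> joinAll(List<String> keys) {
    List<String> out = new ArrayList<>();
    for (String key : keys) {
      String joined = joinOne(key);
      if (joined != null) {
        out.add(joined);
      }
    }
    return out;
  }
}
```

If some elements of a window are joined before a `refresh` and the rest after it, they see different lookup contents, which is why data completeness is hard to reason about for this kind of join.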
>> PCollection2 can be considered a slowly changing lookup cache, a pattern
>> which BeamSql currently cannot support.
>> I am working on having BeamSql natively support PCollectionView, so that
>> it supports the "Slowly Updating Global Window Sideinput Pattern" through
>> SqlTransform's TableProvider.
>> If we can support this, users will be able to write:
>> PCollection<Row> mainStream = ...;
>> PCollection<Row> lookupStream = ...;
>> PCollectionTuple tuple =
>>     PCollectionTuple.of(new TupleTag<>("MainTable"), mainStream)
>>         .and(new TupleTag<>("LookupTable"), lookupStream);
>> tuple.apply(SqlTransform.query("SELECT * FROM MainTable JOIN LookupTable ON ..."));
>> Can we implement SideInputJoin for this case?
>> I might be wrong in my understanding. Please let me know your thoughts.
>> Thanks,
>> Rahul
>> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles wrote:
>>> I think the best way to approach this is probably to have an example SQL
>>> statement and to discuss what the relational semantics should be.
>>> Windowing is not really part of SQL (yet), and in a way it needs only
>>> very minimal extensions. In one proposal for extending SQL, windowed
>>> aggregation is explicitly part of the GROUP BY operation, where you
>>> GROUP BY window columns that were added. So it is more explicit than in
>>> Beam. Relations do not have a WindowFn, so there is no problem of them
>>> being incompatible.
>>> With Beam SQL there are basically two ways of windowing that work
>>> totally differently:
>>> 1. SQL style windowing, where you GROUP BY windows. This does not use the
>>> input PCollection's WindowFn.
>>> 2. PCollection windowing, where the SQL does not do any windowing; this
>>> should apply the SQL expression to each window independently.
>>> A hybrid of these might be:
>>> 3. SQL style windowing where, when a PCollection has windows assigned,
>>> the window columns are added before the SQL is applied. It is a bit strange
>>> but might enable your use case.
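Option 3 can be sketched in plain Java, assuming fixed (tumbling) windows and millisecond event timestamps: window columns are derived from each row's timestamp, and the windowed aggregation then becomes an ordinary GROUP BY on those columns. `WindowColumns` and its methods are hypothetical helpers for illustration, not Beam or BeamSql APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of "add window columns, then GROUP BY them".
// Assumes fixed (tumbling) windows of sizeMillis over millisecond timestamps.
final class WindowColumns {
  // Start of the fixed window containing ts.
  // Math.floorMod keeps the result correct for negative timestamps too.
  static long windowStart(long ts, long sizeMillis) {
    return ts - Math.floorMod(ts, sizeMillis);
  }

  // "Add window columns": each row becomes {ts, window_start, window_end}.
  static List<long[]> addWindowColumns(long[] timestamps, long sizeMillis) {
    List<long[]> rows = new ArrayList<>();
    for (long ts : timestamps) {
      long start = windowStart(ts, sizeMillis);
      rows.add(new long[] {ts, start, start + sizeMillis});
    }
    return rows;
  }

  // A purely relational step on the widened rows, equivalent to
  //   SELECT window_start, COUNT(*) FROM rows GROUP BY window_start
  static Map<Long, Integer> countPerWindow(List<long[]> rows) {
    Map<Long, Integer> counts = new TreeMap<>();
    for (long[] row : rows) {
      counts.merge(row[1], 1, Integer::sum);
    }
    return counts;
  }
}
```

Because the window is now just data, the relational operators never see a WindowFn; what matters is which window columns the GROUP BY (or a JOIN condition) refers to.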
>>> Kenn
>>> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari wrote:
>>>> Hi,
>>>> Beam currently doesn't support joining Unbounded PCollections that have
>>>> different WindowFns.
>>>> BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection], by
>>>> performing 'SideInputJoin' with Bounded PCollection as a SideInput.
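Roughly, a side-input join works like a broadcast hash join: the bounded side is materialized once into an in-memory map (the side input), and each element of the unbounded main side probes it. The following is a hypothetical plain-Java sketch of that idea, assuming unique join keys on the lookup side and inner-join semantics; `SideInputStyleJoin` is not BeamSql's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the idea behind a side-input join: the bounded side is
// materialized once into an in-memory map, and each element of the main side
// probes that map, so the main side never has to be grouped with the lookup side.
final class SideInputStyleJoin {
  // Build phase: materialize the lookup side as {key -> value}.
  // Keys are assumed unique here; a many-valued side would need Map<K, List<V>>.
  static Map<String, String> buildLookup(List<String[]> lookupRows) {
    Map<String, String> lookup = new HashMap<>();
    for (String[] row : lookupRows) {
      lookup.put(row[0], row[1]);
    }
    return lookup;
  }

  // Probe phase: inner join of each main row {key, payload} against the lookup.
  // Main rows without a match are dropped; output order follows the main side.
  static List<String> probe(List<String[]> mainRows, Map<String, String> lookup) {
    List<String> out = new ArrayList<>();
    for (String[] row : mainRows) {
      String match = lookup.get(row[0]);
      if (match != null) {
        out.add(row[0] + "|" + row[1] + "|" + match);
      }
    }
    return out;
  }
}
```

The open question in this thread is what happens when the "build" side is itself unbounded and gets rebuilt on each trigger firing, rather than built once.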
>>>> Can we support [Unbounded PCollection] JOIN [Unbounded PCollection]
>>>> by performing 'SideInputJoin', when one of the Unbounded PCollections has
>>>> GlobalWindows applied with a Non-Default Trigger (probably a slowly
>>>> changing lookup cache)?
>>>> Regards,
>>>> Rahul
