To be clearer: I think it would be useful if we could achieve the following, which you wrote:
PCollection mainStream = ...;
PCollection lookupStream = ...;
PCollectionTuple tuple =
    PCollectionTuple.of(new TupleTag<>("MainTable"), mainStream)
        .and(new TupleTag<>("LookupTable"), lookupStream);
tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));

-Rui

On Thu, Jul 25, 2019 at 1:56 PM Rui Wang <ruw...@google.com> wrote:

> Hi Rahul, thanks for your detailed writeup. It pretty much summarizes the
> slowly changing table join problem.
>
> To your question: "Can we implement SideInputJoin for this case", there
> are two perspectives.
>
> In terms of implementing the slowly changing lookup cache pattern
> <https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs>
> in BeamSQL, such a side-input join can be done that way. At least it is
> worth exploring until we identify blockers. I also think this pattern is
> already useful to users.
>
> In terms of join semantics, I think it's hard to reason about data
> completeness since one side of the join is changing.
>
> -Rui
>
>
> On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <rahulpatwari8...@gmail.com>
> wrote:
>
>> Hi Kenn,
>>
>> If we consider the following two *Unbounded* PCollections:
>> - PCollection1 => [*Non-Global* Window with Default Trigger]
>> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
>> coincidentally turned out to be the opposite
>>
>> Joining these two PCollections in BeamSql is currently not possible
>> because of https://jira.apache.org/jira/browse/BEAM-3345 (WindowFn
>> Mismatch).
>> But in this case, PCollection1 can be joined with PCollection2 using
>> SideInputJoin (
>> https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
>> which is what is done for joining an Unbounded PCollection with a Bounded
>> PCollection. I am thinking that Beam can guarantee it joins all input
>> elements once per window for this case.
>> The result of the join might be fuzzy for the window when the Trigger for
>> PCollection2 fires and the side input gets loaded into memory.
>>
>> PCollection2 can be considered a Slowly Changing Lookup Cache, and
>> BeamSql could support the pattern
>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
>> which is currently not possible.
>> I am working on https://jira.apache.org/jira/browse/BEAM-7758 for
>> BeamSql to natively support PCollectionView so that BeamSql supports the
>> "Slowly Updating Global Window Sideinput Pattern" using SqlTransform's
>> TableProvider.
>>
>> If we can support this, users will be able to do:
>> PCollection mainStream = ...;
>> PCollection lookupStream = ...;
>> PCollectionTuple tuple =
>>     PCollectionTuple.of(new TupleTag<>("MainTable"), mainStream)
>>         .and(new TupleTag<>("LookupTable"), lookupStream);
>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>
>> Can we implement SideInputJoin for this case?
>> I might be wrong in my understanding. Please let me know your thoughts.
>>
>> Thanks,
>> Rahul
>>
>> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> I think the best way to approach this is probably to have an example SQL
>>> statement and to discuss what the relational semantics should be.
>>>
>>> Windowing is not really part of SQL (yet) and in a way it just needs
>>> very minimal extensions. See https://arxiv.org/abs/1905.12133. In this
>>> proposal for SQL, windowed aggregation is explicitly part of the GROUP
>>> BY operation, where you GROUP BY window columns that were added. So it is
>>> more explicit than in Beam. Relations do not have a WindowFn so there is no
>>> problem of them being incompatible.
>>>
>>> With Beam SQL there are basically two ways of windowing that work
>>> totally differently:
>>>
>>> 1. SQL style windowing where you GROUP BY windows. This does not use the
>>> input PCollection WindowFn
>>> 2. PCollection windowing where the SQL does not do any windowing - this
>>> should apply the SQL expression to each window independently
>>>
>>> In order to support a hybrid of these, it might be:
>>>
>>> 3. SQL style windowing, where when a PCollection has a window assigned,
>>> the window columns are added before the SQL is applied. It is a bit strange
>>> but might enable your use.
>>>
>>> Kenn
>>>
>>> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <
>>> rahulpatwari8...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Beam currently doesn't support Join of Unbounded PCollections of
>>>> different WindowFns (
>>>> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
>>>> ).
>>>>
>>>> BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection] by
>>>> performing 'SideInputJoin' with the Bounded PCollection as a SideInput.
>>>>
>>>> Can we support [Unbounded PCollection] JOIN [Unbounded PCollection],
>>>> when one of the Unbounded PCollections has [GlobalWindows applied with a
>>>> Non-Default Trigger (probably a slow-changing lookup cache
>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
>>>> by performing 'SideInputJoin'?
>>>>
>>>> Regards,
>>>> Rahul
>>>>
>>>
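
For readers following the thread, the join semantics being asked about can be sketched in plain Java, with no Beam dependency. This is only an illustration under stated assumptions: SideInputJoinSketch, Event, and join are hypothetical names, not Beam APIs, and the lookup side is modeled as a series of snapshots keyed by the time its trigger fired. Each main-stream element is joined against the latest snapshot available at its timestamp, which is why the output changes once a new snapshot arrives. In a real pipeline the moment a snapshot becomes visible is less deterministic than here, which is the "fuzzy" result mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Plain-Java illustration only; none of these names are Beam APIs.
final class SideInputJoinSketch {

  // Hypothetical main-stream element: an event time plus a join key.
  static final class Event {
    final long timestamp;
    final String key;

    Event(long timestamp, String key) {
      this.timestamp = timestamp;
      this.key = key;
    }
  }

  // Joins each event against the most recent lookup snapshot fired at or
  // before the event's timestamp. Inner join: unmatched events are dropped.
  static List<String> join(
      List<Event> mainStream, NavigableMap<Long, Map<String, String>> snapshots) {
    List<String> joined = new ArrayList<>();
    for (Event event : mainStream) {
      Map.Entry<Long, Map<String, String>> latest = snapshots.floorEntry(event.timestamp);
      if (latest == null) {
        continue; // no snapshot has fired yet
      }
      String value = latest.getValue().get(event.key);
      if (value != null) {
        joined.add(event.key + "=" + value);
      }
    }
    return joined;
  }

  public static void main(String[] args) {
    // Two firings of the lookup side: at time 0 and at time 10.
    NavigableMap<Long, Map<String, String>> snapshots = new TreeMap<>();
    snapshots.put(0L, Map.of("a", "v1"));
    snapshots.put(10L, Map.of("a", "v2", "b", "v2"));

    List<Event> mainStream =
        List.of(
            new Event(5L, "a"), new Event(5L, "b"),
            new Event(12L, "a"), new Event(12L, "b"));

    System.out.println(join(mainStream, snapshots)); // prints [a=v1, a=v2, b=v2]
  }
}
```

The event for key "b" at time 5 produces no output because the only snapshot visible then has no entry for "b"; the same key at time 12 matches the second snapshot. That difference is the data-completeness concern raised earlier in the thread.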