Sorry, please remove ".apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))" from the query above.
-Rui

On Mon, Feb 24, 2020 at 5:26 PM Rui Wang <ruw...@google.com> wrote:

> I see. So I guess I didn't fully understand the requirement:
>
> Do you want to have a 1-min window join on two unbounded sources and
> write to the sink when the window closes? Or is there an extra
> requirement such that you also want to write to the sink every minute
> per window?
>
> For the former, you can do it by SQL:
>
> pipeline.apply(KafkaIO.read() ... )
>     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
>     .apply(SqlTransform.query(
>         "SELECT ... FROM "
>             + "(SELECT TUMBLE_START() AS window_start, * FROM stream1 "
>             + "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)) AS table_a "
>             + "JOIN "
>             + "(SELECT TUMBLE_START() AS window_start, * FROM stream2 "
>             + "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)) AS table_b "
>             + "ON table_a.window_start = table_b.window_start ..."))
>     .apply(HCatalogIO.write() ...)
>
> But as Kenneth mentioned, HCatalogIO might not work as expected.
>
> For the latter, the mixed Java and SQL pipeline won't help you.
>
> -Rui
>
> On Mon, Feb 24, 2020 at 5:04 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> I think actually it depends on the pipeline. You cannot do it all in
>> SQL, but if you mix Java and SQL I think you can do this. If you write
>> this:
>>
>> pipeline.apply(KafkaIO.read() ... )
>>     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
>>     .apply(SqlTransform.query("SELECT ... FROM stream1 JOIN stream2 ..."))
>>     .apply(HCatalogIO.write() ...)
>>
>> This should apply the SQL on each window. When the SQL does not do any
>> windowing, it is required to be a "per-window" SQL execution. That is
>> the spec for SqlTransform. If that does not work, please report your
>> experience.
>>
>> But the SQL semantics do not require waiting. Today the
>> stream-to-stream join will do a CoGroupByKey, so it will wait. But SQL
>> may in the future adopt a better join for this case that can output
>> records with lower latency.
>>
>> It may be a bigger question whether HCatalogIO.write() has all the
>> knobs you would like.
>>
>> Kenn
>>
>> On Mon, Feb 24, 2020 at 12:14 PM Rui Wang <ruw...@google.com> wrote:
>>
>>> SQL does not support such joins with your requirement: writing to the
>>> sink every 1 min, after the window closes.
>>>
>>> You might be able to use the state and timer API to achieve your goal.
>>>
>>> -Rui
>>>
>>> On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram <
>>> shantachakp...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to join inputs from unbounded sources and then write to
>>>> a bounded sink.
>>>> The pipeline I'm trying is:
>>>> Kafka Sources -> SqlTransform -> HCatalogIO Sink
>>>> And a FixedWindow of 1 minute duration is applied.
>>>>
>>>> I'm expecting the inputs from the unbounded sources joined within the
>>>> current window to be written to the HCatalogIO sink after every 1 min,
>>>> i.e. after each window interval.
>>>>
>>>> Can someone please tell me if this is a valid scenario, and what the
>>>> expected behaviour of this kind of scenario is?
>>>>
>>>> Regards,
>>>> Shanta
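The semantics discussed in the thread above (events from both streams are assigned to 1-minute fixed windows by rounding their timestamps down, buffered per window, joined on the shared window start, and emitted only once the watermark passes the window end, as with CoGroupByKey) can be sketched outside Beam in plain Java. Everything below (the `buffer` helper, the explicit watermark argument, the sample values) is an illustrative stand-in for the behaviour described, not Beam API:

```java
import java.util.*;

public class WindowedJoinSketch {
    static final long WINDOW_MILLIS = 60_000L; // 1-minute fixed windows

    // Per-window buffers for each stream, keyed by window start.
    // ts - ts % WINDOW_MILLIS mirrors what TUMBLE_START(ts, INTERVAL '1' MINUTE)
    // computes: the timestamp rounded down to its window's start.
    static Map<Long, List<String>> left = new TreeMap<>();
    static Map<Long, List<String>> right = new TreeMap<>();

    static void buffer(Map<Long, List<String>> side, long ts, String value) {
        side.computeIfAbsent(ts - ts % WINDOW_MILLIS, k -> new ArrayList<>()).add(value);
    }

    // Once the watermark passes a window's end, the window is complete and the
    // buffered elements can be joined and emitted; until then the join waits.
    static List<String> emitClosedWindows(long watermark) {
        List<String> out = new ArrayList<>();
        for (Iterator<Long> it = left.keySet().iterator(); it.hasNext(); ) {
            long start = it.next();
            if (start + WINDOW_MILLIS > watermark) break; // window still open
            for (String l : left.get(start))
                for (String r : right.getOrDefault(start, List.of()))
                    out.add(l + "|" + r);
            it.remove();
        }
        return out;
    }

    public static void main(String[] args) {
        buffer(left, 5_000L, "a1");   // stream1, lands in window [0s, 60s)
        buffer(right, 59_000L, "b1"); // stream2, same window
        buffer(left, 61_000L, "a2");  // stream1, next window [60s, 120s)
        // Watermark at 30s: the first window has not closed, nothing is emitted.
        System.out.println(emitClosedWindows(30_000L)); // []
        // Watermark at 60s: the first window closes and the joined row appears.
        System.out.println(emitClosedWindows(60_000L)); // [a1|b1]
    }
}
```

Note the output for a window appears only after the window ends, which matches the "write to sink after each window interval" behaviour the original question asks about; lower-latency per-window output would need something beyond this, such as the state and timer API mentioned above.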