I see. So I guess I didn't fully understand the requirement: do you want a 1-min window join on two unbounded sources that writes to the sink when each window closes? Or is there an extra requirement that you also want to write to the sink every minute per window?
For the former, you can do it with SQL:

    pipeline.apply(KafkaIO.read() ... )
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(SqlTransform.query(
            "SELECT ... FROM "
                + "(SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start, * FROM stream1 "
                + " GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)) AS table_a "
                + "JOIN "
                + "(SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start, * FROM stream2 "
                + " GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)) AS table_b "
                + "ON table_a.window_start = table_b.window_start ..."))
        .apply(HCatalogIO.write() ...)

But as Kenneth mentioned, HCatalogIO might not work as expected.

For the latter, the mixed Java and SQL pipeline won't help you.

-Rui

On Mon, Feb 24, 2020 at 5:04 PM Kenneth Knowles <k...@apache.org> wrote:

> I think actually it depends on the pipeline. You cannot do it all in SQL,
> but if you mix Java and SQL I think you can do this. If you write this:
>
> pipeline.apply(KafkaIO.read() ... )
>     .apply(Window.into(FixedWindows.of(1 minute))
>     .apply(SqlTransform("SELECT ... FROM stream1 JOIN stream2 ...")
>     .apply(HCatalogIO.write() ...)
>
> This should apply the SQL on each window. When the SQL does not do any
> windowing, it is required to be a "per-window" SQL execution. That is the
> spec for SqlTransform. If that does not work, please report your experience.
>
> But the SQL semantics do not require waiting. Today the stream-to-stream
> join will do a CoGroupByKey, so it will wait. But SQL may in the future
> adopt a better join for this case that can output records with lower
> latency.
>
> It may be a bigger question whether HCatalogIO.write() has all the knobs
> you would like.
>
> Kenn
>
> On Mon, Feb 24, 2020 at 12:14 PM Rui Wang <ruw...@google.com> wrote:
>
>> SQL does not support joins with your requirement: write to the sink
>> every 1 min, after each window closes.
>>
>> You might be able to use the state and timer API to achieve your goal.
>>
>> -Rui
>>
>> On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram <shantachakp...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to join inputs from unbounded sources, then write to a bounded sink.
>>>
>>> The pipeline I'm trying is:
>>> Kafka sources -> SqlTransform -> HCatalogIO sink
>>> And a FixedWindow of 1-minute duration is applied.
>>>
>>> I'm expecting the inputs from the unbounded sources joined within the
>>> current window to be written to the HCatalogIO sink after every 1 min, i.e.
>>> after each window interval.
>>>
>>> Can someone please tell me if this is a valid scenario and what the
>>> expected behaviour of this kind of scenario is?
>>>
>>> Regards,
>>> Shanta
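[Editor's note] To illustrate the state-and-timer suggestion above: the pattern is to buffer joined rows per window in state and flush a window's buffer when an event-time timer fires at the window's end. The sketch below is a plain-Java, stdlib-only analogy of that buffer-and-flush logic (it is NOT the Beam API; a real implementation would live in a stateful DoFn using @StateId and @TimerId, and the class and method names here are invented for illustration):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for per-window state: buffer rows keyed by the
// start of their 1-minute window, and emit a window's buffer once the
// watermark passes the window's end (analogous to an event-time timer
// firing in a Beam stateful DoFn).
class WindowedJoinBuffer {
    static final long WINDOW_MILLIS = 60_000L;

    // window start (epoch millis) -> buffered joined rows for that window
    private final TreeMap<Long, List<String>> buffers = new TreeMap<>();

    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    // Analogous to buffering an element into state in processElement().
    void add(long timestampMillis, String row) {
        buffers.computeIfAbsent(windowStart(timestampMillis), w -> new ArrayList<>())
               .add(row);
    }

    // Analogous to an @OnTimer callback: flush every window whose end is
    // at or before the current watermark, in window order.
    List<String> onWatermark(long watermarkMillis) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<Long, List<String>>> it = buffers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> e = it.next();
            if (e.getKey() + WINDOW_MILLIS <= watermarkMillis) {
                out.addAll(e.getValue());
                it.remove();
            } else {
                break; // TreeMap is ordered; later windows are still open
            }
        }
        return out;
    }

    public static void main(String[] args) {
        WindowedJoinBuffer buffer = new WindowedJoinBuffer();
        buffer.add(10_000L, "a");  // lands in window [0s, 60s)
        buffer.add(70_000L, "b");  // lands in window [60s, 120s)
        System.out.println(buffer.onWatermark(60_000L));   // [a]
        System.out.println(buffer.onWatermark(120_000L));  // [b]
    }
}
```

In Beam itself, the timer would be set to the window's maximum timestamp so each window's buffered join output is written exactly once per window interval, which matches the "write to sink after each window closes" requirement.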