Sorry, please remove ".apply(Window.into(FixedWindows.of(1 minute))" from
the query above.


On Mon, Feb 24, 2020 at 5:26 PM Rui Wang <> wrote:

> I see. So I guess I didn't fully understand the requirement.
> Do you want a 1-minute windowed join on two unbounded sources that writes
> to the sink when each window closes? Or is there an extra requirement that
> you also want to write to the sink every minute per window?
> For the former, you can do it in SQL:
>     pipeline.apply( ... )
>         .apply(Window.into(FixedWindows.of(1 minute))
>         .apply(SqlTransform.query(
>             "SELECT ... FROM "
>             + "(SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start, * "
>             + "FROM stream1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)) AS table_a "
>             + "JOIN "
>             + "(SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start, * "
>             + "FROM stream2 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)) AS table_b "
>             + "ON table_a.window_start = table_b.window_start ..."))
>         .apply(HCatalogIO.write() ...)
> But as Kenneth mentioned, HCatalogIO might not work as expected.
> For the latter, the mixed Java and SQL pipeline won't help you.
> -Rui
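Rui's query joins the two streams on the start of their 1-minute tumbling windows. The plain-Java sketch below simulates that semantics in memory; it is not Beam or Calcite code. The `Event` class, the extra key-equality condition (standing in for the elided `...` in the ON clause) and the output string format are hypothetical, and windows are assumed to be aligned to the epoch, as with `FixedWindows` at zero offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical event: join key, payload and event-time timestamp in millis. */
final class Event {
  final String key;
  final String value;
  final long tsMillis;

  Event(String key, String value, long tsMillis) {
    this.key = key;
    this.value = value;
    this.tsMillis = tsMillis;
  }
}

/** In-memory simulation (not Beam code) of a join on TUMBLE window start. */
final class TumblingJoinSketch {
  static final long WINDOW_MILLIS = 60_000L; // INTERVAL '1' MINUTE

  /** Start of the epoch-aligned 1-minute window containing tsMillis. */
  static long windowStart(long tsMillis) {
    return tsMillis - Math.floorMod(tsMillis, WINDOW_MILLIS);
  }

  /** Nested-loop join: rows match only if key and window start are both equal. */
  static List<String> join(List<Event> left, List<Event> right) {
    List<String> out = new ArrayList<>();
    for (Event l : left) {
      long ws = windowStart(l.tsMillis);
      for (Event r : right) {
        // Assumed condition: same window start AND same (hypothetical) key.
        if (l.key.equals(r.key) && ws == windowStart(r.tsMillis)) {
          out.add(ws + ":" + l.key + ":" + l.value + "+" + r.value);
        }
      }
    }
    return out;
  }
}
```

For example, events at 0:10 and 0:50 share the window starting at 0 and join, while an event at 1:10 falls into the next window and has no partner.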
> On Mon, Feb 24, 2020 at 5:04 PM Kenneth Knowles <> wrote:
>> I think it actually depends on the pipeline. You cannot do it all in SQL,
>> but if you mix Java and SQL I think you can do this. If you write this:
>>     pipeline.apply( ... )
>>         .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
>>         .apply(SqlTransform.query("SELECT ... FROM stream1 JOIN stream2 ..."))
>>         .apply(HCatalogIO.write() ...)
>> This should apply the SQL on each window. When the SQL does not do any
>> windowing, it is required to be a "per-window" SQL execution. That is the
>> spec for SqlTransform. If that does not work, please report your experience.
>> But the SQL semantics do not require waiting. Today the stream-to-stream
>> join is implemented with a CoGroupByKey, so it will wait for the window to
>> close. But SQL may in the future adopt a better join for this case that can
>> output records with lower latency.
>> It may be a bigger question whether HCatalogIO.write() has all the knobs
>> you would like.
>> Kenn
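Kenneth's point that the current join "will wait" can be pictured with an in-memory simulation (again, not Beam code): rows are buffered per 1-minute window, and a window's joined output is released only once the watermark reaches the end of that window, roughly how a CoGroupByKey behaves under the default trigger. Keys, late data and allowed lateness are deliberately left out, and the `add`/`advanceWatermark` names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simulates a per-window join that emits only after the watermark passes the window end. */
final class WaitingWindowJoin {
  static final long WINDOW_MILLIS = 60_000L;

  /** Buffered rows from both inputs for one window. */
  private static final class Buffers {
    final List<String> left = new ArrayList<>();
    final List<String> right = new ArrayList<>();
  }

  // Window start -> buffered rows; sorted so the oldest windows come first.
  private final TreeMap<Long, Buffers> windows = new TreeMap<>();

  private static long windowStart(long ts) {
    return ts - Math.floorMod(ts, WINDOW_MILLIS);
  }

  /** Buffers a row from the left (isLeft = true) or right input. */
  void add(boolean isLeft, String value, long ts) {
    Buffers b = windows.computeIfAbsent(windowStart(ts), w -> new Buffers());
    (isLeft ? b.left : b.right).add(value);
  }

  /** Emits and discards every window whose end is at or before the watermark. */
  List<String> advanceWatermark(long watermark) {
    List<String> out = new ArrayList<>();
    Iterator<Map.Entry<Long, Buffers>> it = windows.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Long, Buffers> e = it.next();
      if (e.getKey() + WINDOW_MILLIS > watermark) {
        break; // this window (and every later one) is still open
      }
      for (String l : e.getValue().left) {
        for (String r : e.getValue().right) {
          out.add(e.getKey() + ":" + l + "+" + r);
        }
      }
      it.remove(); // window closed: drop its buffered state
    }
    return out;
  }
}
```

Nothing is emitted for the first window while the watermark is still inside it; once the watermark reaches 1:00, the whole window's join result comes out at once.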
>> On Mon, Feb 24, 2020 at 12:14 PM Rui Wang <> wrote:
>>> SQL does not support such joins with your requirement: writing to the
>>> sink every minute, after each window closes.
>>> You might be able to use the state and timer API to achieve your goal.
>>> -Rui
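Rui's state-and-timer suggestion usually amounts to a buffer-and-flush pattern. The sketch below simulates it in plain Java with an explicit clock: the `bag` field stands in for Beam's `BagState` and `timerAt` for a `Timer`, but none of this is Beam API, and the one-minute delay and the method names are assumptions chosen to match the thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Simulates buffering elements and flushing them one minute after the first arrives. */
final class BufferAndFlushSketch {
  static final long FLUSH_DELAY_MILLIS = 60_000L;

  private final List<String> bag = new ArrayList<>(); // stands in for a BagState
  private Long timerAt = null;                        // stands in for a Timer; null = unset

  /** Like @ProcessElement: buffer the element and arm the timer if it is not set yet. */
  void process(String element, long nowMillis) {
    bag.add(element);
    if (timerAt == null) {
      timerAt = nowMillis + FLUSH_DELAY_MILLIS;
    }
  }

  /** Advances the clock; like @OnTimer, a due timer emits the buffer and clears state. */
  List<String> advanceTime(long nowMillis) {
    List<String> flushed = new ArrayList<>();
    if (timerAt != null && nowMillis >= timerAt) {
      flushed.addAll(bag);
      bag.clear();
      timerAt = null;
    }
    return flushed;
  }
}
```

In a real Beam `DoFn`, state and timers are scoped per key and window, so each key would get its own buffer and timer rather than the single one shown here.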
>>> On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram <> wrote:
>>>> Hi,
>>>> I am trying to join inputs from unbounded sources and then write to a
>>>> bounded sink.
>>>> The pipeline I'm trying is:
>>>> Kafka Sources -> SqlTransform -> HCatalogIO Sink
>>>> A FixedWindow of 1-minute duration is applied.
>>>> I expect the inputs from the unbounded sources that are joined within the
>>>> current window to be written to the HCatalogIO sink every minute, i.e.,
>>>> after each window interval.
>>>> Can someone please tell me whether this is a valid scenario and what
>>>> behaviour to expect from it?
>>>> Regards,
>>>> Shanta
