I see. So I guess I didn't fully understand the requirement:

Do you want a 1-minute window join on two unbounded sources that writes
to the sink when the window closes? Or is there an extra requirement that
you also want to write to the sink every minute per window?

For the former, you can do it in SQL:

    pipeline.apply(KafkaIO.read() ... )
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(SqlTransform.query(
            "SELECT ... FROM "
                + "(SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start, ... "
                + "FROM stream1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), ...) AS table_a "
                + "JOIN "
                + "(SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start, ... "
                + "FROM stream2 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), ...) AS table_b "
                + "ON table_a.window_start = table_b.window_start ..."))
        .apply(HCatalogIO.write() ...)

But, as Kenneth mentioned, HCatalogIO might not work as expected.
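To illustrate the window_start matching part of that query, here is a rough plain-Java sketch (not Beam code, and without the per-window aggregation or the elided `...` columns and conditions). Each event is assigned the start of its 1-minute tumbling window, the same idea as TUMBLE_START(ts, INTERVAL '1' MINUTE), and events from the two streams are paired when their window starts match. The Event/Joined types and the millisecond timestamps are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration (not Beam code) of joining two streams on the
// start of their 1-minute tumbling window. All names are hypothetical.
class WindowJoinSketch {
    // Hypothetical event: a payload plus an event-time timestamp in milliseconds.
    record Event(String value, long tsMillis) {}

    // One joined pair, tagged with the window start both events share.
    record Joined(long windowStart, Event left, Event right) {}

    static final long WINDOW_MILLIS = 60_000L; // 1-minute tumbling windows

    // Same idea as TUMBLE_START(ts, INTERVAL '1' MINUTE); assumes ts >= 0.
    static long windowStart(long tsMillis) {
        return tsMillis - (tsMillis % WINDOW_MILLIS);
    }

    // Pairs every event of stream1 with every event of stream2 in the same
    // window, i.e. ON table_a.window_start = table_b.window_start.
    static List<Joined> join(List<Event> stream1, List<Event> stream2) {
        List<Joined> out = new ArrayList<>();
        for (Event a : stream1) {
            long wa = windowStart(a.tsMillis());
            for (Event b : stream2) {
                if (wa == windowStart(b.tsMillis())) {
                    out.add(new Joined(wa, a, b));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> stream1 = List.of(new Event("a", 5_000L), new Event("b", 65_000L));
        List<Event> stream2 = List.of(
                new Event("x", 30_000L), new Event("z", 61_000L), new Event("y", 125_000L));
        for (Joined j : join(stream1, stream2)) {
            System.out.println(j);
        }
    }
}
```

In a streaming pipeline the same pairing happens per window, and with today's CoGroupByKey-based join the result for a window is emitted once that window closes.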

For the latter, the mixed Java and SQL pipeline won't help you.
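For reference, the state-and-timer idea suggested earlier in this thread boils down to: buffer records per window, and on a repeating processing-time timer write out whatever each window has accumulated so far. The sketch below only simulates that in plain Java; it does not use Beam's State/Timer API, and the class name, the 1-minute flush interval and the list standing in for the sink are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of "buffer in state, flush on a timer".
// NOT the Beam State/Timer API; all names here are hypothetical.
class PeriodicFlushSketch {
    static final long WINDOW_MILLIS = 60_000L;         // 1-minute event-time windows
    static final long FLUSH_INTERVAL_MILLIS = 60_000L; // hypothetical flush period

    // Stand-in for per-window state: window start -> records not yet written.
    private final Map<Long, List<String>> buffer = new TreeMap<>();
    // Stand-in for the sink: each entry is one write of one window's records.
    final List<String> sinkWrites = new ArrayList<>();
    // Stand-in for a processing-time timer: the next time a flush is due.
    private long nextFlushAt;

    PeriodicFlushSketch(long startProcessingTimeMillis) {
        this.nextFlushAt = startProcessingTimeMillis + FLUSH_INTERVAL_MILLIS;
    }

    // Buffers an (already joined) record under its event-time window (ts >= 0).
    void onElement(String record, long eventTimeMillis) {
        long windowStart = eventTimeMillis - (eventTimeMillis % WINDOW_MILLIS);
        buffer.computeIfAbsent(windowStart, w -> new ArrayList<>()).add(record);
    }

    // Advances processing time and fires the "timer" for every deadline passed.
    void advanceProcessingTime(long nowMillis) {
        while (nowMillis >= nextFlushAt) {
            flush();
            nextFlushAt += FLUSH_INTERVAL_MILLIS;
        }
    }

    // Writes what each window accumulated since the last flush, then clears it.
    private void flush() {
        for (Map.Entry<Long, List<String>> e : buffer.entrySet()) {
            if (!e.getValue().isEmpty()) {
                sinkWrites.add(e.getKey() + ":" + String.join(",", e.getValue()));
            }
        }
        buffer.clear();
    }

    public static void main(String[] args) {
        PeriodicFlushSketch s = new PeriodicFlushSketch(0L);
        s.onElement("a", 5_000L);
        s.onElement("b", 70_000L);
        s.advanceProcessingTime(60_000L);  // first timer firing
        s.onElement("c", 10_000L);
        s.advanceProcessingTime(120_000L); // second timer firing
        System.out.println(s.sinkWrites);
    }
}
```

The point of the design is that writes are driven by the timer rather than by the window closing, so a window can be written to the sink several times.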


On Mon, Feb 24, 2020 at 5:04 PM Kenneth Knowles <k...@apache.org> wrote:

> Actually, I think it depends on the pipeline. You cannot do it all in SQL,
> but if you mix Java and SQL I think you can do this. If you write this:
>     pipeline.apply(KafkaIO.read() ... )
>         .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
>         .apply(SqlTransform.query("SELECT ... FROM stream1 JOIN stream2 ..."))
>         .apply(HCatalogIO.write() ...)
> This should apply the SQL to each window. When the SQL does not do any
> windowing, it is required to be a "per-window" SQL execution. That is the
> spec for SqlTransform. If that does not work, please report your experience.
> But the SQL semantics do not require waiting. Today the stream-to-stream
> join does a CoGroupByKey, so it will wait. But SQL may in the future
> adopt a better join for this case that can output records with lower
> latency.
> It may be a bigger question whether HCatalogIO.write() has all the knobs
> you would like.
> Kenn
> On Mon, Feb 24, 2020 at 12:14 PM Rui Wang <ruw...@google.com> wrote:
>> SQL does not support joins with your requirement: writing to the sink
>> every 1 minute after the window closes.
>> You might be able to use the state and timer API to achieve your goal.
>> -Rui
>> On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram <shantachakp...@gmail.com> wrote:
>>> Hi,
>>> I am trying to join inputs from unbounded sources and then write to a
>>> bounded sink.
>>> The pipeline I'm trying is:
>>> Kafka Sources -> SqlTransform -> HCatalogIO Sink
>>> A FixedWindow of 1-minute duration is also applied.
>>> I'm expecting the inputs from the unbounded sources that are joined within
>>> the current window to be written to the HCatalogIO sink every 1 minute,
>>> i.e. after each window interval.
>>> Can someone please tell me whether this is a valid scenario and what the
>>> expected behaviour is for this kind of scenario?
>>> Regards,
>>> Shanta
