I see. So I guess I wasn't fully understand the requirement:

Do you want to have a 1-min window join on two unbounded sources and write
to sink when the window closes ? Or there is an extra requirement such that
you also want to write to sink every minute per window?

For the former, you can do it by SQL:

    pipeline.apply(KafkaIO.read() ... )
        .apply(Window.into(FixedWindows.of(1 minute))
        .apply(SqlTransform(
                  "SELECT ... FROM
                        (select TUMBLE_START() as window_start, * FROM
stream1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_a
                  JOIN
                        (select TUMBLE_START() as window_start, * FROM
stream2 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_b
                   on table_a.window_start = table_b.window_start ...")
        .apply(HCatalogIO.write() ...)

But as Kenneth mentioned HCatalogIO might not work as expected.



For the latter, the mixed Java and SQL pipeline won't help you.



-Rui

On Mon, Feb 24, 2020 at 5:04 PM Kenneth Knowles <k...@apache.org> wrote:

> I think actually it depends on the pipeline. You cannot do it all in SQL,
> but if you mix Java and SQL I think you can do this. If you write this:
>
>     pipeline.apply(KafkaIO.read() ... )
>         .apply(Window.into(FixedWindows.of(1 minute))
>         .apply(SqlTransform("SELECT ... FROM stream1 JOIN stream2 ...")
>         .apply(HCatalogIO.write() ...)
>
> This should apply the SQL on each window. When the SQL does not do any
> windowing, it is required to be a "per-window" SQL execution. That is the
> spec for SqlTransform. If that does not work, please report your experience.
>
> But the SQL semantics do not require waiting. Today the stream-to-stream
> join will do a CoGroupByKey so it will wait. But SQL may in the future
> adopt a better join for this case that can output records with lower
> latency.
>
> It may be a bigger question whether HCatalogIO.write() has all the knobs
> you would like.
>
> Kenn
>
> On Mon, Feb 24, 2020 at 12:14 PM Rui Wang <ruw...@google.com> wrote:
>
>> SQL does not support such joins with your requirement: write to sink
>> after every 1 min after window closes.
>>
>> You might can use state and timer API to achieve your goal.
>>
>>
>>
>> -Rui
>>
>> On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram <shantachakp...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to join inputs from Unbounded Sources then write to Bounded
>>> Sink.
>>> The pipeline I'm trying is:
>>> Kafka Sources -> SqlTransform -> HCatalogIO  Sink
>>>  And, a FixedWindow of 1 minute duration is applied.
>>>
>>> I'm expecting the inputs from unbounded sources joined within the
>>> current window to be written to the HCatalogIO Sink after every 1 min i.e
>>> after each window interval.
>>>
>>> Can someone please tell if this is a valid scenario and what is the
>>> expected behaviour from this kind of scenario?
>>>
>>> Regards,
>>> Shanta
>>>
>>

Reply via email to