Actually, I think it depends on the pipeline. You cannot do it all in SQL,
but if you mix Java and SQL I think you can do this. If you write this:

    pipeline.apply(KafkaIO.read() ... )
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(SqlTransform.query("SELECT ... FROM stream1 JOIN stream2 ..."))
        .apply(HCatalogIO.write() ... );

This should apply the SQL separately to each window. When the SQL itself
does not do any windowing, SqlTransform is specified to execute per window.
If that is not the behavior you observe, please report your experience.
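To make "per-window" execution concrete, here is a minimal sketch in plain
Java (not Beam; the class and method names are illustrative) of what fixed
windowing does before the SQL runs: each timestamped element is bucketed
into a 1-minute window, and downstream logic then runs once per bucket.

```java
import java.util.*;

// Conceptual sketch only: buckets timestamped events into 1-minute fixed
// windows, mirroring what Window.into(FixedWindows...) does ahead of
// SqlTransform. Names here are illustrative, not Beam APIs.
public class FixedWindowSketch {
    static final long WINDOW_MILLIS = 60_000L; // 1-minute fixed windows

    // Map an event timestamp to the start of the window containing it.
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    // Group (timestamp -> value) events by window start. Downstream logic
    // (e.g. the SQL join) would then run independently on each bucket.
    static Map<Long, List<String>> assignWindows(Map<Long, String> events) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (Map.Entry<Long, String> e : events.entrySet()) {
            windows.computeIfAbsent(windowStart(e.getKey()), k -> new ArrayList<>())
                   .add(e.getValue());
        }
        return windows;
    }
}
```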

That said, the SQL semantics do not require waiting for the window to
close. Today the stream-to-stream join is implemented with a CoGroupByKey,
so it will wait; in the future SQL may adopt a better join implementation
for this case that can output records with lower latency.
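To illustrate why the CoGroupByKey-based join waits, here is a minimal
sketch in plain Java (not Beam; names are illustrative): for each key, all
elements from both inputs are buffered and then paired, so nothing can be
emitted until both sides of the window are complete.

```java
import java.util.*;

// Conceptual sketch only: a CoGroupByKey-style inner join buffers every
// element from both inputs per key, then emits the cross product of the
// matching values. This is why output waits for the window's inputs.
public class CoGroupJoinSketch {
    static List<String> join(Map<String, List<String>> left,
                             Map<String, List<String>> right) {
        List<String> out = new ArrayList<>();
        for (String key : left.keySet()) {
            // Pair each left value with each matching right value (inner join);
            // keys absent from the right side produce no output.
            for (String l : left.getOrDefault(key, List.of()))
                for (String r : right.getOrDefault(key, List.of()))
                    out.add(key + ":" + l + "|" + r);
        }
        return out;
    }
}
```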

It may be a bigger question whether HCatalogIO.write() has all the knobs
you would like.

Kenn

On Mon, Feb 24, 2020 at 12:14 PM Rui Wang <[email protected]> wrote:

> SQL does not support such joins with your requirement of writing to the
> sink every 1 min, after the window closes.
>
> You might be able to use the state and timer API to achieve your goal.
>
>
>
> -Rui
>
> On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram <[email protected]>
> wrote:
>
>> Hi,
>>
>> I am trying to join inputs from unbounded sources and then write to a
>> bounded sink.
>> The pipeline I'm trying is:
>> Kafka sources -> SqlTransform -> HCatalogIO sink
>> And, a FixedWindow of 1 minute duration is applied.
>>
>> I'm expecting the inputs from unbounded sources joined within the current
>> window to be written to the HCatalogIO Sink after every 1 min i.e after
>> each window interval.
>>
>> Can someone please tell if this is a valid scenario and what is the
>> expected behaviour from this kind of scenario?
>>
>> Regards,
>> Shanta
>>
>
