Sorry please remove " .apply(Window.into(FixedWindows.of(1 minute))" from
the query above.



-Rui

On Mon, Feb 24, 2020 at 5:26 PM Rui Wang <ruw...@google.com> wrote:

> I see. So I guess I wasn't fully understand the requirement:
>
> Do you want to have a 1-min window join on two unbounded sources and write
> to sink when the window closes ? Or there is an extra requirement such that
> you also want to write to sink every minute per window?
>
> For the former, you can do it by SQL:
>
>     pipeline.apply(KafkaIO.read() ... )
>         .apply(Window.into(FixedWindows.of(1 minute))
>         .apply(SqlTransform(
>                   "SELECT ... FROM
>                         (select TUMBLE_START() as window_start, * FROM
> stream1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_a
>                   JOIN
>                         (select TUMBLE_START() as window_start, * FROM
> stream2 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_b
>                    on table_a.window_start = table_b.window_start ...")
>         .apply(HCatalogIO.write() ...)
>
> But as Kenneth mentioned HCatalogIO might not work as expected.
>
>
>
> For the latter, the mixed Java and SQL pipeline won't help you.
>
>
>
> -Rui
>
> On Mon, Feb 24, 2020 at 5:04 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> I think actually it depends on the pipeline. You cannot do it all in SQL,
>> but if you mix Java and SQL I think you can do this. If you write this:
>>
>>     pipeline.apply(KafkaIO.read() ... )
>>         .apply(Window.into(FixedWindows.of(1 minute))
>>         .apply(SqlTransform("SELECT ... FROM stream1 JOIN stream2 ...")
>>         .apply(HCatalogIO.write() ...)
>>
>> This should apply the SQL on each window. When the SQL does not do any
>> windowing, it is required to be a "per-window" SQL execution. That is the
>> spec for SqlTransform. If that does not work, please report your experience.
>>
>> But the SQL semantics do not require waiting. Today the stream-to-stream
>> join will do a CoGroupByKey so it will wait. But SQL may in the future
>> adopt a better join for this case that can output records with lower
>> latency.
>>
>> It may be a bigger question whether HCatalogIO.write() has all the knobs
>> you would like.
>>
>> Kenn
>>
>> On Mon, Feb 24, 2020 at 12:14 PM Rui Wang <ruw...@google.com> wrote:
>>
>>> SQL does not support such joins with your requirement: write to sink
>>> after every 1 min after window closes.
>>>
>>> You might can use state and timer API to achieve your goal.
>>>
>>>
>>>
>>> -Rui
>>>
>>> On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram <
>>> shantachakp...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to join inputs from Unbounded Sources then write to Bounded
>>>> Sink.
>>>> The pipeline I'm trying is:
>>>> Kafka Sources -> SqlTransform -> HCatalogIO  Sink
>>>>  And, a FixedWindow of 1 minute duration is applied.
>>>>
>>>> I'm expecting the inputs from unbounded sources joined within the
>>>> current window to be written to the HCatalogIO Sink after every 1 min i.e
>>>> after each window interval.
>>>>
>>>> Can someone please tell if this is a valid scenario and what is the
>>>> expected behaviour from this kind of scenario?
>>>>
>>>> Regards,
>>>> Shanta
>>>>
>>>

Reply via email to