Re: Unbounded input join Unbounded input then write to Bounded Sink
Hello,

Sinks are not bounded or unbounded; they are just normal ParDos (DoFns) that behave consistently with the pipeline's data. So if your pipeline deals with unbounded data, the sink will write that data correspondingly (when windows close, triggers fire, etc., so the data is ready to be emitted).

One pattern reported with a similar pipeline, but with JdbcIO as the sink, was connection exhaustion of the database. It happened because the pipeline produced lots of windows/materializations that ended up creating thousands of DoFn writes to the sink, and every write to the sink requested a new connection, so the database got overwhelmed. This was fixed for JdbcIO via a pool of connections and smarter instantiation. If this is happening with HCatalogIO, it is an issue that we Beam devs should fix, but it should not be a concern for your pipeline design.

Regards,
Ismaël

On Tue, Feb 25, 2020 at 3:30 AM rahul patwari wrote:
> [...]
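For intuition about the JdbcIO fix Ismaël describes, here is a minimal plain-Java sketch of the pattern: one lazily created connection shared per JVM across DoFn instances, instead of a new connection per window firing. This is an illustrative sketch only, not the actual JdbcIO code; all names here (SharedPool, Connection) are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedPool {
    // Counts how many real "connections" were ever opened.
    static final AtomicInteger created = new AtomicInteger();

    // Stand-in for a real database connection.
    static class Connection {
        Connection() { created.incrementAndGet(); }
    }

    // One shared connection per JVM; every DoFn instance on a worker reuses it.
    private static volatile Connection shared;

    static Connection get() {
        if (shared == null) {
            synchronized (SharedPool.class) {
                if (shared == null) shared = new Connection();
            }
        }
        return shared;
    }

    public static void main(String[] args) {
        // Simulate many window firings, each "write" asking for a connection.
        // Without sharing this would open 1000 connections; with it, one.
        for (int i = 0; i < 1000; i++) get();
        System.out.println("connections created: " + created.get());
    }
}
```

A real fix would of course use a bounded pool (e.g. a DataSource) rather than a single connection, but the principle is the same: connection acquisition must not scale with the number of window materializations.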
Re: Unbounded input join Unbounded input then write to Bounded Sink
Hi Kenn, Rui,

The pipeline that we are trying is exactly what Kenn has mentioned above, i.e.:
Read from Kafka => Apply Fixed Windows of 1 Min => SqlTransform => Write to Hive using HCatalogIO

We are interested in understanding the behaviour when the source is unbounded and the sink is bounded, as this pipeline is being used for ETL.
Does the same pipeline work for any other bounded sink, instead of HCatalogIO?
What features does a bounded sink need to support for it to be used along with an unbounded source?

Are there any best practices (or) pipeline patterns for these kinds of pipelines? Will there be any performance hits?

Regards,
Rahul

On Tue, Feb 25, 2020 at 6:57 AM Rui Wang wrote:
> [...]
Re: Unbounded input join Unbounded input then write to Bounded Sink
I see. So I guess I didn't fully understand the requirement:

Do you want a 1-min window join on two unbounded sources that writes to the sink when the window closes? Or is there an extra requirement such that you also want to write to the sink every minute per window?

For the former, you can do it in SQL:

pipeline.apply(KafkaIO.read() ... )
    .apply(Window.into(FixedWindows.of(1 minute)))
    .apply(SqlTransform(
        "SELECT ... FROM
           (SELECT TUMBLE_START() AS window_start, * FROM stream1
            GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)) AS table_a
         JOIN
           (SELECT TUMBLE_START() AS window_start, * FROM stream2
            GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)) AS table_b
         ON table_a.window_start = table_b.window_start ..."))
    .apply(HCatalogIO.write() ...)

But as Kenneth mentioned, HCatalogIO might not work as expected.

For the latter, the mixed Java and SQL pipeline won't help you.

-Rui

On Mon, Feb 24, 2020 at 5:04 PM Kenneth Knowles wrote:
> [...]
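For intuition, the semantics of the TUMBLE join above can be simulated in plain Java: assign each event to the start of its 1-minute tumbling window, group by that window start (the GROUP BY TUMBLE step), and join the two streams on equal window starts (the ON clause). This is only a sketch of the semantics under simplifying assumptions (one event per timestamp, no watermarks); it is not Beam code, and all names are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumbleJoin {
    // Start of the 1-minute tumbling window containing tsMillis,
    // which is what TUMBLE(ts, INTERVAL '1' MINUTE) computes conceptually.
    static long windowStart(long tsMillis) {
        return tsMillis - (tsMillis % 60_000);
    }

    // Group events (timestamp -> value) by window start,
    // mirroring GROUP BY TUMBLE(...).
    static Map<Long, List<String>> byWindow(Map<Long, String> events) {
        Map<Long, List<String>> out = new TreeMap<>();
        for (Map.Entry<Long, String> e : events.entrySet())
            out.computeIfAbsent(windowStart(e.getKey()), k -> new ArrayList<>())
               .add(e.getValue());
        return out;
    }

    // Join the two grouped streams on window_start, like the ON clause.
    static Map<Long, String> join(Map<Long, List<String>> a, Map<Long, List<String>> b) {
        Map<Long, String> out = new TreeMap<>();
        for (Long w : a.keySet())
            if (b.containsKey(w)) out.put(w, a.get(w) + "|" + b.get(w));
        return out;
    }

    public static void main(String[] args) {
        Map<Long, String> s1 = new TreeMap<>();
        s1.put(5_000L, "a1");   // window [0, 60s)
        s1.put(65_000L, "a2");  // window [60s, 120s) -- no match in s2
        Map<Long, String> s2 = new TreeMap<>();
        s2.put(30_000L, "b1");  // window [0, 60s)
        System.out.println(join(byWindow(s1), byWindow(s2))); // {0=[a1]|[b1]}
    }
}
```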
Re: Unbounded input join Unbounded input then write to Bounded Sink
Sorry, please remove ".apply(Window.into(FixedWindows.of(1 minute)))" from the query above.

-Rui

On Mon, Feb 24, 2020 at 5:26 PM Rui Wang wrote:
> [...]
Re: Unbounded input join Unbounded input then write to Bounded Sink
I think actually it depends on the pipeline. You cannot do it all in SQL, but if you mix Java and SQL I think you can do this. If you write this:

pipeline.apply(KafkaIO.read() ... )
    .apply(Window.into(FixedWindows.of(1 minute)))
    .apply(SqlTransform("SELECT ... FROM stream1 JOIN stream2 ..."))
    .apply(HCatalogIO.write() ...)

This should apply the SQL on each window. When the SQL does not do any windowing, it is required to be a "per-window" SQL execution. That is the spec for SqlTransform. If that does not work, please report your experience.

But the SQL semantics do not require waiting. Today the stream-to-stream join will do a CoGroupByKey, so it will wait. But SQL may in the future adopt a better join for this case that can output records with lower latency.

It may be a bigger question whether HCatalogIO.write() has all the knobs you would like.

Kenn

On Mon, Feb 24, 2020 at 12:14 PM Rui Wang wrote:
> [...]
Re: Unbounded input join Unbounded input then write to Bounded Sink
SQL does not support such joins with your requirement: writing to the sink every 1 min, after the window closes.

You might be able to use the state and timer API to achieve your goal.

-Rui

On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram wrote:
> Hi,
>
> I am trying to join inputs from unbounded sources and then write to a
> bounded sink.
> The pipeline I'm trying is:
> Kafka Sources -> SqlTransform -> HCatalogIO Sink
> And a FixedWindow of 1 minute duration is applied.
>
> I'm expecting the inputs from the unbounded sources joined within the
> current window to be written to the HCatalogIO sink after every 1 min,
> i.e. after each window interval.
>
> Can someone please tell if this is a valid scenario and what the
> expected behaviour from this kind of scenario is?
>
> Regards,
> Shanta
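Rui's state-and-timer suggestion, reduced to its essence, is: buffer elements per window in state, set a timer for the window end, and flush the buffer to the sink when the watermark passes it. The plain-Java simulation below illustrates only that idea; it does not use Beam's actual @StateId/@TimerId API, the watermark here is advanced by hand, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class StateTimerSketch {
    static final long WINDOW_MS = 60_000;

    // "State": buffered elements keyed by the start of their 1-minute window.
    final Map<Long, List<String>> buffer = new TreeMap<>();
    // Records produced when a window's "timer" fires: one write per window.
    final List<String> sinkWrites = new ArrayList<>();

    // Buffer an element into its window (the @ProcessElement analogue).
    void processElement(long tsMillis, String value) {
        long windowStart = tsMillis - (tsMillis % WINDOW_MS);
        buffer.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(value);
    }

    // Called as the watermark advances; flushes every window whose end
    // has passed (the @OnTimer analogue).
    void advanceWatermark(long watermarkMillis) {
        Iterator<Map.Entry<Long, List<String>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> e = it.next();
            if (e.getKey() + WINDOW_MS <= watermarkMillis) {
                sinkWrites.add(e.getKey() + ":" + e.getValue());
                it.remove(); // clear state for the closed window
            }
        }
    }

    public static void main(String[] args) {
        StateTimerSketch fn = new StateTimerSketch();
        fn.processElement(10_000, "a");
        fn.processElement(70_000, "b");
        fn.advanceWatermark(60_000);  // closes window [0, 60s): first write
        fn.advanceWatermark(120_000); // closes window [60s, 120s): second write
        System.out.println(fn.sinkWrites);
    }
}
```

In a real Beam DoFn the runner would fire the timer as the watermark advances, and the "sink write" would be the HCatalogIO/JdbcIO write for that window's batch.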