Re: How should I call external HTTP services with PyFlink?
Hey Dian,

You are right. What this approach does is let you "queue n messages", call an async function on each of them, and return the results. That means it will indeed block the Flink job once it has gathered n messages, which is a real downside. Some optimisations around parallelism, watermarking, etc. can be implemented, but it is going to be an inherently blocking operation at some point.

On Fri, 6 May 2022 at 08:29, Dian Fu wrote:
> Hi Dhavan,
>
> Thanks a lot for the sharing. This is very interesting. Just want to add
> that this is somewhat different from the asyncio operator supported in
> Flink, e.g. you are waiting the results of one element before processing
> the next element and so it's actually synchronous from this point of view.
>
> Regards,
> Dian
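The "queue n messages" behaviour described in this reply can be sketched in plain Python, without PyFlink or aiohttp. This is only an illustration under stated assumptions: `fake_http_call`, `call_all` and `batched_enrich` are hypothetical names, and the sleep stands in for a real HTTP request. It shows where the blocking happens: nothing further is consumed from the input until every call in the current batch has returned.

```python
import asyncio


async def fake_http_call(value):
    # Stand-in for a real HTTP request (hypothetical); yields control like real I/O.
    await asyncio.sleep(0.01)
    return f"enriched-{value}"


async def call_all(values):
    # Run one call per buffered value concurrently; gather keeps input order.
    return await asyncio.gather(*(fake_http_call(v) for v in values))


def batched_enrich(stream, batch_size):
    """Buffer batch_size elements, then block until all their calls finish."""
    buffer = []
    for value in stream:
        buffer.append(value)
        if len(buffer) >= batch_size:
            # Blocking point: no new element is read until the batch completes.
            yield from asyncio.run(call_all(buffer))
            buffer = []
    if buffer:
        # Flush the leftover partial batch at the end of the input.
        yield from asyncio.run(call_all(buffer))


results = list(batched_enrich(["a", "b", "c", "d", "e"], batch_size=2))
print(results)
```

Within a batch the calls overlap, so a larger `batch_size` hides more latency, but it also delays the first element of each batch until the last one has arrived.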
Re: How should I call external HTTP services with PyFlink?
Hi Dhavan,

Thanks a lot for sharing. This is very interesting. I just want to add that this is somewhat different from the asyncio operator supported in Flink: here you are waiting for the results of one element before processing the next element, so it is actually synchronous from this point of view.

Regards,
Dian

On Thu, May 5, 2022 at 9:52 PM Dhavan Vaidya wrote:
> I have found the following way to make use of aiohttp via asyncio. It
> works alright, and flatmap can be converted to process function to add
> timers.
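The point about waiting for one element before processing the next can be made concrete by counting how many calls are in flight at the same time. The sketch below is plain Python with a simulated call; `InFlightTracker`, `one_at_a_time` and `all_together` are hypothetical names, not part of Flink or PyFlink. Calling `asyncio.run()` once per element, as the posted `flat_map` does when it fires on a queue of length 1, never has more than one request outstanding.

```python
import asyncio


class InFlightTracker:
    """Counts how many simulated calls overlap in time."""

    def __init__(self):
        self.current = 0
        self.peak = 0

    async def call(self, value):
        self.current += 1
        self.peak = max(self.peak, self.current)
        await asyncio.sleep(0.01)  # stand-in for network latency
        self.current -= 1
        return value.upper()


def one_at_a_time(values, tracker):
    # Mirrors the posted flat_map: each element gets its own asyncio.run().
    return [asyncio.run(tracker.call(v)) for v in values]


async def _gather_all(values, tracker):
    return await asyncio.gather(*(tracker.call(v) for v in values))


def all_together(values, tracker):
    # All calls are started before any result is awaited.
    return asyncio.run(_gather_all(values, tracker))


values = ["a", "b", "c"]
sequential, concurrent = InFlightTracker(), InFlightTracker()
seq_results = one_at_a_time(values, sequential)
con_results = all_together(values, concurrent)
print(sequential.peak, concurrent.peak)
```

Both variants return the same results; only the peak number of overlapping calls differs, and that overlap is what an asynchronous operator is meant to provide.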
Re: How should I call external HTTP services with PyFlink?
I have found the following way to make use of aiohttp via asyncio. It works alright, and the flat map can be converted to a process function to add timers.

    import asyncio

    import aiohttp
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import FlatMapFunction, StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()

    ds = env.from_collection(
        collection=["a", "b"],
        type_info=Types.STRING())

    class MyFlatMapFunction(FlatMapFunction):
        def __init__(self):
            # Per-instance buffer of elements waiting to be sent.
            self.queue = []

        async def process_queue(self):
            # Drain the buffer and run one HTTP call per element concurrently.
            tasks = []
            while len(self.queue) > 0:
                tasks.append(self.make_http_call(self.queue.pop()))
            return await asyncio.gather(*tasks)

        async def make_http_call(self, value):
            async with aiohttp.ClientSession() as session:
                url = 'http://localhost/'
                async with session.get(url) as response:
                    return await response.json()

        def flat_map(self, value):
            # print("Channel ID: {}".format(value))
            self.queue.append(value)
            # 1 is the batch size n: the calls start once n elements are queued.
            if len(self.queue) == 1:
                # Blocks this operator until every call in the batch returns.
                results = asyncio.run(self.process_queue())
                for result in results:
                    yield result

    ds = ds.flat_map(MyFlatMapFunction())
    ds.print()
    env.execute()

On Thu, 5 May 2022 at 08:26, Dian Fu wrote:
> Hi Dhavan,
>
> Asyncio operator is still not supported in PyFlink.
>
> Regards,
> Dian
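The remark about converting the flat map to a process function to add timers suggests flushing a batch either when it is full or when a deadline passes, so that a slow trickle of input does not leave elements stuck in the queue. The sketch below shows only that flush logic in plain Python, with timestamps passed in explicitly. `SizeOrTimeBuffer` and its methods are hypothetical names; in an actual PyFlink process function the deadline would presumably be a registered timer and `on_timer` the timer callback, which this sketch does not attempt to reproduce.

```python
class SizeOrTimeBuffer:
    """Collects elements and releases them when full or when a deadline passes."""

    def __init__(self, max_size, max_wait_ms):
        self.max_size = max_size
        self.max_wait_ms = max_wait_ms
        self.items = []
        self.deadline = None  # time at which a pending batch must be flushed

    def add(self, value, now_ms):
        """Buffer one element; return a full batch, or None if still collecting."""
        if not self.items:
            # First element of a batch: this is where a timer would be registered.
            self.deadline = now_ms + self.max_wait_ms
        self.items.append(value)
        if len(self.items) >= self.max_size:
            return self._flush()
        return None

    def on_timer(self, now_ms):
        """Flush a partial batch once its deadline has been reached."""
        if self.items and now_ms >= self.deadline:
            return self._flush()
        return None

    def _flush(self):
        batch, self.items, self.deadline = self.items, [], None
        return batch


buf = SizeOrTimeBuffer(max_size=3, max_wait_ms=100)
first = buf.add("a", now_ms=0)        # batch not full yet
early = buf.on_timer(now_ms=50)       # deadline (100) not reached
late = buf.on_timer(now_ms=100)       # deadline reached: partial batch released
full = [buf.add(v, now_ms=200) for v in ("b", "c", "d")]
print(first, early, late, full)
```

Each returned batch is what would then be handed to the `asyncio.gather` step, so the worst-case wait for any element is bounded by `max_wait_ms` plus the duration of the calls.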
Re: How should I call external HTTP services with PyFlink?
Hi Dhavan,

The asyncio operator is still not supported in PyFlink.

Regards,
Dian

On Tue, May 3, 2022 at 3:48 PM Dhavan Vaidya wrote:
> Hey Francis!
>
> Thanks for the insights! I am thinking of using Java / Scala for this
> scenario given your input. Introducing a new language to the team, however,
> is going to be a big ask :-D
>
> Another option that you mentioned is pushing enrichment data instead of
> pulling. That would be excellent, I will try to model the pipes and see if
> that works.
>
> Thanks again!
Re: How should I call external HTTP services with PyFlink?
Hey Francis!

Thanks for the insights! I am thinking of using Java / Scala for this scenario given your input. Introducing a new language to the team, however, is going to be a big ask :-D

Another option that you mentioned is pushing enrichment data instead of pulling. That would be excellent; I will try to model the pipes and see if that works.

Thanks again!

On Tue, 3 May 2022 at 05:53, Francis Conroy wrote:
> Hi Dhavan,
>
> We have looked at using pyflink for data stream enrichment and found the
> performance lacking compared to the java counterpart. One option for you
> might be to use statefun for the enrichment stages. We've also changed our
> model for enrichment, we're pushing the enrichment data into the pipeline
> instead of pulling it, but this won't work in a lot of situations.
>
> Hope that gives you some ideas.
Re: How should I call external HTTP services with PyFlink?
Hi Dhavan,

We have looked at using PyFlink for data stream enrichment and found the performance lacking compared to the Java counterpart. One option for you might be to use StateFun for the enrichment stages. We've also changed our model for enrichment: we're pushing the enrichment data into the pipeline instead of pulling it, but this won't work in a lot of situations.

Hope that gives you some ideas.

On Mon, 2 May 2022 at 22:54, Dhavan Vaidya wrote:
> Hello!
>
> I want to make HTTP(S) calls to enrich data streams. The HTTP services are
> running on our VPC, so the delay is limited, but sometimes these services
> end up calling third party APIs, and latencies become high.
>
> From documentation (
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/overview/)
> it seems PyFlink does not support "asyncio operator" like Java does (
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio/).
> Am I missing something? How should this be approached?
>
> Thanks!
>
> --
> Dhavan

--
This email and any attachments are proprietary and confidential and are intended solely for the use of the individual to whom it is addressed. Any views or opinions expressed are solely those of the author and do not necessarily reflect or represent those of SwitchDin Pty Ltd. If you have received this email in error, please let us know immediately by reply email and delete it from your system. You may not use, disseminate, distribute or copy this message nor disclose its contents to anyone.
SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia
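The "push instead of pull" model mentioned in this message can be illustrated with a small plain-Python sketch. It assumes the enrichment data arrives as its own stream of updates and is kept in local state, so that enriching an event becomes a dictionary lookup rather than an HTTP call. `PushedEnrichment`, the keys and the records are all hypothetical; this is not how SwitchDin's pipeline is actually built, which the thread does not describe.

```python
class PushedEnrichment:
    """Keeps the latest enrichment record per key and joins events against it locally."""

    def __init__(self):
        self.lookup = {}

    def on_enrichment(self, key, data):
        # An update arrives on the enrichment stream: overwrite the local copy.
        self.lookup[key] = data

    def on_event(self, key, payload):
        # An event arrives on the main stream: join without any network call.
        return {"key": key, "payload": payload, "enrichment": self.lookup.get(key)}


# Interleaved input, as two connected streams might deliver it.
inputs = [
    ("enrichment", "user-1", {"tier": "gold"}),
    ("event", "user-1", "click"),
    ("event", "user-2", "view"),  # no enrichment has arrived for this key yet
    ("enrichment", "user-2", {"tier": "free"}),
    ("event", "user-2", "click"),
]

op = PushedEnrichment()
output = []
for kind, key, body in inputs:
    if kind == "enrichment":
        op.on_enrichment(key, body)
    else:
        output.append(op.on_event(key, body))

for row in output:
    print(row)
```

The second event shows one reason this will not fit every situation: if an event arrives before its enrichment data, or the data cannot be pushed at all, the join comes back empty and the job needs a policy for that case.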
How should I call external HTTP services with PyFlink?
Hello!

I want to make HTTP(S) calls to enrich data streams. The HTTP services are running on our VPC, so the delay is limited, but sometimes these services end up calling third-party APIs, and latencies become high.

From the documentation (https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/overview/) it seems PyFlink does not support the "asyncio operator" like Java does (https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio/). Am I missing something? How should this be approached?

Thanks!

--
Dhavan