I have found the following way to use aiohttp via asyncio inside a PyFlink
FlatMapFunction. It works well enough, and the FlatMapFunction could be
converted into a ProcessFunction if timers are needed.
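import asyncio
import aiohttp
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import FlatMapFunction

env = StreamExecutionEnvironment.get_execution_environment()

ds = env.from_collection(
    collection=["a", "b"],
    type_info=Types.STRING())


class MyFlatMapFunction(FlatMapFunction):

    def __init__(self):
        # Per-instance buffer; a class-level list would be shared by all instances.
        self.queue = []

    async def process_queue(self):
        # Start one request per buffered element and await them concurrently.
        tasks = []
        while len(self.queue) > 0:
            tasks.append(self.make_http_call(self.queue.pop()))
        return await asyncio.gather(*tasks)

    async def make_http_call(self, value):
        # value is not used yet; the URL is a fixed placeholder.
        async with aiohttp.ClientSession() as session:
            url = 'http://localhost/'
            async with session.get(url) as response:
                return await response.json()

    def flat_map(self, value):
        # print("Channel ID: {}".format(value))
        self.queue.append(value)
        if len(self.queue) == 1:
            # asyncio.run() creates a fresh event loop and blocks until all calls
            # finish. The queue is drained on every call, so in practice each
            # element is fetched on its own.
            results = asyncio.run(self.process_queue())
            for result in results:
                yield result


ds = ds.flat_map(MyFlatMapFunction())
ds.print()
env.execute()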
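In the FlatMapFunction above, the queue is emptied on every call, so requests for different elements never actually overlap. Below is a minimal sketch of the micro-batching idea, written in plain asyncio so it can run outside Flink. Elements are buffered until a batch is full, then fetched concurrently. A semaphore caps the number of in-flight requests, and each request has its own timeout. This loosely mirrors the capacity and timeout arguments of Java's `AsyncDataStream.orderedWait`. `MicroBatcher`, `fake_fetch`, and the `batch_size` / `max_in_flight` / `timeout_s` knobs are hypothetical, and `fake_fetch` stands in for the aiohttp call.

```python
import asyncio
from typing import Awaitable, Callable, List


async def fake_fetch(value: str) -> dict:
    # Hypothetical stand-in for the HTTP call: simulate latency, return a record.
    await asyncio.sleep(0.05)
    return {"key": value, "enriched": value.upper()}


class MicroBatcher:
    """Buffers elements and enriches each full batch concurrently (sketch only)."""

    def __init__(self, fetch: Callable[[str], Awaitable[dict]],
                 batch_size: int = 4, max_in_flight: int = 2,
                 timeout_s: float = 1.0):
        self.fetch = fetch
        self.batch_size = batch_size
        self.max_in_flight = max_in_flight
        self.timeout_s = timeout_s
        self.buffer: List[str] = []

    async def _enrich_one(self, sem: asyncio.Semaphore, value: str) -> dict:
        async with sem:  # cap the number of concurrent requests
            try:
                return await asyncio.wait_for(self.fetch(value), self.timeout_s)
            except asyncio.TimeoutError:
                # Emit a marker record instead of failing the whole batch.
                return {"key": value, "error": "timeout"}

    async def _enrich_all(self, values: List[str]) -> List[dict]:
        # Create the semaphore inside the running loop started by asyncio.run().
        sem = asyncio.Semaphore(self.max_in_flight)
        # gather() returns results in input order, like an "ordered" async wait.
        return list(await asyncio.gather(
            *(self._enrich_one(sem, v) for v in values)))

    def _flush(self) -> List[dict]:
        batch, self.buffer = self.buffer, []
        return asyncio.run(self._enrich_all(batch)) if batch else []

    def add(self, value: str) -> List[dict]:
        """Call once per element (e.g. from flat_map); may return an empty list."""
        self.buffer.append(value)
        if len(self.buffer) >= self.batch_size:
            return self._flush()
        return []

    def close(self) -> List[dict]:
        """Flush whatever is still buffered."""
        return self._flush()


if __name__ == "__main__":
    batcher = MicroBatcher(fake_fetch, batch_size=2)
    out = []
    for v in ["a", "b", "c"]:
        out.extend(batcher.add(v))
    out.extend(batcher.close())
    print(out)
```

Inside `flat_map`, one would call `add(value)` and yield whatever it returns. A streaming job has no natural end, so in practice the `close()` flush of a partial batch would have to be driven by something like a timer in a ProcessFunction. Note also that `asyncio.run` still blocks the operator while a batch is in flight.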
On Thu, 5 May 2022 at 08:26, Dian Fu wrote:
> Hi Dhavan,
>
> Asyncio operator is still not supported in PyFlink.
>
> Regards,
> Dian
>
> On Tue, May 3, 2022 at 3:48 PM Dhavan Vaidya wrote:
>
>> Hey Francis!
>>
>> Thanks for the insights! I am thinking of using Java / Scala for this
>> scenario given your input. Introducing a new language to the team, however,
>> is going to be a big ask :-D
>>
>> Another option that you mentioned is pushing enrichment data instead of
>> pulling it. That would be excellent; I will try to model the pipelines and
>> see if that works.
>>
>> Thanks again!
>>
>> On Tue, 3 May 2022 at 05:53, Francis Conroy wrote:
>>
>>> Hi Dhavan,
>>>
>>> We have looked at using PyFlink for data stream enrichment and found its
>>> performance lacking compared to the Java counterpart. One option for you
>>> might be to use Stateful Functions (StateFun) for the enrichment stages.
>>> We've also changed our model for enrichment: we're pushing the enrichment
>>> data into the pipeline instead of pulling it, but this won't work in a lot
>>> of situations.
>>>
>>> Hope that gives you some ideas.
>>>
>>> On Mon, 2 May 2022 at 22:54, Dhavan Vaidya wrote:
>>>
>>>> Hello!
>>>>
>>>> I want to make HTTP(S) calls to enrich data streams. The HTTP services
>>>> run inside our VPC, so latency is usually low, but sometimes these
>>>> services call third-party APIs, and then latencies become high.
>>>>
>>>> From the documentation (
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/overview/)
>>>> it seems PyFlink does not support "asyncio operator" like Java does (
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio/).
>>>> Am I missing something? How should this be approached?
>>>>
>>>> Thanks!
>>>>
>>>> --
>>>> Dhavan
>>>>
>>>
>>> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
>>> Australia
>>>
>>
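Francis's alternative, pushing enrichment data into the pipeline instead of pulling it, can be sketched in plain Python. One stream carries enrichment updates, which are kept in local state. The main stream is then enriched by a local lookup, with no HTTP call on the hot path. The names here (`Event`, `EnrichmentUpdate`, `PushEnricher`, `run`) are hypothetical. In PyFlink, the two handlers would roughly correspond to the two methods of a `CoFlatMapFunction` applied to two connected streams keyed by the same key.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, Union


@dataclass
class Event:
    key: str
    payload: str


@dataclass
class EnrichmentUpdate:
    key: str
    attributes: dict


class PushEnricher:
    """Keeps the latest enrichment data per key and joins events against it."""

    def __init__(self) -> None:
        self.state: Dict[str, dict] = {}

    def on_update(self, update: EnrichmentUpdate) -> None:
        # Enrichment side: overwrite the stored attributes for this key.
        self.state[update.key] = update.attributes

    def on_event(self, event: Event) -> dict:
        # Main side: local lookup only; unknown keys are emitted unenriched.
        return {"key": event.key, "payload": event.payload,
                **self.state.get(event.key, {})}


def run(stream: Iterable[Union[Event, EnrichmentUpdate]]) -> Iterator[dict]:
    # Simulates both inputs interleaved in arrival order.
    enricher = PushEnricher()
    for item in stream:
        if isinstance(item, EnrichmentUpdate):
            enricher.on_update(item)
        else:
            yield enricher.on_event(item)


if __name__ == "__main__":
    items = [EnrichmentUpdate("u1", {"tier": "gold"}),
             Event("u1", "click"), Event("u2", "view")]
    for record in run(items):
        print(record)
```

Events that arrive before any update for their key pass through without enrichment. This is one reason the push model does not fit every situation, for example when the enrichment source cannot publish changes as a stream.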