Re: How should I call external HTTP services with PyFlink?

2022-05-05 Thread Dhavan Vaidya
Hey Dian,

You are right. What this approach does is let one "queue n messages", call
an async function on each of them, and return the results. That means it
will indeed block the Flink job once it has gathered n messages, which is a
real downside. Some optimisations around parallelism, watermarking, etc.
can be implemented, but it is going to be an inherently blocking operation
at some point.
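
For illustration, here is a minimal sketch of the "queue n messages" idea in plain Python, outside Flink. The names `BatchingEnricher`, `fake_http_call`, and `flush` are hypothetical and only stand in for the real function and HTTP client. It shows where the blocking happens, and why elements left in the buffer need something like a timer or a final flush.

```python
import asyncio


async def fake_http_call(value):
    # Hypothetical stand-in for a real HTTP request.
    await asyncio.sleep(0)
    return f"enriched-{value}"


class BatchingEnricher:
    """Buffers values and resolves them concurrently once n are queued."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.queue = []

    async def _drain(self):
        batch, self.queue = self.queue, []
        # gather() returns results in the same order as its inputs.
        return await asyncio.gather(*(fake_http_call(v) for v in batch))

    def add(self, value):
        """Returns results when the batch is full, otherwise an empty list."""
        self.queue.append(value)
        if len(self.queue) >= self.batch_size:
            # Blocks the caller until every call in the batch has finished.
            return asyncio.run(self._drain())
        return []

    def flush(self):
        """Resolves whatever is still buffered, e.g. from a timer or on close."""
        return asyncio.run(self._drain()) if self.queue else []


enricher = BatchingEnricher(batch_size=3)
outputs = [enricher.add(v) for v in ["a", "b", "c", "d"]]
leftover = enricher.flush()
```

Here the fourth element stays buffered until `flush()` is called, which is the gap a timer in a process function would have to cover.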

On Fri, 6 May 2022 at 08:29, Dian Fu wrote:

> Hi Dhavan,
>
> Thanks a lot for sharing. This is very interesting. Just want to add
> that this is somewhat different from the async I/O operator supported in
> Flink, e.g. you are waiting for the results of one element before
> processing the next element, so it's actually synchronous from this point
> of view.
>
> Regards,
> Dian



Re: How should I call external HTTP services with PyFlink?

2022-05-05 Thread Dian Fu
Hi Dhavan,

Thanks a lot for sharing. This is very interesting. Just want to add
that this is somewhat different from the async I/O operator supported in
Flink, e.g. you are waiting for the results of one element before
processing the next element, so it's actually synchronous from this point
of view.

Regards,
Dian
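
To make the point above concrete, here is a small sketch in plain Python with no Flink involved. `Tracker` and `run_in_batches` are hypothetical names, and the sleep is a stand-in for an HTTP request. It records how many calls are in flight at once: with a batch size of 1, each element is fully resolved before the next one starts, so nothing overlaps.

```python
import asyncio


class Tracker:
    """Records the highest number of calls that were in flight at once."""

    def __init__(self):
        self.in_flight = 0
        self.peak = 0

    async def call(self, value):
        # Hypothetical stand-in for an HTTP request.
        self.in_flight += 1
        self.peak = max(self.peak, self.in_flight)
        await asyncio.sleep(0.01)
        self.in_flight -= 1
        return value.upper()


def run_in_batches(values, batch_size):
    """Resolves values batch by batch; returns (results, peak concurrency)."""
    tracker = Tracker()
    results = []
    for start in range(0, len(values), batch_size):
        batch = values[start:start + batch_size]

        async def resolve(batch=batch):
            return await asyncio.gather(*(tracker.call(v) for v in batch))

        # Each asyncio.run() returns only after the whole batch is done.
        results.extend(asyncio.run(resolve()))
    return results, tracker.peak


one_by_one = run_in_batches(["a", "b", "c", "d"], batch_size=1)
batched = run_in_batches(["a", "b", "c", "d"], batch_size=4)
```

With `batch_size=1` the peak concurrency is 1; with `batch_size=4` all four calls overlap, but the caller still waits for the whole batch before moving on.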

On Thu, May 5, 2022 at 9:52 PM Dhavan Vaidya 
wrote:

> I have found the following way to make use of aiohttp via asyncio. It
> works all right, and the flat map function can be converted to a process
> function to add timers.
>
> ds = env.from_collection(
>     collection=["a", "b"],
>     type_info=Types.STRING())
>
> class MyFlatMapFunction(FlatMapFunction):
>     queue = []
>
>     async def process_queue(self, value):
>         tasks = []
>         while len(self.queue) > 0:
>             tasks.append(self.make_http_call(self.queue.pop()))
>         return await asyncio.gather(*tasks)
>
>     async def make_http_call(self, value):
>         async with aiohttp.ClientSession() as session:
>             url = 'http://localhost/'
>             async with session.get(url) as response:
>                 return await response.json()
>
>     def flat_map(self, value):
>         # print("Channel ID: {}".format(value))
>         self.queue.append(value)
>         if len(self.queue) == 1:
>             results = asyncio.run(self.process_queue(value))
>             for result in results:
>                 yield result
>
>
> ds = ds.flat_map(MyFlatMapFunction())
>
>
> ds.print()
> env.execute()


Re: How should I call external HTTP services with PyFlink?

2022-05-05 Thread Dhavan Vaidya
I have found the following way to make use of aiohttp via asyncio. It works
all right, and the flat map function can be converted to a process function
to add timers.

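import asyncio

import aiohttp
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import FlatMapFunction

env = StreamExecutionEnvironment.get_execution_environment()

ds = env.from_collection(
    collection=["a", "b"],
    type_info=Types.STRING())


class MyFlatMapFunction(FlatMapFunction):

    def __init__(self):
        # Per-instance buffer; a class-level list would be shared by all instances.
        self.queue = []

    async def process_queue(self):
        # Drain the buffer and run all HTTP calls concurrently.
        tasks = []
        while len(self.queue) > 0:
            tasks.append(self.make_http_call(self.queue.pop()))
        return await asyncio.gather(*tasks)

    async def make_http_call(self, value):
        # `value` is not used in the request yet; the URL is a placeholder.
        async with aiohttp.ClientSession() as session:
            url = 'http://localhost/'
            async with session.get(url) as response:
                return await response.json()

    def flat_map(self, value):
        # print("Channel ID: {}".format(value))
        self.queue.append(value)
        # Batch size of 1; raise the threshold to gather more elements per batch.
        if len(self.queue) == 1:
            results = asyncio.run(self.process_queue())
            for result in results:
                yield result


ds = ds.flat_map(MyFlatMapFunction())

ds.print()
env.execute()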
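
One possible refinement, sketched here with the standard library only: when the batch grows, an `asyncio.Semaphore` can cap how many calls are in flight so the HTTP service is not hit by the whole batch at once. `fake_http_call` and `gather_bounded` are hypothetical names; the former stands in for the aiohttp request in `make_http_call`, and the limit of 2 is an arbitrary example value.

```python
import asyncio


async def fake_http_call(value):
    # Hypothetical stand-in for the aiohttp request in make_http_call.
    await asyncio.sleep(0.01)
    return {"input": value}


async def gather_bounded(values, limit):
    """Runs one call per value, with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)
    state = {"in_flight": 0, "peak": 0}

    async def guarded(value):
        async with semaphore:
            state["in_flight"] += 1
            state["peak"] = max(state["peak"], state["in_flight"])
            try:
                return await fake_http_call(value)
            finally:
                state["in_flight"] -= 1

    # gather() keeps the results in input order even though calls overlap.
    results = await asyncio.gather(*(guarded(v) for v in values))
    return results, state["peak"]


results, peak = asyncio.run(gather_bounded(["a", "b", "c", "d", "e"], limit=2))
```

This only bounds concurrency inside one batch; the call to `asyncio.run` still blocks until the batch is finished.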

On Thu, 5 May 2022 at 08:26, Dian Fu wrote:

> Hi Dhavan,
>
> The async I/O operator is still not supported in PyFlink.
>
> Regards,
> Dian


Re: How should I call external HTTP services with PyFlink?

2022-05-04 Thread Dian Fu
Hi Dhavan,

The async I/O operator is still not supported in PyFlink.

Regards,
Dian

On Tue, May 3, 2022 at 3:48 PM Dhavan Vaidya 
wrote:

> Hey Francis!
>
> Thanks for the insights! I am thinking of using Java / Scala for this
> scenario given your input. Introducing a new language to the team, however,
> is going to be a big ask :-D
>
> Another option that you mentioned is pushing enrichment data instead of
> pulling. That would be excellent, I will try to model the pipes and see if
> that works.
>
> Thanks again!


Re: How should I call external HTTP services with PyFlink?

2022-05-03 Thread Dhavan Vaidya
Hey Francis!

Thanks for the insights! I am thinking of using Java / Scala for this
scenario given your input. Introducing a new language to the team, however,
is going to be a big ask :-D

Another option that you mentioned is pushing enrichment data instead of
pulling. That would be excellent, I will try to model the pipes and see if
that works.

Thanks again!

On Tue, 3 May 2022 at 05:53, Francis Conroy 
wrote:

> Hi Dhavan,
>
> We have looked at using PyFlink for data stream enrichment and found the
> performance lacking compared to the Java counterpart. One option for you
> might be to use Stateful Functions (StateFun) for the enrichment stages.
> We've also changed our model for enrichment: we're pushing the enrichment
> data into the pipeline instead of pulling it, but this won't work in a lot
> of situations.
>
> Hope that gives you some ideas.


Re: How should I call external HTTP services with PyFlink?

2022-05-02 Thread Francis Conroy
Hi Dhavan,

We have looked at using PyFlink for data stream enrichment and found the
performance lacking compared to the Java counterpart. One option for you
might be to use Stateful Functions (StateFun) for the enrichment stages.
We've also changed our model for enrichment: we're pushing the enrichment
data into the pipeline instead of pulling it, but this won't work in a lot
of situations.

Hope that gives you some ideas.
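
As a rough sketch of the "push instead of pull" model described above, in plain Python rather than PyFlink: one input carries reference data and updates a local lookup table, the other carries events and reads from it, so no HTTP call happens on the event path. `PushEnrichment`, `on_reference`, `on_event`, and the sample records are all hypothetical; in PyFlink this would presumably map onto two connected streams handled by one two-input function.

```python
class PushEnrichment:
    """Two-input enrichment: one input updates a lookup table, the other reads it."""

    def __init__(self):
        self.lookup = {}

    def on_reference(self, record):
        # Reference records arrive as (key, data) and overwrite older data.
        key, data = record
        self.lookup[key] = data
        return []

    def on_event(self, event):
        # Events are enriched from local state; no HTTP call is made here.
        key, payload = event
        return [(key, payload, self.lookup.get(key))]


enricher = PushEnrichment()
output = []
# Hypothetical interleaving of the two inputs, tagged by source.
for source, record in [
    ("ref", ("user-1", {"tier": "gold"})),
    ("event", ("user-1", "click")),
    ("event", ("user-2", "view")),
    ("ref", ("user-2", {"tier": "free"})),
    ("event", ("user-2", "click")),
]:
    handler = enricher.on_reference if source == "ref" else enricher.on_event
    output.extend(handler(record))
```

The second event shows one case where this model falls short: an event that arrives before its reference data is emitted without enrichment.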

On Mon, 2 May 2022 at 22:54, Dhavan Vaidya 
wrote:

> Hello!
>
> I want to make HTTP(S) calls to enrich data streams. The HTTP services are
> running in our VPC, so the delay is limited, but sometimes these services
> end up calling third-party APIs, and latencies become high.
>
> From the documentation (
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/overview/)
> it seems PyFlink does not support the "async I/O" operator like Java does (
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio/).
> Am I missing something? How should this be approached?
>
> Thanks!
>
> --
> Dhavan
>

-- 
This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email 
and delete it from your system. You may not use, disseminate, distribute or 
copy this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd 
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia


How should I call external HTTP services with PyFlink?

2022-05-02 Thread Dhavan Vaidya
Hello!

I want to make HTTP(S) calls to enrich data streams. The HTTP services are
running in our VPC, so the delay is limited, but sometimes these services
end up calling third-party APIs, and latencies become high.

From the documentation (
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/overview/)
it seems PyFlink does not support the "async I/O" operator like Java does (
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio/).
Am I missing something? How should this be approached?

Thanks!

--
Dhavan