Re: Async I/O: preserve stream order for requests on key level

2023-06-27 Thread Teoh, Hong
Hi Juho,

Thank you for bringing this up! Definitely +1 to this. We have had similar 
requests for the AsyncSink as well.
As a side note, it would be useful if the two could somehow share the same
implementation, to avoid duplicating code.

Happy to help with the implementation here.

For the AsyncSink, this came up when we were implementing a sink to DynamoDB.
We use DynamoDB's BatchWriteItem, which does not allow two requests for the same
key within a single batch. Since DynamoDB overwrites on write, in our case we take
the latest record received from the Flink job graph as the source of truth. [1]

[1] 
https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L169-L171
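
For illustration, here is a simplified sketch of that de-duplication idea. It is
not the actual DynamoDbSinkWriter code linked above; the record and key types are
placeholders.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public final class BatchDeduplicator {

    /**
     * Keeps only the latest record per key, since a single DynamoDB
     * BatchWriteItem call cannot contain two requests for the same key.
     */
    public static <T, K> List<T> keepLatestPerKey(List<T> batch, Function<T, K> keyExtractor) {
        // LinkedHashMap preserves first-seen key order; put() overwrites,
        // so the last record seen for each key wins.
        Map<K, T> latestPerKey = new LinkedHashMap<>();
        for (T record : batch) {
            latestPerKey.put(keyExtractor.apply(record), record);
        }
        return new ArrayList<>(latestPerKey.values());
    }
}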


Regards,
Hong




Re: Async I/O: preserve stream order for requests on key level

2023-06-25 Thread Jark Wu
This is not supported by the current Async I/O API, but I do think this is a very
useful feature and we should support it. As Jingsong said, it would also allow
changelog streams to use Async Lookup Join.
The rough idea is a mixture of the ordered and unordered modes of the async
operator: requests for the same key are handled in ordered mode, while requests
for different keys can be handled in unordered mode.
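
To make the proposed semantics concrete, a hypothetical keyed variant of the
existing API could look something like the sketch below. orderedWaitPerKey does
not exist in Flink today, and MyAsyncLookupFunction, Event and Result are only
illustrative names.

// Hypothetical API, for illustration only -- not part of Flink today.
// Requests that share a key would be issued and emitted in arrival order,
// while requests for different keys could run and complete concurrently.
DataStream<Result> results =
        AsyncDataStream.orderedWaitPerKey(
                input.keyBy(Event::getKey),    // key selector defines the ordering scope
                new MyAsyncLookupFunction(),   // placeholder AsyncFunction<Event, Result>
                30, TimeUnit.SECONDS,          // per-request timeout
                100);                          // max in-flight requests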

Best,
Jark



Re: Async I/O: preserve stream order for requests on key level

2023-06-20 Thread Jingsong Li
+1 for this.

Actually, this is a headache for Flink SQL too.

There is a lot of updated data (CDC changelogs) in real-world
stream processing. The semantics we need are to preserve the order of records
within each key, while different keys can be processed out of order.

I'm very happy that the community has a similar need, and I think it's
worth refining it in Flink.

Best,
Jingsong



Re: Async I/O: preserve stream order for requests on key level

2023-06-20 Thread Juho Autio
Thank you very much! It seems like you have quite a similar goal. However,
could you clarify: do you maintain the stream order at the key level, or do you
just limit the parallel requests per key to one without caring about the
order?

I'm not 100% sure how your implementation with futures is done. If you are
able to share a code snippet that would be much appreciated!

I'm also wondering what kind of memory implications that implementation has:
would the futures be queued inside the operator without any limit? Would it
be a problem if the same key has too many records within the same time
window? But I suppose the function can be made blocking to protect against
that.



Re: Async I/O: preserve stream order for requests on key level

2023-06-20 Thread Galen Warren
Hi Juho -- I'm doing something similar. In my case, I want to execute async
requests concurrently for inputs associated with different keys, but issue
them sequentially for any given key. The way I do it is to create a keyed
stream and use it as the input to an async function. In this arrangement,
all the inputs for a given key are handled by a single instance of the
async function. Inside that function instance, I use a map to keep track of
any in-flight request for a given key. When a new input comes in for a
key and there is an existing in-flight request for that key, the future
for the new request is constructed as [existing
request].then([new request]), so that the new one is only executed once the
in-flight request completes. The futures are constructed in such a way that
they maintain the map properly after completing.
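
Roughly, the pattern looks like the sketch below. This is illustrative rather
than my actual code: Event, Result and MyAsyncClient (with its query method)
are placeholders, and error handling is simplified.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class PerKeyOrderedAsyncFunction extends RichAsyncFunction<Event, Result> {

    // Tail of the in-flight request chain per key. Completion callbacks may run
    // on client threads, so the map is updated with atomic ConcurrentHashMap ops.
    private transient ConcurrentHashMap<String, CompletableFuture<Result>> inFlight;
    private transient MyAsyncClient client; // placeholder async client

    @Override
    public void open(Configuration parameters) {
        inFlight = new ConcurrentHashMap<>();
        client = MyAsyncClient.create(); // placeholder
    }

    @Override
    public void asyncInvoke(Event event, ResultFuture<Result> resultFuture) {
        String key = event.getKey();

        // Chain the new request behind any existing in-flight request for this key.
        CompletableFuture<Result> tail = inFlight.compute(key, (k, previous) ->
                previous == null
                        ? client.query(event)
                        : previous
                                .exceptionally(t -> null) // continue regardless of the previous outcome
                                .thenCompose(ignored -> client.query(event)));

        tail.whenComplete((result, error) -> {
            // Remove the entry only if this future is still the tail for the key,
            // so that a request chained after it is not lost.
            inFlight.remove(key, tail);
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singleton(result));
            }
        });
    }
}

Because the input is keyed, all records for a given key land on the same
parallel instance, so the per-key chaining only needs to coordinate within one
function instance. It would sit behind unorderedWait, e.g.
AsyncDataStream.unorderedWait(stream.keyBy(Event::getKey), new
PerKeyOrderedAsyncFunction(), 30, TimeUnit.SECONDS, 100).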


On Mon, Jun 19, 2023 at 10:55 AM Juho Autio 
wrote:

> I need to make some slower external requests in parallel, so Async I/O
> helps greatly with that. However, I also need to make the requests in a
> certain order per key. Is that possible with Async I/O?
>
> The documentation[1] talks about preserving the stream order of
> results, but it doesn't discuss the order of the async requests. I tried to
> use AsyncDataStream.orderedWait, but the order of async requests seems to
> be random – the order of calls gets shuffled even if I
> use AsyncDataStream.orderedWait.
>
> If that is by design, would there be any suggestion on how to work around
> that? I was thinking of collecting all events of the same key into a
> List, so that the async operator gets a list instead of individual events.
> There are of course some downsides with using a List, so I would rather
> have something better.
>
> In a nutshell my code is:
>
> AsyncDataStream.orderedWait(stream.keyBy(key), asyncFunction)
>
> The asyncFunction extends RichAsyncFunction.
>
> Thanks!
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#order-of-results
>
> (Sorry if it's not appropriate to post this type of question to the dev
> mailing list. I tried the Flink users list with no luck.)
>