Thank you very much! It seems you have quite a similar goal. However,
could you clarify: do you preserve the stream order per key, or do you
just limit the parallel requests per key to one without caring about the
order?

I'm not 100% sure how your implementation with futures is done. If you are
able to share a code snippet that would be much appreciated!

I'm also wondering what memory implications that implementation has:
would the futures be queued inside the operator without any limit? Would it
be a problem if the same key has too many records within the same time
window? I suppose the function could be made blocking to protect against
that.
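
For illustration, the chaining described in the quoted message below could
be sketched roughly as follows. This is not the actual implementation from
that message: PerKeySequencer, submit and keysInFlight are hypothetical
names, the sketch uses plain CompletableFuture with no Flink classes, and it
assumes that a failed request should not block later requests for the same
key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper: at most one request in flight per key, started in submission order. */
final class PerKeySequencer<K> {
    // Last future of the request chain for every key that still has work pending.
    private final Map<K, CompletableFuture<?>> tails = new HashMap<>();

    synchronized <R> CompletableFuture<R> submit(K key, Supplier<CompletableFuture<R>> request) {
        CompletableFuture<?> tail = tails.get(key);
        CompletableFuture<R> next;
        if (tail == null) {
            // Nothing in flight for this key: start the request right away.
            next = request.get();
        } else {
            // [existing request].then([new request]); handle() swallows the
            // outcome of the previous request (assumption: failures do not block).
            next = tail.handle((result, error) -> null)
                       .thenCompose(ignored -> request.get());
        }
        tails.put(key, next);
        // Drop the map entry once this future completes, unless a newer
        // request for the same key has already replaced it as the tail.
        next.whenComplete((result, error) -> cleanup(key, next));
        return next;
    }

    private synchronized void cleanup(K key, CompletableFuture<?> done) {
        tails.remove(key, done);
    }

    synchronized int keysInFlight() {
        return tails.size();
    }

    public static void main(String[] args) {
        PerKeySequencer<String> sequencer = new PerKeySequencer<>();
        List<String> started = new ArrayList<>();
        CompletableFuture<String> slow = new CompletableFuture<>();

        sequencer.submit("a", () -> { started.add("a1"); return slow; });
        sequencer.submit("a", () -> { started.add("a2"); return CompletableFuture.completedFuture("ok"); });
        sequencer.submit("b", () -> { started.add("b1"); return CompletableFuture.completedFuture("ok"); });
        System.out.println(started); // a2 has not started yet: a1 is still in flight

        slow.complete("ok");
        System.out.println(started); // a2 starts only after a1 completed
    }
}
```

In this sketch the chain for a single key can grow without limit, which is
the memory concern raised above. Inside a Flink async function each input
element would presumably hold a slot in the operator until its result future
completes, so, as far as I understand, the capacity argument of
AsyncDataStream.orderedWait would bound the total number of queued requests
and apply backpressure once it is exhausted.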

On Tue, Jun 20, 2023 at 3:34 PM Galen Warren
<ga...@cvillewarrens.com.invalid> wrote:

> Hi Juho -- I'm doing something similar. In my case, I want to execute async
> requests concurrently for inputs associated with different keys but issue
> them sequentially for any given key. The way I do it is to create a keyed
> stream and use it as an input to an async function. In this arrangement,
> all the inputs for a given key are handled by a single instance of the
> async function; inside that function instance, I use a map to keep track of
> any in-flight requests for a given key. When a new input comes in for a
> key, if there is an existing in-flight request for that key, the future
> that is constructed for the new request is constructed as [existing
> request].then([new request]) so that the new one is only executed once the
> in-flight request completes. The futures are constructed in such a way that
> they maintain the map properly after completing.
>
>
> On Mon, Jun 19, 2023 at 10:55 AM Juho Autio <juho.au...@rovio.com.invalid>
> wrote:
>
> > I need to make some slower external requests in parallel, so Async I/O
> > helps greatly with that. However, I also need to make the requests in a
> > certain order per key. Is that possible with Async I/O?
> >
> > The documentation[1] talks about preserving the stream order of
> > results, but it doesn't discuss the order of the async requests. I tried
> > to use AsyncDataStream.orderedWait, but the order of async requests seems
> > to be random – the order of calls gets shuffled even if I
> > use AsyncDataStream.orderedWait.
> >
> > If that is by design, would there be any suggestion how to work around
> > that? I was thinking of collecting all events of the same key into a
> > List, so that the async operator gets a list instead of individual
> > events. There are of course some downsides with using a List, so I would
> > rather have something better.
> >
> > In a nutshell my code is:
> >
> > AsyncDataStream.orderedWait(stream.keyBy(key), asyncFunction)
> >
> > The asyncFunction extends RichAsyncFunction.
> >
> > Thanks!
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#order-of-results
> >
> > (Sorry if it's not appropriate to post this type of question to the dev
> > mailing list. I tried the Flink users list with no luck.)
> >
