This is not supported by the current Async I/O API, but I do think it is a very useful feature and we should support it. As Jingsong said, it would allow changelog streams to use Async Lookup Join as well. The rough idea is a mixture of the ordered and unordered modes of the async operator: requests for the same key should be handled in ordered mode, while requests for different keys can be handled in unordered mode.
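A minimal sketch of that per-key ordering in plain Java, with no Flink dependency. The class and method names (KeyedSequencer, submit, inFlightKeys) are hypothetical and not part of the Async I/O API. It uses the same idea Galen describes further down the thread: a map holds the tail of the request chain for each key, a new request for a key starts only after the previous one for that key has finished, requests for other keys proceed concurrently, and a key's entry is removed once nothing is left in flight for it.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical helper, not a Flink API: async requests for the same key run one
// after another, while requests for different keys run concurrently.
class KeyedSequencer<K> {
    // Last submitted (tail) future per key; a key has an entry only while it has work pending.
    private final Map<K, CompletableFuture<?>> tails = new ConcurrentHashMap<>();

    <T> CompletableFuture<T> submit(K key, Supplier<CompletableFuture<T>> request) {
        CompletableFuture<T> result = new CompletableFuture<>();
        // put() atomically installs the new tail and returns the previous one (or null).
        CompletableFuture<?> previous = tails.put(key, result);
        CompletableFuture<?> start =
                (previous != null) ? previous : CompletableFuture.completedFuture(null);
        start.handle((ignored, error) -> null)      // a failed predecessor must not block its successor
             .thenCompose(ignored -> request.get()) // the request is only issued at this point
             .whenComplete((value, error) -> {
                 tails.remove(key, result);         // removes the entry only if no newer request was chained
                 if (error != null) {
                     result.completeExceptionally(error);
                 } else {
                     result.complete(value);
                 }
             });
        return result;
    }

    // Number of keys that currently have a request running or waiting.
    int inFlightKeys() {
        return tails.size();
    }
}
```

How this would be wired into Flink is an assumption: inside a RichAsyncFunction on a keyed stream, asyncInvoke could call submit(key, ...) and complete the ResultFuture from the returned future's callback. The map is concurrent because completion callbacks usually run on the client library's threads rather than the task thread.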
Best,
Jark

On Wed, 21 Jun 2023 at 12:18, Jingsong Li <jingsongl...@gmail.com> wrote:

> +1 for this.
>
> Actually, this is a headache for Flink SQL too.
>
> There is certainly a lot of updated data (CDC changelog) in real
> stream processing, The semantics here is the need to ensure the order
> between keys, and different keys can be handled in disorder.
>
> I'm very happy that the community has a similar need, and I think it's
> worth refining it in Flink.
>
> Best,
> Jingsong
>
> On Tue, Jun 20, 2023 at 10:20 PM Juho Autio
> <juho.au...@rovio.com.invalid> wrote:
> >
> > Thank you very much! It seems like you have a quite similar goal. However,
> > could you clarify: do you maintain the stream order on key level, or do you
> > just limit the parallel requests per key to one without caring about the
> > order?
> >
> > I'm not 100% sure how your implementation with futures is done. If you are
> > able to share a code snippet that would be much appreciated!
> >
> > I'm also wondering what kind of memory implication that implementation has:
> > would the futures be queued inside the operator without any limit? Would it
> > be a problem if the same key has too many records within the same time
> > window? But I suppose the function can be made blocking to protect against
> > that.
> >
> > On Tue, Jun 20, 2023 at 3:34 PM Galen Warren
> > <ga...@cvillewarrens.com.invalid> wrote:
> >
> > > Hi Juho -- I'm doing something similar. In my case, I want to execute async
> > > requests concurrently for inputs associated with different keys but issue
> > > them sequentially for any given key. The way I do it is to create a keyed
> > > stream and use it as an input to an async function. In this arrangement,
> > > all the inputs for a given key are handled by a single instance of the
> > > async function; inside that function instance, I use a map to keep track of
> > > any in-flight requests for a given key.
> > > When a new input comes in for a key, if there is an existing in-flight
> > > request for that key, the future that is constructed for the new request
> > > is constructed as [existing request].then([new request]) so that the new
> > > one is only executed once the in-flight request completes. The futures are
> > > constructed in such a way that they maintain the map properly after
> > > completing.
> > >
> > > On Mon, Jun 19, 2023 at 10:55 AM Juho Autio <juho.au...@rovio.com.invalid>
> > > wrote:
> > >
> > > > I need to make some slower external requests in parallel, so Async I/O
> > > > helps greatly with that. However, I also need to make the requests in a
> > > > certain order per key. Is that possible with Async I/O?
> > > >
> > > > The documentation[1] talks about preserving the stream order of
> > > > results, but it doesn't discuss the order of the async requests. I tried to
> > > > use AsyncDataStream.orderedWait, but the order of async requests seems to
> > > > be random – the order of calls gets shuffled even if I
> > > > use AsyncDataStream.orderedWait.
> > > >
> > > > If that is by design, would there be any suggestion how to work around
> > > > that? I was thinking of collecting all events of the same key into a
> > > > List, so that the async operator gets a list instead of individual events.
> > > > There are of course some downsides with using a List, so I would rather
> > > > have something better.
> > > >
> > > > In a nutshell my code is:
> > > >
> > > > AsyncDataStream.orderedWait(stream.keyBy(key), asyncFunction)
> > > >
> > > > The asyncFunction extends RichAsyncFunction.
> > > >
> > > > Thanks!
> > > >
> > > > [1]
> > > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#order-of-results
> > > >
> > > > (Sorry if it's not appropriate to post this type of question to the dev
> > > > mailing list. I tried the Flink users list with no luck.)
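The behaviour Juho observed can be illustrated with a simplified model in plain Java. This is an illustration under stated assumptions, not Flink's operator code, and the class name OrderedResultsDemo is hypothetical. Every request is issued as soon as its input arrives and all of them run concurrently, so they finish out of order; the results are nevertheless emitted in input order, which is the only thing an ordered-results guarantee promises. Preserving result order says nothing about when each request runs, which is why per-key sequencing of the requests themselves needs separate handling.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified model (not Flink code): requests run concurrently, results are emitted in input order.
class OrderedResultsDemo {
    // Returns [completionOrder, emissionOrder] for n requests.
    static List<List<Integer>> run(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(n);
        List<Integer> completed = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Integer>> inFlight = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++) {
                final int id = i;
                // Each request is issued immediately; earlier requests are deliberately slower.
                inFlight.add(CompletableFuture.supplyAsync(() -> {
                    sleep(50L * (n - id));
                    completed.add(id);
                    return id;
                }, pool));
            }
            // Emit results strictly in input order by waiting on the futures in submission order.
            List<Integer> emitted = new ArrayList<>();
            for (CompletableFuture<Integer> f : inFlight) {
                emitted.add(f.join());
            }
            return List.of(new ArrayList<>(completed), emitted);
        } finally {
            pool.shutdown();
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With run(4), the emission order is 0, 1, 2, 3 while the completion order is typically reversed, mirroring "ordered results, unordered requests".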