I'd love to see something like this as well. Also +1 to process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much benefit to passing a future in, since the framework itself could hook up the process function to complete when the future completes.
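To make the "framework hooks it up" idea concrete, here is a rough, Beam-free sketch of what I have in mind. Everything in it is hypothetical: AsyncOutputReceiver, AsyncFn and runBundle are made-up stand-ins for illustration, not existing Beam APIs, and error handling is deliberately left out (join() would just rethrow a failure as a CompletionException).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class AsyncReceiverSketch {

  /** Hypothetical stand-in for OutputReceiver<CompletionStage<OutputT>>. */
  interface AsyncOutputReceiver<OutputT> {
    void output(CompletionStage<OutputT> pending);
  }

  /** Hypothetical user function: starts async work and returns without blocking. */
  interface AsyncFn<InputT, OutputT> {
    void process(InputT element, AsyncOutputReceiver<OutputT> out);
  }

  /**
   * Hypothetical framework side: records every pending stage the user hands
   * over, then waits for all of them once the bundle's elements are consumed.
   */
  static <InputT, OutputT> List<OutputT> runBundle(
      List<InputT> bundle, AsyncFn<InputT, OutputT> fn) {
    List<CompletableFuture<OutputT>> pending = new ArrayList<>();
    AsyncOutputReceiver<OutputT> receiver =
        stage -> pending.add(stage.toCompletableFuture());

    for (InputT element : bundle) {
      fn.process(element, receiver); // user code only kicks off the future
    }

    // Plays the role of the hand-written "join it in FinishBundle" step.
    List<OutputT> results = new ArrayList<>();
    for (CompletableFuture<OutputT> future : pending) {
      results.add(future.join());
    }
    return results;
  }

  public static void main(String[] args) {
    // Assumed example workload: compute string lengths asynchronously.
    AsyncFn<String, Integer> lengthFn =
        (element, out) -> out.output(CompletableFuture.supplyAsync(element::length));

    System.out.println(runBundle(Arrays.asList("a", "bb", "ccc"), lengthFn));
  }
}
```

The point is that the user code never joins anything; the receiver owns the pending futures, so the framework decides when and how to wait on them.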
I feel like I've spent a bunch of time writing very similar "kick off a future in ProcessElement, join it in FinishBundle" code, and looking around Beam itself, a lot of built-in transforms do it as well. Scio provides a few AsyncDoFn implementations [1], but it'd be great to see this as a first-class concept in Beam itself. Doing error handling, concurrency, etc. correctly can be tricky.

[1] https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java

On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <k...@google.com> wrote:

> If the input is a CompletionStage<InputT> then the output should also be a
> CompletionStage<OutputT>, since all you should do is async chaining. We
> could enforce this by giving the DoFn an
> OutputReceiver(CompletionStage<OutputT>).
>
> Another possibility that might be even more robust against poor future use
> could be process(@Element InputT element, @Output
> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
> itself will be async chained, rather than counting on the user to do the
> right thing.
>
> We should see how these look in real use cases. The way that processing is
> split between @ProcessElement and @FinishBundle might complicate things.
>
> Kenn
>
> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>
>> Hi, guys,
>>
>> As more users try out Beam running on the SamzaRunner, we got a lot of
>> asks for an asynchronous processing API. There are a few reasons for these
>> asks:
>>
>>    - The users here are experienced in asynchronous programming. With
>>    async frameworks such as Netty and ParSeq and libs like async jersey
>>    client, they are able to make remote calls efficiently and the libraries
>>    help manage the execution threads underneath. Async remote calls are very
>>    common in most of our streaming applications today.
>>    - Many jobs are running on a multi-tenancy cluster. Async processing
>>    helps for less resource usage and fast computation (less context switch).
>>
>> I asked about the async support in a previous email thread. The following
>> API was mentioned in the reply:
>>
>>   new DoFn<InputT, OutputT>() {
>>     @ProcessElement
>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>       element.thenApply(...)
>>     }
>>   }
>>
>> We are wondering whether there are any discussions on this API and
>> related docs. It is awesome that you guys already considered having DoFn to
>> process asynchronously. Out of curiosity, this API seems to create a
>> CompletionStage out of the input element (probably using framework's
>> executor) and then allow user to chain on it. To us, it seems more
>> convenient if the DoFn output a CompletionStage<OutputT> or pass in a
>> CompletionStage<OutputT> to invoke upon completion.
>>
>> We would like to discuss further on the async API and hopefully we will
>> have a great support in Beam. Really appreciate the feedback!
>>
>> Thanks,
>> Xinyu
>>
>