I'd love to see something like this as well.  Also +1 to process(@Element
InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).  I don't
know if there's much benefit to passing a future in, since the framework
itself could hook up the process function to complete when the future
completes.
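For illustration only, here is a minimal plain-Java sketch of the shape proposed above. It assumes a framework that collects every emitted future and waits for all of them before it finishes the bundle. `AsyncFn` and `ToyBundleRunner` are hypothetical names, and `Consumer<CompletionStage<OutputT>>` stands in for Beam's `OutputReceiver`, so this is not Beam API. The user's process method never blocks; the runner does the waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

/** Hypothetical user-facing shape: the function emits futures and never blocks. */
interface AsyncFn<InputT, OutputT> {
  void process(InputT element, Consumer<CompletionStage<OutputT>> out);
}

/** Hypothetical framework side: tracks emitted futures and waits at bundle end. */
final class ToyBundleRunner {
  static <InputT, OutputT> List<OutputT> runBundle(
      AsyncFn<InputT, OutputT> fn, List<InputT> bundle) {
    List<CompletableFuture<OutputT>> pending = new ArrayList<>();
    for (InputT element : bundle) {
      // The framework, not the user, keeps track of every emitted stage.
      fn.process(element, stage -> pending.add(stage.toCompletableFuture()));
    }
    // The bundle only finishes once every emitted stage has completed;
    // join() rethrows a failed stage as a CompletionException.
    CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
    List<OutputT> outputs = new ArrayList<>();
    for (CompletableFuture<OutputT> f : pending) {
      outputs.add(f.join()); // already complete, so this does not block
    }
    return outputs;
  }
}
```

With this design, outputs come back in emission order, and a single failed future fails the whole bundle, which is roughly what a runner would need for retry semantics.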

I feel like I've spent a bunch of time writing very similar "kick off a
future in ProcessElement, join it in FinishBundle" code, and looking around
Beam itself, a lot of built-in transforms do it as well.  Scio provides a
few AsyncDoFn implementations [1], but it'd be great to see this as a
first-class concept in Beam itself.  Getting error handling, concurrency,
etc. right can be tricky.
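The hand-written pattern described above can be sketched roughly as follows, in plain Java rather than Beam so that it runs on its own. `KickOffThenJoin`, `asyncCall`, and `maxInFlight` are hypothetical names; the two methods only mirror the roles of `@ProcessElement` and `@FinishBundle`. The semaphore cap on in-flight calls and the "any failure fails the bundle" rule are one possible way to handle the concurrency and error-handling concerns, not how Scio or Beam necessarily does it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.function.Function;

/** Plain-Java stand-in for a DoFn that starts async work per element and joins it per bundle. */
final class KickOffThenJoin<InputT, OutputT> {
  private final Function<InputT, CompletableFuture<OutputT>> asyncCall; // hypothetical remote call
  private final Semaphore inFlight; // caps outstanding calls for this instance
  private final List<CompletableFuture<OutputT>> pending = new ArrayList<>();

  KickOffThenJoin(Function<InputT, CompletableFuture<OutputT>> asyncCall, int maxInFlight) {
    this.asyncCall = asyncCall;
    this.inFlight = new Semaphore(maxInFlight);
  }

  /** Mirrors @ProcessElement: start the call, but do not wait for its result. */
  void processElement(InputT element) {
    inFlight.acquireUninterruptibly(); // waits only when maxInFlight calls are outstanding
    CompletableFuture<OutputT> call;
    try {
      call = asyncCall.apply(element);
    } catch (RuntimeException e) {
      inFlight.release(); // a synchronous failure must not leak a permit
      throw e;
    }
    // Release the permit when the call finishes, successfully or not.
    pending.add(call.whenComplete((result, error) -> inFlight.release()));
  }

  /** Mirrors @FinishBundle: wait for every call, then emit; any failure fails the bundle. */
  void finishBundle(Consumer<OutputT> output) {
    try {
      CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
      for (CompletableFuture<OutputT> f : pending) {
        output.accept(f.join());
      }
    } finally {
      pending.clear(); // start the next bundle with no leftover futures
    }
  }
}
```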

[1]
https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java

On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <k...@google.com> wrote:

> If the input is a CompletionStage<InputT>, then the output should also be a
> CompletionStage<OutputT>, since all you should do is async chaining. We
> could enforce this by giving the DoFn an
> OutputReceiver<CompletionStage<OutputT>>.
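To make "all you should do is async chaining" concrete, here is a small sketch, assuming a hypothetical `ChainOnly.process` method and a hypothetical `asyncLookup` function (neither is Beam API). A `CompletionStage<String>` comes in and a `CompletionStage<String>` goes out, with only `thenApply`/`thenCompose` in between and no blocking `join()` or `get()`.

```java
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

/** Sketch of a chaining-only process step; names are hypothetical. */
final class ChainOnly {
  static CompletionStage<String> process(
      CompletionStage<String> element,
      Function<Integer, CompletionStage<String>> asyncLookup) {
    return element
        .thenApply(String::trim)       // synchronous transformation of the value
        .thenApply(Integer::parseInt)  // still synchronous, still non-blocking
        .thenCompose(asyncLookup);     // async step; thenCompose flattens the nested stage
  }
}
```

Nothing in this method waits: the returned stage completes only after the input stage and the lookup have both completed.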
>
> Another possibility, which might be even more robust against poor use of
> futures, would be process(@Element InputT element, @Output
> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
> itself will be async chained, rather than counting on the user to do the
> right thing.
>
> We should see how these look in real use cases. The way that processing is
> split between @ProcessElement and @FinishBundle might complicate things.
>
> Kenn
>
> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>
>> Hi, guys,
>>
>> As more users try out Beam running on the SamzaRunner, we have received a
>> lot of requests for an asynchronous processing API. There are a few reasons
>> for these requests:
>>
>>    - The users here are experienced in asynchronous programming. With
>>    async frameworks such as Netty and ParSeq and libraries like the async
>>    Jersey client, they are able to make remote calls efficiently, and the
>>    libraries help manage the execution threads underneath. Async remote
>>    calls are very common in most of our streaming applications today.
>>    - Many jobs run on a multi-tenant cluster. Async processing helps
>>    reduce resource usage and speed up computation (fewer context
>>    switches).
>>
>> I asked about the async support in a previous email thread. The following
>> API was mentioned in the reply:
>>
>>   new DoFn<InputT, OutputT>() {
>>     @ProcessElement
>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>       element.thenApply(...);
>>     }
>>   }
>>
>> We are wondering whether there have been any discussions or docs on this
>> API. It is awesome that you guys have already considered letting a DoFn
>> process asynchronously. Out of curiosity, this API seems to create a
>> CompletionStage out of the input element (probably using the framework's
>> executor) and then allow the user to chain on it. To us, it seems more
>> convenient if the DoFn outputs a CompletionStage<OutputT>, or if a
>> CompletionStage<OutputT> is passed in to be completed when processing
>> finishes.
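The second alternative, where a future is passed in, might look like the following sketch, assuming the framework hands the process method a `CompletableFuture<OutputT>` that the user completes from a callback-style library. `CallbackClient` and `PassedInFutureFn` are hypothetical names standing in for clients such as async HTTP libraries; they are not Beam or Jersey API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/** Hypothetical callback-style client: calls back with (value, null) or (null, error). */
interface CallbackClient {
  void get(String key, BiConsumer<String, Throwable> callback);
}

/** Sketch of the "framework passes in a future" variant; names are hypothetical. */
final class PassedInFutureFn {
  private final CallbackClient client;

  PassedInFutureFn(CallbackClient client) {
    this.client = client;
  }

  void process(String element, CompletableFuture<String> result) {
    client.get(element, (value, error) -> {
      if (error != null) {
        result.completeExceptionally(error); // surface the failure to the framework
      } else {
        result.complete(value.toUpperCase());
      }
    });
  }
}
```

One trade-off of this shape: it maps directly onto callback APIs, but if user code forgets to complete the future on some path, a runner waiting on it would never finish the bundle, which is the kind of misuse the output-a-future shape avoids.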
>>
>> We would like to discuss the async API further, and hopefully Beam will
>> have great support for it. Really appreciate the feedback!
>>
>> Thanks,
>> Xinyu
>>
>
