@Steve: it's good to see that this is going to be useful in your use cases
as well. Thanks for sharing the code from Scio! I can see in your
implementation that waiting for the future completion is part of the
@FinishBundle. We are thinking of taking advantage of the underlying runner
async support so the user-level code won't need to implement this logic,
e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
after future completion [1], and Flink has an AsyncFunction API [2] that
provides a ResultFuture similar to the API we discussed.

A simple use case for this is to execute a Runnable asynchronously in
the user's own executor. The following code illustrates Kenn's option #2,
with a very simple single-thread pool as the executor:

new DoFn<InputT, OutputT>() {
  @ProcessElement
  public void process(
      @Element InputT element,
      @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
    // someOutput is a placeholder for the value computed from element.
    // A real DoFn would reuse one executor instead of creating one per element.
    CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
        () -> someOutput,
        Executors.newSingleThreadExecutor());
    outputReceiver.output(future);
  }
}

The neat thing about this API is that the user can choose their own
async framework and we only expect the output to be a CompletionStage.
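
As a rough sketch of that point, here is how a callback-style client could
be bridged into a CompletionStage using only the JDK. CallbackClient and
CompletionStageAdapter are hypothetical names made up for illustration; they
stand in for whatever async library the user already has and are not part
of Beam or any runner:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;

/** Hypothetical callback-style client; stands in for any async library. */
interface CallbackClient<K, V> {
  // Invokes the callback with (value, null) on success or (null, error) on failure.
  void fetch(K key, BiConsumer<V, Throwable> callback);
}

/** Hypothetical adapter, for illustration only. */
final class CompletionStageAdapter {
  private CompletionStageAdapter() {}

  /** Bridges one callback-style call into a CompletionStage. */
  static <K, V> CompletionStage<V> toStage(CallbackClient<K, V> client, K key) {
    CompletableFuture<V> future = new CompletableFuture<>();
    try {
      client.fetch(key, (value, error) -> {
        if (error != null) {
          future.completeExceptionally(error);
        } else {
          future.complete(value);
        }
      });
    } catch (RuntimeException e) {
      // The client failed before it could register the callback.
      future.completeExceptionally(e);
    }
    return future;
  }
}
```

With something like this, the process method would only need to pass the
returned stage to outputReceiver.output(...), and no executor has to be
created in the DoFn at all.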


For the implementation of bundling, can we compose a CompletableFuture
from each element in the bundle, e.g. CompletableFuture.allOf(...),
and then invoke @FinishBundle when this future is complete? Seems this
might work.
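
To make the idea a bit more concrete, below is a minimal JDK-only sketch of
the bookkeeping a runner might do. BundleFutureTracker is a hypothetical
helper, not an existing Beam or runner class, and the sketch assumes that a
bundle is driven by a single thread, so the list is not synchronized:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** Hypothetical per-bundle helper; assumes single-threaded access. */
final class BundleFutureTracker<OutputT> {
  private final List<CompletableFuture<OutputT>> pending = new ArrayList<>();

  /** Called once per element with the stage emitted by the process method. */
  void add(CompletionStage<OutputT> stage) {
    pending.add(stage.toCompletableFuture());
  }

  /**
   * Returns a future that completes with all outputs, in input order, once
   * every element's future is done. It completes exceptionally if any
   * element's future failed. The tracker is reset for the next bundle.
   */
  CompletableFuture<List<OutputT>> whenBundleComplete() {
    List<CompletableFuture<OutputT>> snapshot = new ArrayList<>(pending);
    pending.clear();
    return CompletableFuture.allOf(snapshot.toArray(new CompletableFuture<?>[0]))
        .thenApply(ignored -> {
          List<OutputT> outputs = new ArrayList<>(snapshot.size());
          for (CompletableFuture<OutputT> f : snapshot) {
            outputs.add(f.join()); // already complete, so join() does not block
          }
          return outputs;
        });
  }
}
```

The runner could then chain the @FinishBundle invocation onto the returned
future, e.g. with thenRun(...), instead of blocking a thread on it. How
failures and retries of a bundle should be handled is left open here.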

Thanks,
Xinyu


[1]
https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html

On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <[email protected]> wrote:

> I'd love to see something like this as well.  Also +1 to process(@Element
> InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).  I
> don't know if there's much benefit to passing a future in, since the
> framework itself could hook up the process function to complete when the
> future completes.
>
> I feel like I've spent a bunch of time writing very similar "kick off a
> future in ProcessElement, join it in FinishBundle" code, and looking around
> beam itself a lot of built-in transforms do it as well.  Scio provides a
> few AsyncDoFn implementations [1] but it'd be great to see this as a
> first-class concept in beam itself.  Doing error handling, concurrency, etc
> correctly can be tricky.
>
> [1]
> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>
> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <[email protected]> wrote:
>
>> If the input is a CompletionStage<InputT> then the output should also be
>> a CompletionStage<OutputT>, since all you should do is async chaining. We
>> could enforce this by giving the DoFn an
>> OutputReceiver<CompletionStage<OutputT>>.
>>
>> Another possibility that might be even more robust against poor future
>> use could be process(@Element InputT element, @Output
>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
>> itself will be async chained, rather than counting on the user to do the
>> right thing.
>>
>> We should see how these look in real use cases. The way that processing
>> is split between @ProcessElement and @FinishBundle might complicate things.
>>
>> Kenn
>>
>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <[email protected]> wrote:
>>
>>> Hi, guys,
>>>
>>> As more users try out Beam running on the SamzaRunner, we got a lot of
>>> asks for an asynchronous processing API. There are a few reasons for these
>>> asks:
>>>
>>>    - The users here are experienced in asynchronous programming. With
>>>    async frameworks such as Netty and ParSeq and libs like async jersey
>>>    client, they are able to make remote calls efficiently and the libraries
>>>    help manage the execution threads underneath. Async remote calls are very
>>>    common in most of our streaming applications today.
>>>    - Many jobs run on a multi-tenant cluster. Async processing
>>>    helps reduce resource usage and speeds up computation (fewer context
>>>    switches).
>>>
>>> I asked about the async support in a previous email thread. The
>>> following API was mentioned in the reply:
>>>
>>>   new DoFn<InputT, OutputT>() {
>>>     @ProcessElement
>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>       element.thenApply(...)
>>>     }
>>>   }
>>>
>>> We are wondering whether there are any discussions on this API and
>>> related docs. It is awesome that you guys already considered having DoFn
>>> process elements asynchronously. Out of curiosity, this API seems to create
>>> a CompletionStage out of the input element (probably using the framework's
>>> executor) and then allow the user to chain on it. To us, it seems more
>>> convenient if the DoFn outputs a CompletionStage<OutputT>, or if a
>>> CompletionStage<OutputT> is passed in to invoke upon completion.
>>>
>>> We would like to discuss the async API further, and hopefully we will
>>> have great support for it in Beam. We really appreciate the feedback!
>>>
>>> Thanks,
>>> Xinyu
>>>
>>
