@Steve: it's good to see that this is going to be useful in your use cases
as well. Thanks for sharing the code from Scio! I can see in your
implementation that waiting for the future completion is part of the
@FinishBundle. We are thinking of taking advantage of the underlying runner
async support so the user-level code won't need to implement this logic,
e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
after future completion [1], and Flink also has an AsyncFunction API [2] which
provides a ResultFuture similar to the API we discussed.
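
As a rough sketch of how a runner could hide this from user code, the
snippet below bridges a CompletionStage to a callback-style completion
signal. DoneCallback, AsyncBridge and bridge() are hypothetical names made up
for illustration; they are not the actual Samza or Flink interfaces, only a
stand-in with the same general shape (signal success or failure once the
future is done).

```java
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

/** Hypothetical stand-in for a runner-provided completion callback. */
interface DoneCallback {
  void complete();

  void failure(Throwable t);
}

/** Hypothetical runner-side glue; not part of Beam, Samza, or Flink. */
class AsyncBridge {
  /**
   * When the stage finishes, either emits its value downstream and signals
   * success, or signals failure with the error. Nothing blocks here.
   */
  static <T> void bridge(
      CompletionStage<T> stage, Consumer<T> emit, DoneCallback callback) {
    stage.whenComplete((value, error) -> {
      if (error != null) {
        callback.failure(error);
      } else {
        emit.accept(value);
        callback.complete();
      }
    });
  }
}
```

With something along these lines in the runner, the DoFn author would only
hand over the CompletionStage and never join on it.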

A simple use case for this is to run a computation asynchronously on the
user's own executor. The following code illustrates Kenn's option #2, with
a very simple single-thread pool as the executor:

    new DoFn<InputT, OutputT>() {
      @ProcessElement
      public void process(
          @Element InputT element,
          @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
        // For brevity the executor is created inline; real code would create
        // it once (e.g. in @Setup) and reuse it across elements.
        CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
            () -> someOutput,
            Executors.newSingleThreadExecutor());
        outputReceiver.output(future);
      }
    }

The neat thing about this API is that users can choose their own async
framework; we only expect the output to be a CompletionStage.

For the implementation of bundling, can we combine the futures of all the
elements in the bundle into a single CompletableFuture, e.g. with
CompletableFuture.allOf(...), and then invoke @FinishBundle when this
combined future is complete? It seems this might work.
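
To make the bundling idea a bit more concrete, here is a minimal sketch of
what the runner side might do. BundleFutureTracker and its methods are
hypothetical names, not existing Beam classes; the finishBundle Runnable
stands in for invoking the user's @FinishBundle method. It assumes that a
failed element future should fail the whole bundle and skip @FinishBundle,
which is just one possible policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** Hypothetical runner-side helper; not part of Beam. */
class BundleFutureTracker<OutputT> {
  private final List<CompletableFuture<OutputT>> pending = new ArrayList<>();

  /** Records each CompletionStage the DoFn hands to its OutputReceiver. */
  void add(CompletionStage<OutputT> stage) {
    pending.add(stage.toCompletableFuture());
  }

  /**
   * Returns a future that completes once every element future of the current
   * bundle has completed. On success it runs finishBundle and yields the
   * outputs in element order. If any element future fails, the returned
   * future fails too and finishBundle is not run.
   */
  CompletableFuture<List<OutputT>> whenBundleComplete(Runnable finishBundle) {
    List<CompletableFuture<OutputT>> snapshot = new ArrayList<>(pending);
    pending.clear(); // the tracker is ready for the next bundle
    return CompletableFuture
        .allOf(snapshot.toArray(new CompletableFuture<?>[0]))
        .thenApply(ignored -> {
          List<OutputT> outputs = new ArrayList<>();
          for (CompletableFuture<OutputT> f : snapshot) {
            outputs.add(f.join()); // already complete, so this does not block
          }
          finishBundle.run();
          return outputs;
        });
  }
}
```

The runner would call add() for every future output during @ProcessElement
and whenBundleComplete() at the end of the bundle, committing the bundle only
after the returned future completes.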
Thanks,
Xinyu
[1]
https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <[email protected]> wrote:
> I'd love to see something like this as well. Also +1 to process(@Element
> InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). I
> don't know if there's much benefit to passing a future in, since the
> framework itself could hook up the process function to complete when the
> future completes.
>
> I feel like I've spent a bunch of time writing very similar "kick off a
> future in ProcessElement, join it in FinishBundle" code, and looking around
> beam itself a lot of built-in transforms do it as well. Scio provides a
> few AsyncDoFn implementations [1] but it'd be great to see this as a
> first-class concept in beam itself. Doing error handling, concurrency, etc
> correctly can be tricky.
>
> [1]
> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>
> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <[email protected]> wrote:
>
>> If the input is a CompletionStage<InputT> then the output should also be
>> a CompletionStage<OutputT>, since all you should do is async chaining. We
>> could enforce this by giving the DoFn an
>> OutputReceiver<CompletionStage<OutputT>>.
>>
>> Another possibility that might be even more robust against poor future
>> use could be process(@Element InputT element, @Output
>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
>> itself will be async chained, rather than counting on the user to do the
>> right thing.
>>
>> We should see how these look in real use cases. The way that processing
>> is split between @ProcessElement and @FinishBundle might complicate things.
>>
>> Kenn
>>
>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <[email protected]> wrote:
>>
>>> Hi, guys,
>>>
>>> As more users try out Beam running on the SamzaRunner, we got a lot of
>>> asks for an asynchronous processing API. There are a few reasons for these
>>> asks:
>>>
>>> - The users here are experienced in asynchronous programming. With
>>> async frameworks such as Netty and ParSeq and libs like async jersey
>>> client, they are able to make remote calls efficiently and the libraries
>>> help manage the execution threads underneath. Async remote calls are very
>>> common in most of our streaming applications today.
>>> - Many jobs run on a multi-tenant cluster. Async processing reduces
>>> resource usage and speeds up computation (fewer context switches).
>>>
>>> I asked about the async support in a previous email thread. The
>>> following API was mentioned in the reply:
>>>
>>> new DoFn<InputT, OutputT>() {
>>>   @ProcessElement
>>>   public void process(@Element CompletionStage<InputT> element, ...) {
>>>     element.thenApply(...)
>>>   }
>>> }
>>>
>>> We are wondering whether there are any discussions on this API and
>>> related docs. It is awesome that you guys already considered having DoFn to
>>> process asynchronously. Out of curiosity, this API seems to create a
>>> CompletionStage out of the input element (probably using the framework's
>>> executor) and then allow the user to chain on it. To us, it seems more
>>> convenient if the DoFn outputs a CompletionStage<OutputT> or is passed a
>>> CompletionStage<OutputT> to invoke upon completion.
>>>
>>> We would like to discuss further on the async API and hopefully we will
>>> have a great support in Beam. Really appreciate the feedback!
>>>
>>> Thanks,
>>> Xinyu
>>>
>>