I don't have a strong opinion on the resolution of the futures regarding
to @FinishBundle invocation. Leaving it to be unspecified does give runners
more room to implement it with their own support.

Optimization is also another great point. Fuse seems pretty complex to me
too if we need to find a way to chain the resulting future into the next
transform, or leave the async transform as a standalone stage initially?

Btw, I was counting the number of replies before we hit the portability.
Seems after 4 replies fuse finally showed up :).

Thanks,
Xinyu


On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <k...@google.com> wrote:

>
>
> On Tue, Jan 22, 2019, 17:23 Reuven Lax <re...@google.com wrote:
>
>>
>>
>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>
>>> @Steve: it's good to see that this is going to be useful in your use
>>> cases as well. Thanks for sharing the code from Scio! I can see in your
>>> implementation that waiting for the future completion is part of the
>>> @FinishBundle. We are thinking of taking advantage of the underlying runner
>>> async support so the user-level code won't need to implement this logic,
>>> e.g. Samza has an AsyncSteamTask api that provides a callback to invoke
>>> after future completion[1], and Flink also has AsyncFunction api [2] which
>>> provides a ResultFuture similar to the API we discussed.
>>>
>>
>> Can this be done correctly? What I mean is that if the process dies, can
>> you guarantee that no data is lost? Beam currently guarantees this for
>> FinishBundle, but if you use an arbitrary async framework this might not be
>> true.
>>
>
> What a Beam runner guarantees is that *if* the bundle is committed, *then*
> finishbundle has run. So it seems just as easy to say *if* a bundle is
> committed, *then* every async result has been resolved.
>
> If the process dies the two cases should be naturally analogous.
>
> But it raises the question of whether they should be resolved prior to
> finishbundle, after, or unspecified. I lean toward unspecified.
>
> That's for a single ParDo. Where this could get complex is optimizing
> fused stages for greater asynchrony.
>
> Kenn
>
>
>>
>>> A simple use case for this is to execute a Runnable asynchronously in
>>> user's own executor. The following code illustrates Kenn's option #2, with
>>> a very simple single-thread pool being the executor:
>>>
>>> new DoFn<InputT, OutputT>() {
>>>   @ProcessElement
>>>   public void process(@Element InputT element, @Output 
>>> OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>>     CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
>>>         () -> someOutput,
>>>         Executors.newSingleThreadExecutor());
>>>     outputReceiver.output(future);
>>>   }
>>> }
>>>
>>> The neat thing about this API is that the user can choose their own async 
>>> framework and we only expect the output to be a CompletionStage.
>>>
>>>
>>> For the implementation of bundling, can we compose a CompletableFuture from 
>>> each element in the bundle, e.g. CompletableFuture.allOf(...), and then 
>>> invoke @FinishBundle when this future is complete? Seems this might work.
>>>
>>> Thanks,
>>> Xinyu
>>>
>>>
>>> [1]
>>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>>
>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <sniem...@apache.org>
>>> wrote:
>>>
>>>> I'd love to see something like this as well.  Also +1 to process(@Element
>>>> InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).  I
>>>> don't know if there's much benefit to passing a future in, since the
>>>> framework itself could hook up the process function to complete when the
>>>> future completes.
>>>>
>>>> I feel like I've spent a bunch of time writing very similar "kick off a
>>>> future in ProcessElement, join it in FinishBundle" code, and looking around
>>>> beam itself a lot of built-in transforms do it as well.  Scio provides a
>>>> few AsyncDoFn implementations [1] but it'd be great to see this as a
>>>> first-class concept in beam itself.  Doing error handling, concurrency, etc
>>>> correctly can be tricky.
>>>>
>>>> [1]
>>>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>>>
>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <k...@google.com> wrote:
>>>>
>>>>> If the input is a CompletionStage<InputT> then the output should also
>>>>> be a CompletionStage<OutputT>, since all you should do is async chaining.
>>>>> We could enforce this by giving the DoFn an
>>>>> OutputReceiver(CompletionStage<OutputT>).
>>>>>
>>>>> Another possibility that might be even more robust against poor future
>>>>> use could be process(@Element InputT element, @Output
>>>>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
>>>>> itself will be async chained, rather than counting on the user to do the
>>>>> right thing.
>>>>>
>>>>> We should see how these look in real use cases. The way that
>>>>> processing is split between @ProcessElement and @FinishBundle might
>>>>> complicate things.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi, guys,
>>>>>>
>>>>>> As more users try out Beam running on the SamzaRunner, we got a lot
>>>>>> of asks for an asynchronous processing API. There are a few reasons for
>>>>>> these asks:
>>>>>>
>>>>>>    - The users here are experienced in asynchronous programming.
>>>>>>    With async frameworks such as Netty and ParSeq and libs like async 
>>>>>> jersey
>>>>>>    client, they are able to make remote calls efficiently and the 
>>>>>> libraries
>>>>>>    help manage the execution threads underneath. Async remote calls are 
>>>>>> very
>>>>>>    common in most of our streaming applications today.
>>>>>>    - Many jobs are running on a multi-tenancy cluster. Async
>>>>>>    processing helps for less resource usage and fast computation (less 
>>>>>> context
>>>>>>    switch).
>>>>>>
>>>>>> I asked about the async support in a previous email thread. The
>>>>>> following API was mentioned in the reply:
>>>>>>
>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>     @ProcessElement
>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>> ...) {
>>>>>>       element.thenApply(...)
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>> We are wondering whether there are any discussions on this API and
>>>>>> related docs. It is awesome that you guys already considered having DoFn 
>>>>>> to
>>>>>> process asynchronously. Out of curiosity, this API seems to create a
>>>>>> CompletionState out of the input element (probably using framework's
>>>>>> executor) and then allow user to chain on it. To us, it seems more
>>>>>> convenient if the DoFn output a CompletionStage<OutputT> or pass in a
>>>>>> CompletionStage<OutputT> to invoke upon completion.
>>>>>>
>>>>>> We would like to discuss further on the async API and hopefully we
>>>>>> will have a great support in Beam. Really appreciate the feedback!
>>>>>>
>>>>>> Thanks,
>>>>>> Xinyu
>>>>>>
>>>>>

Reply via email to