>
> I'd love to see something like this as well.  Also +1 to process(@Element
> InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). I
> don't know if there's much benefit to passing a future in, since the
> framework itself could hook up the process function to complete when the
> future completes.
>

One benefit of wrapping the input in a CompletionStage is that it forces[1]
users to chain their processing logic onto the input future, thereby
ensuring asynchrony for the most part. However, it is still possible for
users to go out of their way and write blocking code.

That said, I am not sure how counterintuitive it would be for the runners to
wrap the input element in a future before passing it to the user code.
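
To make the discussion concrete, here is a rough sketch of what the
user-side code could look like under that model (the parameter annotations
are placeholders, computeOutput stands in for the user's per-element logic,
and imports are elided as in the other snippets in this thread):

  new DoFn<InputT, OutputT>() {
    @ProcessElement
    public void process(
        @Element CompletionStage<InputT> element,
        @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
      // The only thing the user can do with the input stage is chain onto it
      // (thenApply/thenCompose), so the body stays non-blocking unless they
      // deliberately call toCompletableFuture().join().
      outputReceiver.output(element.thenApply(input -> computeOutput(input)));
    }
  }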

Bharath

[1] The CompletionStage interface does not define methods for initially
creating, forcibly completing normally or exceptionally, probing completion
status or results, or awaiting completion of a stage. Implementations of
CompletionStage may provide means of achieving such effects, as appropriate.


On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <k...@apache.org> wrote:

> I think your concerns are valid but I want to clarify about "first class
> async APIs". Does "first class" mean that it is a well-encapsulated
> abstraction? or does it mean that the user can more or less do whatever
> they want? These are opposite but both valid meanings for "first class", to
> me.
>
> I would not want to encourage users to do explicit multi-threaded
> programming or control parallelism. Part of the point of Beam is to gain
> big data parallelism without explicit multithreading. I see asynchronous
> chaining of futures (or their best-approximation in your language of
> choice) as a highly disciplined way of doing asynchronous dependency-driven
> computation that is nonetheless conceptually, and readably, straight-line
> code. Threads are neither required nor the only way to execute this code. In
> fact you might often want to execute without threading for a reference
> implementation to provide canonically correct results. APIs that leak
> lower-level details of threads are asking for trouble.
>
> One of our other ideas was to provide a dynamic parameter of type
> ExecutorService. The SDK harness (pre-portability: the runner) would
> control and observe parallelism while the user could simply register tasks.
> Providing a future/promise API is even more disciplined.
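
A sketch of how I read that ExecutorService idea (the injected executor
parameter below is hypothetical, not an existing DoFn parameter, and
callExternalService is a stand-in for the user's client call):

  new DoFn<InputT, OutputT>() {
    @ProcessElement
    public void process(
        @Element InputT element,
        ExecutorService executor,  // hypothetical: provided and owned by the SDK harness
        @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
      // The user only registers work; the harness decides how much of it runs
      // in parallel and can observe queue depth and task time.
      outputReceiver.output(
          CompletableFuture.supplyAsync(() -> callExternalService(element), executor));
    }
  }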
>
> Kenn
>
> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <sc...@apache.org> wrote:
>
>> A related question is how to make execution observable such that a runner
>> can make proper scaling decisions. Runners decide how to schedule bundles
>> within and across multiple worker instances, and can use information about
>> execution to make dynamic scaling decisions. First-class async APIs seem
>> like they would encourage DoFn authors to implement their own
>> parallelization, rather than deferring to the runner that should be more
>> capable of providing the right level of parallelism.
>>
>> In the Dataflow worker harness, we attribute execution time to PTransform
>> steps by sampling execution time on the execution thread and assigning it
>> to the currently invoked method. This approach is fairly simple and
>> possible because we assume that execution happens within the thread
>> controlled by the runner. Some DoFns already implement their own async
>> logic and break this assumption; I would expect more to do so if we make
>> async built into the DoFn APIs.
>>
>> So: this isn't an argument against async APIs, but rather: does this
>> break execution observability, and are there other lightweight mechanisms
>> for attributing execution time of async work?
>>
>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <k...@google.com> wrote:
>>
>>> When executed over the portable APIs, it will be primarily the Java SDK
>>> harness that makes all of these decisions. If we wanted runners to have
>>> some insight into it we would have to add it to the Beam model protos. I
>>> don't have any suggestions there, so I would leave it out of this
>>> discussion until there's good ideas. We could learn a lot by trying it out
>>> just in the SDK harness.
>>>
>>> Kenn
>>>
>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>
>>>> I don't have a strong opinion on the resolution of the futures
>>>> relative to the @FinishBundle invocation. Leaving it unspecified does
>>>> give runners more room to implement it with their own support.
>>>>
>>>> Optimization is another great point. Fuse seems pretty complex to
>>>> me too if we need to find a way to chain the resulting future into the next
>>>> transform; or should we leave the async transform as a standalone stage
>>>> initially?
>>>>
>>>> Btw, I was counting the number of replies before we hit
>>>> portability. Seems after 4 replies fuse finally showed up :).
>>>>
>>>> Thanks,
>>>> Xinyu
>>>>
>>>>
>>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <k...@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> @Steve: it's good to see that this is going to be useful in your use
>>>>>>> cases as well. Thanks for sharing the code from Scio! I can see in your
>>>>>>> implementation that waiting for the future completion is part of the
>>>>>>> @FinishBundle. We are thinking of taking advantage of the underlying 
>>>>>>> runner
>>>>>>> async support so the user-level code won't need to implement this logic,
>>>>>>> e.g. Samza has an AsyncStreamTask api that provides a callback to invoke
>>>>>>> after future completion[1], and Flink also has AsyncFunction api [2] 
>>>>>>> which
>>>>>>> provides a ResultFuture similar to the API we discussed.
>>>>>>>
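
For comparison, the Flink API in [2] above has roughly this shape (the
lookup client below is just a placeholder):

  class AsyncLookup implements AsyncFunction<InputT, OutputT> {
    @Override
    public void asyncInvoke(InputT input, ResultFuture<OutputT> resultFuture) {
      // Kick off the async call and complete the framework-provided future
      // from the client's callback; nothing blocks on the task thread.
      client.lookupAsync(input)
          .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
    }
  }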
>>>>>>
>>>>>> Can this be done correctly? What I mean is that if the process dies,
>>>>>> can you guarantee that no data is lost? Beam currently guarantees this 
>>>>>> for
>>>>>> FinishBundle, but if you use an arbitrary async framework this might not 
>>>>>> be
>>>>>> true.
>>>>>>
>>>>>
>>>>> What a Beam runner guarantees is that *if* the bundle is committed,
>>>>> *then* FinishBundle has run. So it seems just as easy to say *if* a bundle
>>>>> is committed, *then* every async result has been resolved.
>>>>>
>>>>> If the process dies the two cases should be naturally analogous.
>>>>>
>>>>> But it raises the question of whether they should be resolved prior to
>>>>> FinishBundle, after, or unspecified. I lean toward unspecified.
>>>>>
>>>>> That's for a single ParDo. Where this could get complex is optimizing
>>>>> fused stages for greater asynchrony.
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>>>
>>>>>>> A simple use case for this is to execute a Runnable asynchronously
>>>>>>> in the user's own executor. The following code illustrates Kenn's option
>>>>>>> #2, with a very simple single-thread pool as the executor:
>>>>>>>
>>>>>>> new DoFn<InputT, OutputT>() {
>>>>>>>   @ProcessElement
>>>>>>>   public void process(
>>>>>>>       @Element InputT element,
>>>>>>>       @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>>>>>>     CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
>>>>>>>         () -> someOutput,
>>>>>>>         Executors.newSingleThreadExecutor());
>>>>>>>     outputReceiver.output(future);
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> The neat thing about this API is that the user can choose their own 
>>>>>>> async framework and we only expect the output to be a CompletionStage.
>>>>>>>
>>>>>>>
>>>>>>> For the implementation of bundling, can we compose the futures from
>>>>>>> each element in the bundle, e.g. with CompletableFuture.allOf(...), and
>>>>>>> then invoke @FinishBundle when the combined future completes? Seems this
>>>>>>> might work.
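
If it helps, a minimal sketch of how an async-aware wrapper around the
user's DoFn might do that composition (the names here are made up; this is
not an existing Beam API):

  // Collected as the user's process() emits CompletionStages for the bundle.
  private final List<CompletableFuture<OutputT>> pending = new ArrayList<>();

  void onOutput(CompletionStage<OutputT> stage) {
    pending.add(stage.toCompletableFuture());
  }

  void onBundleFinish() {
    // Wait for every element's future (or chain asynchronously if the runner
    // supports async bundle completion) before emitting results, calling the
    // user's @FinishBundle, and committing the bundle.
    CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    pending.clear();
  }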
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Xinyu
>>>>>>>
>>>>>>>
>>>>>>> [1]
>>>>>>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>>>> [2]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>>>>>>
>>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <sniem...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'd love to see something like this as well.  Also +1 to 
>>>>>>>> process(@Element
>>>>>>>> InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).  I
>>>>>>>> don't know if there's much benefit to passing a future in, since the
>>>>>>>> framework itself could hook up the process function to complete when 
>>>>>>>> the
>>>>>>>> future completes.
>>>>>>>>
>>>>>>>> I feel like I've spent a bunch of time writing very similar "kick
>>>>>>>> off a future in ProcessElement, join it in FinishBundle" code, and 
>>>>>>>> looking
>>>>>>>> around beam itself a lot of built-in transforms do it as well.  Scio
>>>>>>>> provides a few AsyncDoFn implementations [1] but it'd be great to see 
>>>>>>>> this
>>>>>>>> as a first-class concept in beam itself.  Doing error handling,
>>>>>>>> concurrency, etc. correctly can be tricky.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>>>>>>>
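
That manual pattern usually ends up looking something like the sketch
below (simplified; asyncClient is a placeholder, error handling is omitted,
and real implementations such as [1] above also collect and emit the
results):

  new DoFn<InputT, Void>() {
    private final List<CompletableFuture<?>> inFlight = new ArrayList<>();

    @ProcessElement
    public void process(@Element InputT element) {
      // Kick off the async work and remember the future.
      inFlight.add(asyncClient.send(element));
    }

    @FinishBundle
    public void finishBundle() {
      // Join everything before the bundle is committed, so a retry after a
      // failure cannot lose in-flight work.
      CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
      inFlight.clear();
    }
  }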
>>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <k...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> If the input is a CompletionStage<InputT> then the output should
>>>>>>>>> also be a CompletionStage<OutputT>, since all you should do is async
>>>>>>>>> chaining. We could enforce this by giving the DoFn an
>>>>>>>>> OutputReceiver<CompletionStage<OutputT>>.
>>>>>>>>>
>>>>>>>>> Another possibility that might be even more robust against poor
>>>>>>>>> future use could be process(@Element InputT element, @Output
>>>>>>>>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process 
>>>>>>>>> method
>>>>>>>>> itself will be async chained, rather than counting on the user to do 
>>>>>>>>> the
>>>>>>>>> right thing.
>>>>>>>>>
>>>>>>>>> We should see how these look in real use cases. The way that
>>>>>>>>> processing is split between @ProcessElement and @FinishBundle might
>>>>>>>>> complicate things.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi, guys,
>>>>>>>>>>
>>>>>>>>>> As more users try out Beam running on the SamzaRunner, we got a
>>>>>>>>>> lot of asks for an asynchronous processing API. There are a few 
>>>>>>>>>> reasons for
>>>>>>>>>> these asks:
>>>>>>>>>>
>>>>>>>>>>    - The users here are experienced in asynchronous programming.
>>>>>>>>>>    With async frameworks such as Netty and ParSeq and libs like 
>>>>>>>>>> async jersey
>>>>>>>>>>    client, they are able to make remote calls efficiently and the 
>>>>>>>>>> libraries
>>>>>>>>>>    help manage the execution threads underneath. Async remote calls 
>>>>>>>>>> are very
>>>>>>>>>>    common in most of our streaming applications today.
>>>>>>>>>>    - Many jobs are running on a multi-tenant cluster. Async
>>>>>>>>>>    processing helps reduce resource usage and speeds up computation
>>>>>>>>>>    (fewer context switches).
>>>>>>>>>>
>>>>>>>>>> I asked about the async support in a previous email thread. The
>>>>>>>>>> following API was mentioned in the reply:
>>>>>>>>>>
>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>>>     @ProcessElement
>>>>>>>>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>>       element.thenApply(...)
>>>>>>>>>>     }
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> We are wondering whether there are any discussions on this API
>>>>>>>>>> and related docs. It is awesome that you guys already considered having
>>>>>>>>>> DoFn process asynchronously. Out of curiosity, this API seems to create
>>>>>>>>>> a CompletionStage out of the input element (probably using the
>>>>>>>>>> framework's executor) and then allow the user to chain on it. To us, it
>>>>>>>>>> seems more convenient if the DoFn outputs a CompletionStage<OutputT>, or
>>>>>>>>>> is passed a CompletionStage<OutputT> to be completed when the processing
>>>>>>>>>> finishes.
>>>>>>>>>>
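
The second variant would presumably need to pass in something completable,
e.g. a CompletableFuture, since CompletionStage itself does not expose
completion methods. Roughly (the injected result parameter is hypothetical
and asyncClient is a placeholder):

  new DoFn<InputT, OutputT>() {
    @ProcessElement
    public void process(
        @Element InputT element,
        CompletableFuture<OutputT> result) {  // hypothetical: created and awaited by the framework
      asyncClient.send(element).whenComplete((output, error) -> {
        if (error != null) {
          result.completeExceptionally(error);
        } else {
          result.complete(output);
        }
      });
    }
  }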
>>>>>>>>>> We would like to discuss the async API further and hopefully
>>>>>>>>>> we will have great support for it in Beam. Really appreciate the feedback!
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Xinyu
>>>>>>>>>>
>>>>>>>>>
>>
>> --
>> Got feedback? tinyurl.com/swegner-feedback
>>
>
