I think your concerns are valid, but I want to clarify what "first class
async APIs" means. Does "first class" mean that it is a well-encapsulated
abstraction? Or does it mean that the user can more or less do whatever
they want? These are opposite but both valid meanings for "first class", to
me.

I would not want to encourage users to do explicit multi-threaded
programming or to control parallelism directly. Part of the point of Beam is
to gain big data parallelism without explicit multithreading. I see
asynchronous chaining of futures (or their best approximation in your
language of choice) as a highly disciplined way of doing asynchronous
dependency-driven computation that is nonetheless conceptually, and readably,
straight-line code. Threads are neither required nor the only way to execute
this code. In fact, you might often want to execute without threading in a
reference implementation to provide canonically correct results. APIs that
leak lower-level details of threads are asking for trouble.
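
To make the "straight-line" point concrete, here is a minimal sketch using
only java.util.concurrent. The class and method names (ChainingSketch,
lookupLength, describe) are made up for illustration and are not Beam API.
The same chain runs unchanged on a thread pool or on a direct, same-thread
executor, which is roughly what a reference implementation could use.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;

class ChainingSketch {
  // Hypothetical stand-in for a remote call; the caller chooses where it runs.
  static CompletionStage<Integer> lookupLength(String key, Executor executor) {
    return CompletableFuture.supplyAsync(() -> key.length(), executor);
  }

  // Dependency-driven steps that still read top to bottom.
  static CompletionStage<String> describe(String key, Executor executor) {
    return lookupLength(key, executor)
        .thenApply(len -> len * 2)
        .thenApply(doubled -> key + ":" + doubled);
  }

  public static void main(String[] args) {
    // A direct executor runs each task on the calling thread: no threads needed.
    Executor direct = Runnable::run;
    String result = describe("beam", direct).toCompletableFuture().join();
    System.out.println(result); // prints beam:8
  }
}
```

Swapping `direct` for a pool changes where the work runs, not what the user
code looks like.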

One of our other ideas was to provide a dynamic parameter of type
ExecutorService. The SDK harness (pre-portability: the runner) would
control and observe parallelism while the user could simply register tasks.
Providing a future/promise API is even more disciplined.
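
As a rough sketch of the ExecutorService idea, assuming nothing about how
Beam would actually wire it up: the harness hands user code an executor that
it wraps, so it can count what has been registered and what has finished.
ObservedExecutor and its methods are hypothetical names, not an existing
Beam or JDK class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper: the harness owns the delegate and observes every task.
class ObservedExecutor implements Executor {
  private final Executor delegate;
  private final AtomicInteger submitted = new AtomicInteger();
  private final AtomicInteger completed = new AtomicInteger();

  ObservedExecutor(Executor delegate) {
    this.delegate = delegate;
  }

  @Override
  public void execute(Runnable task) {
    submitted.incrementAndGet();
    delegate.execute(() -> {
      try {
        task.run();
      } finally {
        completed.incrementAndGet();
      }
    });
  }

  int submitted() { return submitted.get(); }
  int completed() { return completed.get(); }
  int inFlight() { return submitted.get() - completed.get(); }
}

class ObservedExecutorDemo {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2); // sized by the harness
    ObservedExecutor observed = new ObservedExecutor(pool);

    // User code only registers tasks; it never creates or sizes threads.
    CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1, observed);
    CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2, observed);
    int sum = a.thenCombine(b, Integer::sum).join();

    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    System.out.println("sum=" + sum + " submitted=" + observed.submitted()
        + " inFlight=" + observed.inFlight());
  }
}
```

The counters here only stand in for whatever signal the harness would want
(queue depth, task timing, and so on).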

Kenn

On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <sc...@apache.org> wrote:

> A related question is how to make execution observable such that a runner
> can make proper scaling decisions. Runners decide how to schedule bundles
> within and across multiple worker instances, and can use information about
> execution to make dynamic scaling decisions. First-class async APIs seem
> like they would encourage DoFn authors to implement their own
> parallelization, rather than deferring to the runner that should be more
> capable of providing the right level of parallelism.
>
> In the Dataflow worker harness, we estimate the execution time of
> PTransform steps by sampling the execution thread and attributing the time
> to the currently invoked method. This approach is fairly simple, and it is
> possible because we assume that execution happens within the thread
> controlled by the runner. Some DoFns already implement their own async
> logic and break this assumption; I would expect more of them if we build
> async into the DoFn APIs.
>
> So: this isn't an argument against async APIs, but rather: does this break
> execution observability, and are there other lightweight mechanisms for
> attributing execution time of async work?
>
> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <k...@google.com> wrote:
>
>> When executed over the portable APIs, it will be primarily the Java SDK
>> harness that makes all of these decisions. If we wanted runners to have
>> some insight into it, we would have to add it to the Beam model protos. I
>> don't have any suggestions there, so I would leave it out of this
>> discussion until there are good ideas. We could learn a lot by trying it out
>> just in the SDK harness.
>>
>> Kenn
>>
>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>
>>> I don't have a strong opinion on when the futures are resolved relative
>>> to the @FinishBundle invocation. Leaving it unspecified does give runners
>>> more room to implement it with their own support.
>>>
>>> Optimization is another great point. Fusion seems pretty complex to me
>>> too: do we need to find a way to chain the resulting future into the next
>>> transform, or do we leave the async transform as a standalone stage
>>> initially?
>>>
>>> Btw, I was counting the number of replies before we hit the portability.
>>> Seems after 4 replies fuse finally showed up :).
>>>
>>> Thanks,
>>> Xinyu
>>>
>>>
>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <k...@google.com> wrote:
>>>
>>>>
>>>>
>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <re...@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> @Steve: it's good to see that this is going to be useful in your use
>>>>>> cases as well. Thanks for sharing the code from Scio! I can see in your
>>>>>> implementation that waiting for the future completion is part of
>>>>>> @FinishBundle. We are thinking of taking advantage of the underlying
>>>>>> runner's async support so the user-level code won't need to implement
>>>>>> this logic, e.g. Samza has an AsyncStreamTask API that provides a
>>>>>> callback to invoke after future completion [1], and Flink also has an
>>>>>> AsyncFunction API [2] which provides a ResultFuture similar to the API
>>>>>> we discussed.
>>>>>>
>>>>>
>>>>> Can this be done correctly? What I mean is that if the process dies,
>>>>> can you guarantee that no data is lost? Beam currently guarantees this
>>>>> for FinishBundle, but if you use an arbitrary async framework this might
>>>>> not be true.
>>>>>
>>>>
>>>> What a Beam runner guarantees is that *if* the bundle is committed,
>>>> *then* finishbundle has run. So it seems just as easy to say *if* a bundle
>>>> is committed, *then* every async result has been resolved.
>>>>
>>>> If the process dies the two cases should be naturally analogous.
>>>>
>>>> But it raises the question of whether they should be resolved prior to
>>>> finishbundle, after, or unspecified. I lean toward unspecified.
>>>>
>>>> That's for a single ParDo. Where this could get complex is optimizing
>>>> fused stages for greater asynchrony.
>>>>
>>>> Kenn
>>>>
>>>>
>>>>>
>>>>>> A simple use case for this is to execute a Runnable asynchronously in
>>>>>> the user's own executor. The following code illustrates Kenn's option
>>>>>> #2, with a very simple single-thread pool being the executor:
>>>>>>
>>>>>> new DoFn<InputT, OutputT>() {
>>>>>>   @ProcessElement
>>>>>>   public void process(
>>>>>>       @Element InputT element,
>>>>>>       @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>>>>>     CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
>>>>>>         () -> someOutput,
>>>>>>         Executors.newSingleThreadExecutor());
>>>>>>     outputReceiver.output(future);
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> The neat thing about this API is that the user can choose their own 
>>>>>> async framework and we only expect the output to be a CompletionStage.
>>>>>>
>>>>>>
>>>>>> For the implementation of bundling, can we compose a CompletableFuture 
>>>>>> from each element in the bundle, e.g. CompletableFuture.allOf(...), and 
>>>>>> then invoke @FinishBundle when this future is complete? Seems this might 
>>>>>> work.
>>>>>>
>>>>>> Thanks,
>>>>>> Xinyu
>>>>>>
>>>>>>
>>>>>> [1]
>>>>>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>>> [2]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>>>>>
>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <sniem...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> I'd love to see something like this as well.  Also +1 to
>>>>>>> process(@Element InputT element, @Output
>>>>>>> OutputReceiver<CompletionStage<OutputT>>).  I don't know if there's
>>>>>>> much benefit to passing a future in, since the framework itself could
>>>>>>> hook up the process function to complete when the future completes.
>>>>>>>
>>>>>>> I feel like I've spent a bunch of time writing very similar "kick
>>>>>>> off a future in ProcessElement, join it in FinishBundle" code, and
>>>>>>> looking around Beam itself, a lot of built-in transforms do it as
>>>>>>> well.  Scio provides a few AsyncDoFn implementations [1] but it'd be
>>>>>>> great to see this as a first-class concept in Beam itself.  Doing error
>>>>>>> handling, concurrency, etc. correctly can be tricky.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>>>>>>
>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <k...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> If the input is a CompletionStage<InputT> then the output should
>>>>>>>> also be a CompletionStage<OutputT>, since all you should do is async
>>>>>>>> chaining. We could enforce this by giving the DoFn an
>>>>>>>> OutputReceiver<CompletionStage<OutputT>>.
>>>>>>>>
>>>>>>>> Another possibility that might be even more robust against poor
>>>>>>>> future use could be process(@Element InputT element, @Output
>>>>>>>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process
>>>>>>>> method itself will be async chained, rather than counting on the user
>>>>>>>> to do the right thing.
>>>>>>>>
>>>>>>>> We should see how these look in real use cases. The way that
>>>>>>>> processing is split between @ProcessElement and @FinishBundle might
>>>>>>>> complicate things.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi, guys,
>>>>>>>>>
>>>>>>>>> As more users try out Beam running on the SamzaRunner, we got a
>>>>>>>>> lot of asks for an asynchronous processing API. There are a few
>>>>>>>>> reasons for these asks:
>>>>>>>>>
>>>>>>>>>    - The users here are experienced in asynchronous programming.
>>>>>>>>>    With async frameworks such as Netty and ParSeq and libs like the
>>>>>>>>>    async Jersey client, they are able to make remote calls
>>>>>>>>>    efficiently, and the libraries help manage the execution threads
>>>>>>>>>    underneath. Async remote calls are very common in most of our
>>>>>>>>>    streaming applications today.
>>>>>>>>>    - Many jobs are running on a multi-tenancy cluster. Async
>>>>>>>>>    processing reduces resource usage and speeds up computation
>>>>>>>>>    (fewer context switches).
>>>>>>>>>
>>>>>>>>> I asked about the async support in a previous email thread. The
>>>>>>>>> following API was mentioned in the reply:
>>>>>>>>>
>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>>     @ProcessElement
>>>>>>>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>       element.thenApply(...)
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>> We are wondering whether there are any discussions on this API and
>>>>>>>>> related docs. It is awesome that you guys already considered having
>>>>>>>>> DoFn process asynchronously. Out of curiosity, this API seems to
>>>>>>>>> create a CompletionStage out of the input element (probably using the
>>>>>>>>> framework's executor) and then allow the user to chain on it. To us,
>>>>>>>>> it seems more convenient if the DoFn outputs a
>>>>>>>>> CompletionStage<OutputT> or is passed a CompletionStage<OutputT> to
>>>>>>>>> invoke upon completion.
>>>>>>>>>
>>>>>>>>> We would like to discuss the async API further, and hopefully we
>>>>>>>>> will have great support for it in Beam. Really appreciate the
>>>>>>>>> feedback!
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Xinyu
>>>>>>>>>
>>>>>>>>
