I think your concerns are valid, but I want to clarify what "first class async APIs" means. Does "first class" mean a well-encapsulated abstraction, or does it mean that the user can more or less do whatever they want? To me, these are opposite but equally valid meanings of "first class".
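To make the "well-encapsulated" reading concrete, here is a minimal sketch in plain java.util.concurrent. It is not a Beam API; `EnrichFn`, `callRemote`, `HarnessSketch` and `CountingExecutor` are hypothetical names. The assumption is that user code only chains futures, while the caller (standing in for the SDK harness) owns the Executor. The same chain can then run with no extra threads through a direct executor, as a reference implementation would, or on a pool, and the harness can observe every task submitted through its executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical user logic: it only chains futures and never creates or
// manages threads. The Executor is always supplied by the caller.
final class EnrichFn {
  // Stand-in for an async remote call (hypothetical).
  static CompletionStage<Integer> callRemote(String key, Executor executor) {
    return CompletableFuture.supplyAsync(() -> key.length(), executor);
  }

  static CompletionStage<String> process(String element, Executor executor) {
    // Dependency-driven, but reads as straight-line code.
    return callRemote(element, executor).thenApply(len -> element + ":" + len);
  }
}

// Hypothetical harness side: it owns the executor, so it can both control
// and observe the work that user code submits.
final class HarnessSketch {
  // Counts tasks submitted through it. Non-async callbacks such as thenApply
  // run on whichever thread completes the stage and are not counted here.
  static final class CountingExecutor implements Executor {
    final AtomicInteger submitted = new AtomicInteger();
    private final Executor delegate;

    CountingExecutor(Executor delegate) {
      this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
      submitted.incrementAndGet();
      delegate.execute(task);
    }
  }

  // A direct executor runs each task on the calling thread: no extra threads
  // and deterministic results, e.g. for a reference implementation.
  static final Executor DIRECT = Runnable::run;

  static String runReference(String element, CountingExecutor counter) {
    return EnrichFn.process(element, counter).toCompletableFuture().join();
  }

  static String runPooled(String element) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      return EnrichFn.process(element, pool).toCompletableFuture().join();
    } finally {
      pool.shutdown();
    }
  }
}
```

Because `EnrichFn` never touches a thread, swapping the direct executor for a pool changes how the work is scheduled but not what it computes.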
I would not want to encourage users to do explicit multithreaded programming or to control parallelism. Part of the point of Beam is to gain big data parallelism without explicit multithreading. I see asynchronous chaining of futures (or their best approximation in your language of choice) as a highly disciplined way of doing asynchronous, dependency-driven computation that is nonetheless conceptually, and readably, straight-line code. Threads are neither required nor the only way to execute this code. In fact, you might often want to execute it without threading, as a reference implementation that provides canonically correct results. APIs that leak lower-level details of threads are asking for trouble.

One of our other ideas was to provide a dynamic parameter of type ExecutorService. The SDK harness (pre-portability: the runner) would control and observe parallelism, while the user could simply register tasks. Providing a future/promise API is even more disciplined.

Kenn

On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <sc...@apache.org> wrote:

> A related question is how to make execution observable so that a runner can make proper scaling decisions. Runners decide how to schedule bundles within and across multiple worker instances, and can use information about execution to make dynamic scaling decisions. First-class async APIs seem like they would encourage DoFn authors to implement their own parallelization, rather than deferring to the runner, which should be more capable of providing the right level of parallelism.
>
> In the Dataflow worker harness, we estimate the execution time of PTransform steps by sampling execution time on the execution thread and attributing it to the currently invoked method. This approach is fairly simple, and it is possible because we assume that execution happens within the thread controlled by the runner.
> Some DoFns already implement their own async logic and break this assumption; I would expect more if we build async into the DoFn APIs.
>
> So this isn't an argument against async APIs, but rather a question: does this break execution observability, and are there other lightweight mechanisms for attributing the execution time of async work?
>
> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <k...@google.com> wrote:
>
>> When executed over the portable APIs, it will be primarily the Java SDK harness that makes all of these decisions. If we wanted runners to have some insight into it, we would have to add it to the Beam model protos. I don't have any suggestions there, so I would leave it out of this discussion until there are good ideas. We could learn a lot by trying it out just in the SDK harness.
>>
>> Kenn
>>
>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>
>>> I don't have a strong opinion on when the futures should be resolved relative to the @FinishBundle invocation. Leaving it unspecified does give runners more room to implement it with their own support.
>>>
>>> Optimization is another great point. Fusion seems pretty complex to me too if we need to find a way to chain the resulting future into the next transform. Or should we leave the async transform as a standalone stage initially?
>>>
>>> Btw, I was counting the number of replies before we hit portability. Seems after 4 replies, fusion finally showed up :).
>>>
>>> Thanks,
>>> Xinyu
>>>
>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <k...@google.com> wrote:
>>>
>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>
>>>>>> @Steve: it's good to see that this is going to be useful in your use cases as well. Thanks for sharing the code from Scio!
>>>>>> I can see in your implementation that waiting for future completion is part of @FinishBundle. We are thinking of taking advantage of the underlying runner's async support so that user-level code won't need to implement this logic. For example, Samza has an AsyncStreamTask API that provides a callback to invoke after future completion [1], and Flink has an AsyncFunction API [2] that provides a ResultFuture similar to the API we discussed.
>>>>>>
>>>>>
>>>>> Can this be done correctly? What I mean is: if the process dies, can you guarantee that no data is lost? Beam currently guarantees this for FinishBundle, but if you use an arbitrary async framework, this might not be true.
>>>>>
>>>>
>>>> What a Beam runner guarantees is that *if* the bundle is committed, *then* FinishBundle has run. So it seems just as easy to say *if* a bundle is committed, *then* every async result has been resolved.
>>>>
>>>> If the process dies, the two cases should be naturally analogous.
>>>>
>>>> But it raises the question of whether they should be resolved before FinishBundle, after it, or at an unspecified time. I lean toward unspecified.
>>>>
>>>> That's for a single ParDo. Where this could get complex is optimizing fused stages for greater asynchrony.
>>>>
>>>> Kenn
>>>>
>>>>>
>>>>>> A simple use case for this is to execute a Runnable asynchronously in the user's own executor.
>>>>>> The following code illustrates Kenn's option #2, with a very simple single-thread pool as the executor:
>>>>>>
>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>     @ProcessElement
>>>>>>     public void process(@Element InputT element,
>>>>>>         @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>>>>>       CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
>>>>>>           () -> someOutput,
>>>>>>           Executors.newSingleThreadExecutor());
>>>>>>       outputReceiver.output(future);
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>> The neat thing about this API is that the user can choose their own async framework, and we only expect the output to be a CompletionStage.
>>>>>>
>>>>>> For the implementation of bundling, can we compose a single CompletableFuture from the futures of all elements in the bundle, e.g. with CompletableFuture.allOf(...), and then invoke @FinishBundle when this future completes? It seems this might work.
>>>>>>
>>>>>> Thanks,
>>>>>> Xinyu
>>>>>>
>>>>>> [1] https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>>>>>
>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <sniem...@apache.org> wrote:
>>>>>>
>>>>>>> I'd love to see something like this as well. Also +1 to process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much benefit to passing a future in, since the framework itself could hook up the process function to complete when the future completes.
>>>>>>>
>>>>>>> I feel like I've spent a bunch of time writing very similar "kick off a future in ProcessElement, join it in FinishBundle" code, and looking around Beam itself, a lot of built-in transforms do it as well.
>>>>>>> Scio provides a few AsyncDoFn implementations [1], but it'd be great to see this as a first-class concept in Beam itself. Doing error handling, concurrency, etc. correctly can be tricky.
>>>>>>>
>>>>>>> [1] https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>>>>>>
>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <k...@google.com> wrote:
>>>>>>>
>>>>>>>> If the input is a CompletionStage<InputT>, then the output should also be a CompletionStage<OutputT>, since all you should do is async chaining. We could enforce this by giving the DoFn an OutputReceiver<CompletionStage<OutputT>>.
>>>>>>>>
>>>>>>>> Another possibility that might be even more robust against poor future use could be process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). In this way, the process method itself will be async chained, rather than counting on the user to do the right thing.
>>>>>>>>
>>>>>>>> We should see how these look in real use cases. The way that processing is split between @ProcessElement and @FinishBundle might complicate things.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi, guys,
>>>>>>>>>
>>>>>>>>> As more users try out Beam running on the SamzaRunner, we have gotten a lot of requests for an asynchronous processing API. There are a few reasons for these requests:
>>>>>>>>>
>>>>>>>>> - The users here are experienced in asynchronous programming. With async frameworks such as Netty and ParSeq, and libraries like the async Jersey client, they are able to make remote calls efficiently, and the libraries help manage the execution threads underneath.
>>>>>>>>>   Async remote calls are very common in most of our streaming applications today.
>>>>>>>>> - Many jobs are running on a multi-tenant cluster. Async processing helps reduce resource usage and speeds up computation (fewer context switches).
>>>>>>>>>
>>>>>>>>> I asked about async support in a previous email thread. The following API was mentioned in the reply:
>>>>>>>>>
>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>>     @ProcessElement
>>>>>>>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>       element.thenApply(...);
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>> We are wondering whether there have been any discussions or related docs on this API. It is awesome that you have already considered letting a DoFn process asynchronously. Out of curiosity, this API seems to create a CompletionStage out of the input element (probably using the framework's executor) and then allow the user to chain on it. To us, it seems more convenient if the DoFn outputs a CompletionStage<OutputT>, or is passed a CompletionStage<OutputT> to invoke upon completion.
>>>>>>>>>
>>>>>>>>> We would like to discuss the async API further, and hopefully Beam will have great support for it. We really appreciate the feedback!
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Xinyu
>
> --
> Got feedback? tinyurl.com/swegner-feedback
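Xinyu's CompletableFuture.allOf(...) idea, together with Kenn's framing that a committed bundle implies every async result has been resolved, might look roughly like the sketch below. It rests on stated assumptions: `AsyncBundle` is a hypothetical stand-in for a runner's bundle handling, not part of Beam, Samza, or Flink, and plain methods stand in for @ProcessElement and @FinishBundle. Each element starts its async work without blocking, and the bundle only "finishes" once the combined future has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical bundle driver, not a Beam API: per-element calls return a
// CompletionStage, and the bundle completes only after all of them resolve.
final class AsyncBundle<InputT, OutputT> {
  private final Function<InputT, CompletionStage<OutputT>> processFn;
  private final List<CompletableFuture<OutputT>> pending = new ArrayList<>();

  AsyncBundle(Function<InputT, CompletionStage<OutputT>> processFn) {
    this.processFn = processFn;
  }

  // Analogous to @ProcessElement: start the async work, do not block.
  void processElement(InputT element) {
    pending.add(processFn.apply(element).toCompletableFuture());
  }

  // Analogous to @FinishBundle: wait until every future in the bundle is done.
  // If any future failed, join() throws a CompletionException, so the bundle
  // would not be committed and the runner could retry it.
  List<OutputT> finishBundle() {
    CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
    List<OutputT> outputs = new ArrayList<>();
    for (CompletableFuture<OutputT> future : pending) {
      outputs.add(future.join()); // already complete, so this does not block
    }
    pending.clear();
    return outputs; // in the order the elements were processed
  }
}
```

In use, the element function would typically share one executor across the bundle, e.g. `s -> CompletableFuture.supplyAsync(() -> lookup(s), pool)` where `lookup` and `pool` are hypothetical, rather than calling Executors.newSingleThreadExecutor() per element as in the quoted example, which creates a fresh executor for every element and never shuts it down explicitly.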