> > I'd love to see something like this as well. Also +1 to process(@Element
> > InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). I
> > don't know if there's much benefit to passing a future in, since the
> > framework itself could hook up the process function to complete when the
> > future completes.
>
One benefit we get by wrapping the input with CompletionStage is to compel[1] users to chain their processing logic onto the input future, thereby ensuring asynchrony for the most part. However, it is still possible for users to go out of their way and write blocking code. I am not sure, though, how counterintuitive it would be for the runners to wrap the input element in a future before passing it to the user code.

Bharath

[1] "CompletionStage interface does not define methods for initially creating, forcibly completing normally or exceptionally, probing completion status or results, or awaiting completion of a stage. Implementations of CompletionStage may provide means of achieving such effects, as appropriate." (from the java.util.concurrent.CompletionStage javadoc)

On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <k...@apache.org> wrote:

> I think your concerns are valid, but I want to clarify what "first-class
> async APIs" means. Does "first class" mean that it is a well-encapsulated
> abstraction? Or does it mean that the user can more or less do whatever
> they want? These are opposite but both valid meanings of "first class," to
> me.
>
> I would not want to encourage users to do explicit multi-threaded
> programming or to control parallelism. Part of the point of Beam is to
> gain big-data parallelism without explicit multithreading. I see
> asynchronous chaining of futures (or their best approximation in your
> language of choice) as a highly disciplined way of doing asynchronous,
> dependency-driven computation that is nonetheless conceptually, and
> readably, straight-line code. Threads are neither required nor the only
> way to execute this code. In fact, you might often want to execute without
> threading in a reference implementation to provide canonically correct
> results. APIs that leak lower-level details of threads are asking for
> trouble.
>
> One of our other ideas was to provide a dynamic parameter of type
> ExecutorService.
> The SDK harness (pre-portability: the runner) would control and observe
> parallelism while the user could simply register tasks. Providing a
> future/promise API is even more disciplined.
>
> Kenn
>
> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <sc...@apache.org> wrote:
>
>> A related question is how to make execution observable such that a runner
>> can make proper scaling decisions. Runners decide how to schedule bundles
>> within and across multiple worker instances, and can use information
>> about execution to make dynamic scaling decisions. First-class async APIs
>> seem like they would encourage DoFn authors to implement their own
>> parallelization, rather than deferring to the runner, which should be
>> more capable of providing the right level of parallelism.
>>
>> In the Dataflow worker harness, we estimate execution time for PTransform
>> steps by sampling on the execution thread and attributing it to the
>> currently invoked method. This approach is fairly simple, and it is
>> possible because we assume that execution happens within the thread
>> controlled by the runner. Some DoFns already implement their own async
>> logic and break this assumption; I would expect more to do so if we build
>> async into the DoFn APIs.
>>
>> So: this isn't an argument against async APIs, but rather a question:
>> does this break execution observability, and are there other lightweight
>> mechanisms for attributing execution time of async work?
>>
>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <k...@google.com> wrote:
>>
>>> When executed over the portable APIs, it will be primarily the Java SDK
>>> harness that makes all of these decisions. If we wanted runners to have
>>> some insight into it, we would have to add it to the Beam model protos.
>>> I don't have any suggestions there, so I would leave it out of this
>>> discussion until there are good ideas. We could learn a lot by trying it
>>> out just in the SDK harness.
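(Aside: a minimal sketch of what a runner-controlled ExecutorService could look like mechanically, using only plain JDK concurrency types. ObservableExecutor and its methods are hypothetical names for illustration, not a real or proposed Beam API; the point is just that the harness can own the executor and observe in-flight work while user code merely registers tasks.)

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: the harness owns the executor, so it can bound
// parallelism and count in-flight tasks for scaling/observability purposes.
class ObservableExecutor {
  private final ExecutorService delegate;
  private final AtomicInteger inFlight = new AtomicInteger();

  ObservableExecutor(ExecutorService delegate) {
    this.delegate = delegate;
  }

  // User code registers a task; the harness sees every submission.
  <T> CompletableFuture<T> submit(Callable<T> task) {
    inFlight.incrementAndGet();
    CompletableFuture<T> result = new CompletableFuture<>();
    delegate.execute(() -> {
      try {
        T value = task.call();
        inFlight.decrementAndGet();
        result.complete(value);
      } catch (Exception e) {
        inFlight.decrementAndGet();
        result.completeExceptionally(e);
      }
    });
    return result;
  }

  // The harness could sample this to attribute async work.
  int inFlight() { return inFlight.get(); }
}
```

Something of this shape would also speak to Scott's observability question, since the harness rather than the user code holds the counters.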
>>>
>>> Kenn
>>>
>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>
>>>> I don't have a strong opinion on the resolution of the futures with
>>>> regard to the @FinishBundle invocation. Leaving it unspecified does
>>>> give runners more room to implement it with their own support.
>>>>
>>>> Optimization is also another great point. Fusion seems pretty complex
>>>> to me too if we need to find a way to chain the resulting future into
>>>> the next transform; or should we leave the async transform as a
>>>> standalone stage initially?
>>>>
>>>> Btw, I was counting the number of replies before we hit portability.
>>>> Seems after 4 replies, fuse finally showed up :).
>>>>
>>>> Thanks,
>>>> Xinyu
>>>>
>>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <k...@google.com> wrote:
>>>>
>>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> @Steve: it's good to see that this is going to be useful in your use
>>>>>>> cases as well. Thanks for sharing the code from Scio! I can see in
>>>>>>> your implementation that waiting for the futures to complete is part
>>>>>>> of @FinishBundle. We are thinking of taking advantage of the
>>>>>>> underlying runner's async support so the user-level code won't need
>>>>>>> to implement this logic; e.g. Samza has an AsyncStreamTask API that
>>>>>>> provides a callback to invoke after future completion [1], and Flink
>>>>>>> also has an AsyncFunction API [2] which provides a ResultFuture
>>>>>>> similar to the API we discussed.
>>>>>>
>>>>>> Can this be done correctly? What I mean is: if the process dies, can
>>>>>> you guarantee that no data is lost? Beam currently guarantees this
>>>>>> for FinishBundle, but if you use an arbitrary async framework, this
>>>>>> might not be true.
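(Aside: the callback handoff Xinyu describes, where the runner hands user code a completion callback as Samza's AsyncStreamTask and Flink's ResultFuture do, can be bridged to a CompletionStage in a few lines of plain JDK code. CallbackAdapter and its shape below are purely illustrative, not the actual Samza/Flink interfaces.)

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Illustrative bridge: adapt a callback-style async API onto a
// CompletableFuture that the runner can await before committing a bundle.
class CallbackAdapter {
  // asyncCall receives a (value, error) callback; the returned future
  // completes (normally or exceptionally) when the callback fires.
  static <T> CompletableFuture<T> toFuture(
      Consumer<BiConsumer<T, Throwable>> asyncCall) {
    CompletableFuture<T> future = new CompletableFuture<>();
    asyncCall.accept((value, error) -> {
      if (error != null) {
        future.completeExceptionally(error);
      } else {
        future.complete(value);
      }
    });
    return future;
  }
}
```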
>>>>>>
>>>>>
>>>>> What a Beam runner guarantees is that *if* the bundle is committed,
>>>>> *then* FinishBundle has run. So it seems just as easy to say: *if* a
>>>>> bundle is committed, *then* every async result has been resolved.
>>>>>
>>>>> If the process dies, the two cases should be naturally analogous.
>>>>>
>>>>> But it raises the question of whether the futures should be resolved
>>>>> prior to FinishBundle, after it, or left unspecified. I lean toward
>>>>> unspecified.
>>>>>
>>>>> That's for a single ParDo. Where this could get complex is optimizing
>>>>> fused stages for greater asynchrony.
>>>>>
>>>>> Kenn
>>>>>
>>>>>>
>>>>>>> A simple use case for this is to execute a Runnable asynchronously
>>>>>>> on the user's own executor. The following code illustrates Kenn's
>>>>>>> option #2, with a very simple single-thread pool as the executor:
>>>>>>>
>>>>>>> new DoFn<InputT, OutputT>() {
>>>>>>>   @ProcessElement
>>>>>>>   public void process(
>>>>>>>       @Element InputT element,
>>>>>>>       @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>>>>>>     CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
>>>>>>>         () -> someOutput,
>>>>>>>         Executors.newSingleThreadExecutor());
>>>>>>>     outputReceiver.output(future);
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> The neat thing about this API is that the user can choose their own
>>>>>>> async framework, and we only expect the output to be a
>>>>>>> CompletionStage.
>>>>>>>
>>>>>>> For the implementation of bundling, can we compose a
>>>>>>> CompletableFuture from each element in the bundle, e.g.
>>>>>>> CompletableFuture.allOf(...), and then invoke @FinishBundle when
>>>>>>> this future is complete? Seems this might work.
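(Aside: a minimal sketch of the allOf bundling idea above, in plain JDK types. BundleTracker and its method names are hypothetical; a real runner would also need to handle failures, retries, and windowing.)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch: collect each element's output future and gate the
// finish-bundle step on CompletableFuture.allOf, so the bundle is committed
// only after every async result has resolved.
class BundleTracker<T> {
  private final List<CompletableFuture<T>> pending = new ArrayList<>();

  // Called once per element, with the future the user's process() emitted.
  void track(CompletableFuture<T> elementResult) {
    pending.add(elementResult);
  }

  // Completes when every element's result has resolved; the runner would
  // invoke @FinishBundle (and then commit) only after this completes.
  CompletableFuture<Void> whenBundleDone() {
    return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]));
  }
}
```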
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Xinyu
>>>>>>>
>>>>>>> [1] https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>>>>>>
>>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <sniem...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'd love to see something like this as well. Also +1 to
>>>>>>>> process(@Element InputT element, @Output
>>>>>>>> OutputReceiver<CompletionStage<OutputT>>). I don't know if there's
>>>>>>>> much benefit to passing a future in, since the framework itself
>>>>>>>> could hook up the process function to complete when the future
>>>>>>>> completes.
>>>>>>>>
>>>>>>>> I feel like I've spent a bunch of time writing very similar "kick
>>>>>>>> off a future in ProcessElement, join it in FinishBundle" code, and
>>>>>>>> looking around Beam itself, a lot of built-in transforms do it as
>>>>>>>> well. Scio provides a few AsyncDoFn implementations [1], but it'd
>>>>>>>> be great to see this as a first-class concept in Beam itself. Doing
>>>>>>>> error handling, concurrency, etc. correctly can be tricky.
>>>>>>>>
>>>>>>>> [1] https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>>>>>>>
>>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <k...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> If the input is a CompletionStage<InputT>, then the output should
>>>>>>>>> also be a CompletionStage<OutputT>, since all you should do is
>>>>>>>>> async chaining. We could enforce this by giving the DoFn an
>>>>>>>>> OutputReceiver<CompletionStage<OutputT>>.
>>>>>>>>>
>>>>>>>>> Another possibility that might be even more robust against poor
>>>>>>>>> future use could be process(@Element InputT element, @Output
>>>>>>>>> OutputReceiver<CompletionStage<OutputT>>).
>>>>>>>>> In this way, the process method itself will be async chained,
>>>>>>>>> rather than counting on the user to do the right thing.
>>>>>>>>>
>>>>>>>>> We should see how these look in real use cases. The way that
>>>>>>>>> processing is split between @ProcessElement and @FinishBundle
>>>>>>>>> might complicate things.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi, guys,
>>>>>>>>>>
>>>>>>>>>> As more users try out Beam running on the SamzaRunner, we have
>>>>>>>>>> gotten a lot of asks for an asynchronous processing API. There
>>>>>>>>>> are a few reasons for these asks:
>>>>>>>>>>
>>>>>>>>>> - The users here are experienced in asynchronous programming.
>>>>>>>>>> With async frameworks such as Netty and ParSeq and libs like the
>>>>>>>>>> async Jersey client, they are able to make remote calls
>>>>>>>>>> efficiently, and the libraries help manage the execution threads
>>>>>>>>>> underneath. Async remote calls are very common in most of our
>>>>>>>>>> streaming applications today.
>>>>>>>>>> - Many jobs are running on a multi-tenant cluster. Async
>>>>>>>>>> processing reduces resource usage and speeds up computation
>>>>>>>>>> (fewer context switches).
>>>>>>>>>>
>>>>>>>>>> I asked about async support in a previous email thread. The
>>>>>>>>>> following API was mentioned in the reply:
>>>>>>>>>>
>>>>>>>>>> new DoFn<InputT, OutputT>() {
>>>>>>>>>>   @ProcessElement
>>>>>>>>>>   public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>>     element.thenApply(...)
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> We are wondering whether there are any discussions on this API
>>>>>>>>>> and related docs. It is awesome that you guys already considered
>>>>>>>>>> having DoFn process asynchronously.
>>>>>>>>>> Out of curiosity, this API seems to create a CompletionStage out
>>>>>>>>>> of the input element (probably using the framework's executor)
>>>>>>>>>> and then allow the user to chain on it. To us, it seems more
>>>>>>>>>> convenient if the DoFn outputs a CompletionStage<OutputT> or is
>>>>>>>>>> passed a CompletionStage<OutputT> to invoke upon completion.
>>>>>>>>>>
>>>>>>>>>> We would like to discuss the async API further, and hopefully we
>>>>>>>>>> will have great support in Beam. Really appreciate the feedback!
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Xinyu
>>>>>>>>>>
>>
>> --
>>
>> Got feedback? tinyurl.com/swegner-feedback
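(Aside: the input-wrapping variant quoted above boils down to pure async chaining: the framework hands process() a CompletionStage<InputT>, the user chains onto it, and the chained CompletionStage<OutputT> is what gets emitted. A minimal, hypothetical sketch with plain JDK types, no Beam classes:)

```java
import java.util.concurrent.CompletionStage;

// Illustrative user logic under the input-wrapping API: straight-line
// chaining, with no blocking calls and no explicit thread management.
class ChainingExample {
  // Stands in for the body of a @ProcessElement method that receives a
  // wrapped input and returns/emits a chained output future.
  static CompletionStage<String> process(CompletionStage<Integer> element) {
    return element.thenApply(n -> "result-" + (n * 2));
  }
}
```

Whether the chained stage is returned, handed to an OutputReceiver, or passed in as a completion target is exactly the API question under discussion; the chaining itself looks the same in all three.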