If I understand correctly, the end goal is to process the input elements of a DoFn asynchronously. Were I to do this naively, I would implement DoFns that simply consume and produce [Serializable?] CompletionStages as element types. These would be followed by a DoFn that adds a callback to emit each result on completion (possibly via a queue, to avoid being-on-the-wrong-thread issues) and whose @FinishBundle forces all pending completions. This would, of course, interact poorly with processing-time tracking, fusion breaks, watermark tracking, counter attribution, window propagation, etc., so it is desirable to make it part of the system itself.
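The naive user-level approach described above can be sketched in plain Java. It is essentially the "kick off a future in ProcessElement, join it in FinishBundle" pattern that comes up later in the thread. This is a minimal sketch, not Beam code. NaiveAsyncBundle, processElement, finishBundle and emit are hypothetical names that only mimic the DoFn lifecycle and the output receiver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Hypothetical, Beam-free model of the naive pattern: processElement() starts
 * async work, completion callbacks only enqueue results, and finishBundle()
 * forces every pending stage. Names mimic the DoFn lifecycle; not a Beam API.
 */
final class NaiveAsyncBundle<InputT, OutputT> {
  private final Function<InputT, CompletionStage<OutputT>> asyncFn;
  private final Consumer<OutputT> emit; // only ever called on the caller's thread
  private final List<CompletableFuture<Void>> pending = new ArrayList<>();
  private final ConcurrentLinkedQueue<OutputT> completed = new ConcurrentLinkedQueue<>();

  NaiveAsyncBundle(Function<InputT, CompletionStage<OutputT>> asyncFn, Consumer<OutputT> emit) {
    this.asyncFn = asyncFn;
    this.emit = emit;
  }

  void processElement(InputT element) {
    // The callback may run on any thread, so it only enqueues the result.
    // Tracking the stage returned by thenAccept (not the raw stage) ensures that
    // finishBundle() cannot observe "done" before the result is on the queue.
    pending.add(asyncFn.apply(element).thenAccept(completed::add).toCompletableFuture());
    drain(); // emit whatever has already finished
  }

  void finishBundle() {
    try {
      // Force all completions; a failed stage makes join() throw and fails the bundle.
      CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
    } finally {
      pending.clear();
    }
    drain();
  }

  private void drain() {
    OutputT out;
    while ((out = completed.poll()) != null) {
      emit.accept(out);
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<String> outputs = new ArrayList<>(); // safe: emit runs on this thread only
    Function<Integer, CompletionStage<String>> square =
        x -> CompletableFuture.supplyAsync(() -> "v" + (x * x), pool);
    NaiveAsyncBundle<Integer, String> bundle = new NaiveAsyncBundle<>(square, outputs::add);
    for (int i = 1; i <= 3; i++) {
      bundle.processElement(i);
    }
    bundle.finishBundle();
    pool.shutdown();
    System.out.println(outputs); // v1, v4 and v9, in completion order
  }
}
```

The completion callbacks only enqueue results, so emission always happens on the thread that drives the bundle. This is the queue-based workaround for wrong-thread issues. It still leaves processing-time, watermark and counter tracking unaware of the async work.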
Taking an OutputReceiver<CompletionStage<OutputT>> seems like a decent API. The invocation of the downstream process could be chained onto this, with all the implicit tracking and tracing set up correctly. Taking a CompletionStage as input means a DoFn would not have to create its output CompletionStage ex nihilo. It might also allow for better chaining, depending on the asynchronous APIs used. Even better might be to simply let the invocation of all DoFn.process() methods be asynchronous. However, Java doesn't offer an await primitive to relinquish control in the middle of a function body, so this might be hard.

I think for correctness, completion would have to be forced at the end of each bundle. If your bundles are large enough, this may not be that big of a deal. In this case, you could also start executing subsequent bundles while waiting for prior ones to complete.

On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian <codin.mart...@gmail.com> wrote:
>>
>> I'd love to see something like this as well. Also +1 to process(@Element
>> InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). I don't
>> know if there's much benefit to passing a future in, since the framework
>> itself could hook up the process function to complete when the future
>> completes.
>
>
> One benefit of wrapping the input in a CompletionStage is that it forces[1]
> users to chain their processing logic onto the input future, thereby
> ensuring asynchrony for the most part. However, it is still possible for
> users to go out of their way and write blocking code.
>
> That said, I am not sure how counterintuitive it would be for runners to
> wrap the input element in a future before passing it to the user code.
>
> Bharath
>
> [1] The CompletionStage interface does not define methods for initially
> creating, forcibly completing normally or exceptionally, probing completion
> status or results, or awaiting completion of a stage.
> Implementations of CompletionStage may provide means of achieving such
> effects, as appropriate.
>
>
> On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>> I think your concerns are valid, but I want to clarify what "first-class
>> async APIs" means. Does "first class" mean that it is a well-encapsulated
>> abstraction, or does it mean that the user can more or less do whatever
>> they want? To me, these are opposite but equally valid meanings of "first
>> class".
>>
>> I would not want to encourage users to do explicit multi-threaded
>> programming or control parallelism. Part of the point of Beam is to gain big
>> data parallelism without explicit multithreading. I see asynchronous
>> chaining of futures (or their best approximation in your language of choice)
>> as a highly disciplined way of doing asynchronous, dependency-driven
>> computation that is nonetheless conceptually, and readably, straight-line
>> code. Threads are neither required nor the only way to execute this code. In
>> fact, you might often want to execute it without threading in a reference
>> implementation that provides canonically correct results. APIs that leak
>> lower-level details of threads are asking for trouble.
>>
>> One of our other ideas was to provide a dynamic parameter of type
>> ExecutorService. The SDK harness (pre-portability: the runner) would control
>> and observe parallelism while the user could simply register tasks.
>> Providing a future/promise API is even more disciplined.
>>
>> Kenn
>>
>> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <sc...@apache.org> wrote:
>>>
>>> A related question is how to make execution observable such that a runner
>>> can make proper scaling decisions. Runners decide how to schedule bundles
>>> within and across multiple worker instances, and can use information about
>>> execution to make dynamic scaling decisions.
>>> First-class async APIs seem like they would encourage DoFn authors to
>>> implement their own parallelization, rather than deferring to the runner,
>>> which should be more capable of providing the right level of parallelism.
>>>
>>> In the Dataflow worker harness, we estimate the execution time of PTransform
>>> steps by sampling execution time on the execution thread and attributing it
>>> to the currently invoked method. This approach is fairly simple, and it is
>>> possible because we assume that execution happens within the thread
>>> controlled by the runner. Some DoFns already implement their own async
>>> logic and break this assumption; I would expect more if we build async
>>> into the DoFn APIs.
>>>
>>> So: this isn't an argument against async APIs, but rather: does this break
>>> execution observability, and are there other lightweight mechanisms for
>>> attributing execution time of async work?
>>>
>>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <k...@google.com> wrote:
>>>>
>>>> When executed over the portable APIs, it will be primarily the Java SDK
>>>> harness that makes all of these decisions. If we wanted runners to have
>>>> some insight into it, we would have to add it to the Beam model protos. I
>>>> don't have any suggestions there, so I would leave it out of this
>>>> discussion until there are good ideas. We could learn a lot by trying it
>>>> out just in the SDK harness.
>>>>
>>>> Kenn
>>>>
>>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>
>>>>> I don't have a strong opinion on when the futures should be resolved
>>>>> relative to the @FinishBundle invocation. Leaving it unspecified does
>>>>> give runners more room to implement it with their own support.
>>>>>
>>>>> Optimization is another great point. Fusion seems pretty complex to me
>>>>> too if we need to find a way to chain the resulting future into the next
>>>>> transform. Or should we leave the async transform as a standalone stage
>>>>> initially?
>>>>>
>>>>> Btw, I was counting the number of replies before we hit portability.
>>>>> Seems that after 4 replies, fusion finally showed up :).
>>>>>
>>>>> Thanks,
>>>>> Xinyu
>>>>>
>>>>>
>>>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <k...@google.com> wrote:
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> @Steve: it's good to see that this is going to be useful in your use
>>>>>>>> cases as well. Thanks for sharing the code from Scio! I can see in
>>>>>>>> your implementation that waiting for the future completion is part of
>>>>>>>> the @FinishBundle. We are thinking of taking advantage of the
>>>>>>>> underlying runner's async support so the user-level code won't need to
>>>>>>>> implement this logic. For example, Samza has an AsyncStreamTask API that
>>>>>>>> provides a callback to invoke after future completion [1], and Flink
>>>>>>>> also has an AsyncFunction API [2], which provides a ResultFuture
>>>>>>>> similar to the API we discussed.
>>>>>>>
>>>>>>>
>>>>>>> Can this be done correctly? What I mean is: if the process dies,
>>>>>>> can you guarantee that no data is lost? Beam currently guarantees this
>>>>>>> for FinishBundle, but if you use an arbitrary async framework, this
>>>>>>> might not be true.
>>>>>>
>>>>>>
>>>>>> What a Beam runner guarantees is that *if* the bundle is committed,
>>>>>> *then* FinishBundle has run. So it seems just as easy to say *if* a
>>>>>> bundle is committed, *then* every async result has been resolved.
>>>>>>
>>>>>> If the process dies, the two cases should be naturally analogous.
>>>>>>
>>>>>> But it raises the question of whether they should be resolved prior to
>>>>>> FinishBundle, after it, or unspecified. I lean toward unspecified.
>>>>>>
>>>>>> That's for a single ParDo. Where this could get complex is optimizing
>>>>>> fused stages for greater asynchrony.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> A simple use case for this is to execute a task asynchronously in the
>>>>>>>> user's own executor. The following code illustrates Kenn's option #2,
>>>>>>>> with a very simple single-thread pool as the executor:
>>>>>>>>
>>>>>>>> new DoFn<InputT, OutputT>() {
>>>>>>>>   @ProcessElement
>>>>>>>>   public void process(@Element InputT element,
>>>>>>>>       @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>>>>>>>     CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
>>>>>>>>         () -> someOutput,
>>>>>>>>         Executors.newSingleThreadExecutor());
>>>>>>>>     outputReceiver.output(future);
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> The neat thing about this API is that the user can choose their own
>>>>>>>> async framework, and we only expect the output to be a CompletionStage.
>>>>>>>>
>>>>>>>>
>>>>>>>> For the implementation of bundling, can we compose a CompletableFuture
>>>>>>>> from each element in the bundle, e.g. CompletableFuture.allOf(...),
>>>>>>>> and then invoke @FinishBundle when this future is complete? It seems
>>>>>>>> this might work.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Xinyu
>>>>>>>>
>>>>>>>>
>>>>>>>> [1] https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>>>>>>>
>>>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <sniem...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>> I'd love to see something like this as well. Also +1 to
>>>>>>>>> process(@Element InputT element, @Output
>>>>>>>>> OutputReceiver<CompletionStage<OutputT>>). I don't know if there's
>>>>>>>>> much benefit to passing a future in, since the framework itself could
>>>>>>>>> hook up the process function to complete when the future completes.
>>>>>>>>>
>>>>>>>>> I feel like I've spent a bunch of time writing very similar "kick off
>>>>>>>>> a future in ProcessElement, join it in FinishBundle" code, and
>>>>>>>>> looking around Beam itself, a lot of built-in transforms do it as
>>>>>>>>> well. Scio provides a few AsyncDoFn implementations [1], but it'd be
>>>>>>>>> great to see this as a first-class concept in Beam itself. Doing
>>>>>>>>> error handling, concurrency, etc. correctly can be tricky.
>>>>>>>>>
>>>>>>>>> [1] https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>>>>>>>>
>>>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>> If the input is a CompletionStage<InputT>, then the output should
>>>>>>>>>> also be a CompletionStage<OutputT>, since all you should do is async
>>>>>>>>>> chaining. We could enforce this by giving the DoFn an
>>>>>>>>>> OutputReceiver<CompletionStage<OutputT>>.
>>>>>>>>>>
>>>>>>>>>> Another possibility that might be even more robust against poor
>>>>>>>>>> future use could be process(@Element InputT element, @Output
>>>>>>>>>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process
>>>>>>>>>> method itself will be async chained, rather than counting on the
>>>>>>>>>> user to do the right thing.
>>>>>>>>>>
>>>>>>>>>> We should see how these look in real use cases. The way that
>>>>>>>>>> processing is split between @ProcessElement and @FinishBundle might
>>>>>>>>>> complicate things.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi, guys,
>>>>>>>>>>>
>>>>>>>>>>> As more users try out Beam running on the SamzaRunner, we have
>>>>>>>>>>> received a lot of requests for an asynchronous processing API. There
>>>>>>>>>>> are a few reasons for these requests:
>>>>>>>>>>>
>>>>>>>>>>> - The users here are experienced in asynchronous programming.
>>>>>>>>>>>   With async frameworks such as Netty and ParSeq, and libraries like
>>>>>>>>>>>   the async Jersey client, they are able to make remote calls
>>>>>>>>>>>   efficiently, and the libraries help manage the execution threads
>>>>>>>>>>>   underneath. Async remote calls are very common in most of our
>>>>>>>>>>>   streaming applications today.
>>>>>>>>>>> - Many jobs are running on a multi-tenant cluster. Async processing
>>>>>>>>>>>   helps reduce resource usage and speeds up computation (fewer
>>>>>>>>>>>   context switches).
>>>>>>>>>>>
>>>>>>>>>>> I asked about async support in a previous email thread. The
>>>>>>>>>>> following API was mentioned in the reply:
>>>>>>>>>>>
>>>>>>>>>>> new DoFn<InputT, OutputT>() {
>>>>>>>>>>>   @ProcessElement
>>>>>>>>>>>   public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>>>     element.thenApply(...);
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> We are wondering whether there are any discussions on this API and
>>>>>>>>>>> related docs. It is awesome that you guys already considered having
>>>>>>>>>>> DoFns process asynchronously. Out of curiosity, this API seems to
>>>>>>>>>>> create a CompletionStage out of the input element (probably using the
>>>>>>>>>>> framework's executor) and then allow the user to chain on it. To us,
>>>>>>>>>>> it seems more convenient if the DoFn outputs a CompletionStage<OutputT>
>>>>>>>>>>> or is passed a CompletionStage<OutputT> to invoke upon completion.
>>>>>>>>>>>
>>>>>>>>>>> We would like to discuss the async API further, and hopefully Beam
>>>>>>>>>>> will end up with great support for it. We really appreciate the
>>>>>>>>>>> feedback!
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Xinyu
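The thread also discusses a harness-side version of the idea. There, the runner chains the downstream call onto each emitted CompletionStage and treats a bundle as committed only after all of them resolve, for example via CompletableFuture.allOf. That version might look roughly like the following. It is a Beam-free sketch under stated assumptions. AsyncOutputReceiver, AsyncFn and AsyncBundleRunner are hypothetical stand-ins for the proposed OutputReceiver<CompletionStage<OutputT>> and for the SDK harness; they are not existing Beam APIs. The sketch also uses one shared user executor rather than creating a new executor per element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Hypothetical stand-in for the proposed OutputReceiver<CompletionStage<OutputT>>. */
interface AsyncOutputReceiver<T> {
  void output(CompletionStage<T> stage);
}

/** Hypothetical stand-in for a DoFn whose process method emits stages. */
interface AsyncFn<InputT, OutputT> {
  void process(InputT element, AsyncOutputReceiver<OutputT> out);
}

/**
 * Hypothetical harness-side runner: chains the downstream call onto every
 * emitted stage and treats a bundle as committed only once all of them resolve.
 */
final class AsyncBundleRunner<InputT, OutputT> implements AutoCloseable {
  private final AsyncFn<InputT, OutputT> fn;
  private final Consumer<OutputT> downstream;
  // A single harness thread, so downstream is never invoked concurrently.
  private final ExecutorService harnessThread = Executors.newSingleThreadExecutor();
  private final List<CompletableFuture<Void>> pending = new ArrayList<>();

  AsyncBundleRunner(AsyncFn<InputT, OutputT> fn, Consumer<OutputT> downstream) {
    this.fn = fn;
    this.downstream = downstream;
  }

  /** Returns only after every emitted stage has been handled downstream. */
  void processBundle(List<InputT> bundle) {
    // Assumes fn calls output() synchronously from process(), on this thread.
    AsyncOutputReceiver<OutputT> receiver = stage ->
        pending.add(stage.thenAcceptAsync(downstream, harnessThread).toCompletableFuture());
    for (InputT element : bundle) {
      fn.process(element, receiver);
    }
    try {
      // Force completion at the end of the bundle; one failed stage fails the bundle.
      CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
    } finally {
      pending.clear();
    }
  }

  @Override
  public void close() {
    harnessThread.shutdown();
  }

  public static void main(String[] args) {
    // One shared user executor, created once rather than per element.
    ExecutorService userPool = Executors.newFixedThreadPool(4);
    List<Integer> results = new ArrayList<>(); // only touched by the harness thread
    AsyncFn<Integer, Integer> square =
        (x, out) -> out.output(CompletableFuture.supplyAsync(() -> x * x, userPool));
    try (AsyncBundleRunner<Integer, Integer> runner = new AsyncBundleRunner<>(square, results::add)) {
      runner.processBundle(Arrays.asList(1, 2, 3));
    }
    userPool.shutdown();
    System.out.println(results); // holds 1, 4 and 9; order depends on which stage finishes first
  }
}
```

Because downstream runs through thenAcceptAsync on one harness thread, downstream code stays single-threaded even though user work completes on arbitrary threads. The join at the end of processBundle plays the role of forcing completion at the end of each bundle. As a result, outputs reach downstream in completion order rather than input order.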