I have put together a design document <https://docs.google.com/document/d/1t--UYXgaij0ULEoXUnhG3r8OZPBljN9r_WWlwQJBDrI/edit?usp=sharing> that consolidates our discussion in this thread. Please let me know your thoughts.
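To make the bundle-level semantics from the thread concrete, here is a minimal sketch of the "output a CompletionStage per element, force all of them before the bundle finishes" idea (the CompletableFuture.allOf(...) suggestion quoted below). It uses only the JDK and is not a Beam API: the class AsyncBundleSketch and the helper processBundle are hypothetical names chosen for illustration, and a real runner would additionally have to deal with windows, timestamps and error handling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Plain-JDK stand-in for a runner processing one bundle; nothing here is a Beam API.
final class AsyncBundleSketch {

  // Hypothetical helper: applies an async "process" function to each element,
  // then forces every pending result before the bundle is considered done.
  static <InputT, OutputT> List<OutputT> processBundle(
      List<InputT> bundle, Function<InputT, CompletionStage<OutputT>> process) {
    List<CompletableFuture<OutputT>> pending = new ArrayList<>();
    for (InputT element : bundle) {
      // Analogous to outputReceiver.output(future) in the proposed DoFn signature.
      pending.add(process.apply(element).toCompletableFuture());
    }
    // Analogous to the point at which @FinishBundle could be invoked.
    CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();

    List<OutputT> outputs = new ArrayList<>();
    for (CompletableFuture<OutputT> future : pending) {
      outputs.add(future.join()); // already complete, so this does not block
    }
    return outputs;
  }

  public static void main(String[] args) {
    // The user's own executor, as in the single-thread example in the thread.
    ExecutorService userExecutor = Executors.newSingleThreadExecutor();
    try {
      List<Integer> lengths = processBundle(
          Arrays.asList("a", "bb", "ccc"),
          s -> CompletableFuture.supplyAsync(s::length, userExecutor));
      System.out.println(lengths); // prints [1, 2, 3]
    } finally {
      userExecutor.shutdown();
    }
  }
}
```

If any element's future completes exceptionally, the join() on allOf throws a CompletionException, so in this sketch a single failed element fails the whole bundle. Whether that is the right behavior is one of the open questions for the design.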
Thanks, Bharath On Wed, Jan 30, 2019 at 10:18 AM Xinyu Liu <[email protected]> wrote: > I put the asks and email discussions in this JIRA to track the Async API: > https://jira.apache.org/jira/browse/BEAM-6550. Bharath on the SamzaRunner > side is willing to take a stab at this. He will come up with some design > doc based on our discussions. Will update the thread once it's ready. > Really appreciate all the suggestions and feedback here. > > Thanks, > Xinyu > > On Thu, Jan 24, 2019 at 2:41 PM Robert Bradshaw <[email protected]> > wrote: > >> That's a good point that this "IO" time should be tracked differently. >> >> For a single level, a wrapper/utility that correctly and completely >> (and transparently) implements the "naive" bit I sketched above under >> the hood may be sufficient and implementable purely in user-space, and >> quite useful. >> >> On Thu, Jan 24, 2019 at 7:38 PM Scott Wegner <[email protected]> wrote: >> > >> > Makes sense to me. We should make it easier to write DoFn's in this >> pattern that has emerged as common among I/O connectors. >> > >> > Enabling asynchronous task chaining across a fusion tree is more >> complicated but not necessary for this scenario. >> > >> > On Thu, Jan 24, 2019 at 10:13 AM Steve Niemitz <[email protected]> >> wrote: >> >> >> >> It's also important to note that in many (most?) IO frameworks (gRPC, >> finagle, etc), asynchronous IO is typically completely non-blocking, so >> there generally won't be a large number of threads waiting for IO to >> complete. (netty uses a small pool of threads for the Event Loop Group for >> example). >> >> >> >> But in general I agree with Reuven, runners should not count threads >> in use in other thread pools for IO for the purpose of autoscaling (or most >> kinds of accounting). 
>> >> >> >> On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax <[email protected]> wrote: >> >>> >> >>> As Steve said, the main rationale for this is so that asynchronous >> IOs (or in general, asynchronous remote calls) call be made. To some degree >> this addresses Scott's concern: the asynchronous threads should be, for the >> most part, simply waiting for IOs to complete; the reason to do the waiting >> asynchronously is so that the main threadpool does not become blocked, >> causing the pipeline to become IO bound. A runner like Dataflow should not >> be tracking these threads for the purpose of autoscaling, as adding more >> workers will (usually) not cause these calls to complete any faster. >> >>> >> >>> Reuven >> >>> >> >>> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz <[email protected]> >> wrote: >> >>>> >> >>>> I think I agree with a lot of what you said here, I'm just going to >> restate my initial use-case to try to make it more clear as well. >> >>>> >> >>>> From my usage of beam, I feel like the big benefit of async DoFns >> would be to allow batched IO to be implemented more simply inside a DoFn. >> Even in the Beam SDK itself, there are a lot of IOs that batch up IO >> operations in ProcessElement and wait for them to complete in FinishBundle >> ([1][2], etc). From my experience, things like error handling, emitting >> outputs as the result of an asynchronous operation completing (in the >> correct window, with the correct timestamp, etc) get pretty tricky, and it >> would be great for the SDK to provide support natively for it. >> >>>> >> >>>> It's also probably good to point out that really only DoFns that do >> IO should be asynchronous, normal CPU bound DoFns have no reason to be >> asynchronous. >> >>>> >> >>>> A really good example of this is an IO I had written recently for >> Bigtable, it takes an input PCollection of ByteStrings representing row >> keys, and returns a PCollection of the row data from bigtable. 
Naively >> this could be implemented by simply blocking on the Bigtable read inside >> the ParDo, however this would limit throughput substantially (even assuming >> an avg read latency is 1ms, thats still only 1000 QPS / instance of the >> ParDo). My implementation batches many reads together (as they arrive at >> the DoFn), executes them once the batch is big enough (or some time >> passes), and then emits them once the batch read completes. Emitting them >> in the correct window and handling errors gets tricky, so this is certainly >> something I'd love the framework itself to handle. >> >>>> >> >>>> I also don't see a big benefit of making a DoFn receive a future, if >> all a user is ever supposed to do is attach a continuation to it, that >> could just as easily be done by the runner itself, basically just invoking >> the entire ParDo as a continuation on the future (which then assumes the >> runner is even representing these tasks as futures internally). >> >>>> >> >>>> Making the DoFn itself actually return a future could be an option, >> even if the language itself doesn't support something like `await`, you >> could still implement it yourself in the DoFn, however, it seems like it'd >> be a strange contrast to the non-async version, which returns void. >> >>>> >> >>>> [1] >> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720 >> >>>> [2] >> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080 >> >>>> >> >>>> >> >>>> On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw <[email protected]> >> wrote: >> >>>>> >> >>>>> If I understand correctly, the end goal is to process input elements >> >>>>> of a DoFn asynchronously. 
Were I to do this naively, I would >> implement >> >>>>> DoFns that simply take and receive [Serializable?]CompletionStages >> as >> >>>>> element types, followed by a DoFn that adds a callback to emit on >> >>>>> completion (possibly via a queue to avoid being-on-the-wrong-thread >> >>>>> issues) and whose finalize forces all completions. This would, of >> >>>>> course, interact poorly with processing time tracking, fusion >> breaks, >> >>>>> watermark tracking, counter attribution, window propagation, etc. so >> >>>>> it is desirable to make it part of the system itself. >> >>>>> >> >>>>> Taking a OutputReceiver<CompletionStage<OutputT>> seems like a >> decent >> >>>>> API. The invoking of the downstream process could be chained onto >> >>>>> this, with all the implicit tracking and tracing set up correctly. >> >>>>> Taking a CompletionStage as input means a DoFn would not have to >> >>>>> create its output CompletionStage ex nihilo and possibly allow for >> >>>>> better chaining (depending on the asynchronous APIs used). >> >>>>> >> >>>>> Even better might be to simply let the invocation of all >> >>>>> DoFn.process() methods be asynchronous, but as Java doesn't offer an >> >>>>> await primitive to relinquish control in the middle of a function >> body >> >>>>> this might be hard. >> >>>>> >> >>>>> I think for correctness, completion would have to be forced at the >> end >> >>>>> of each bundle. If your bundles are large enough, this may not be >> that >> >>>>> big of a deal. In this case you could also start executing >> subsequent >> >>>>> bundles while waiting for prior ones to complete. >> >>>>> >> >>>>> >> >>>>> >> >>>>> >> >>>>> On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian >> >>>>> <[email protected]> wrote: >> >>>>> >> >> >>>>> >> I'd love to see something like this as well. Also +1 to >> process(@Element InputT element, @Output >> OutputReceiver<CompletionStage<OutputT>>). 
I don't know if there's much >> benefit to passing a future in, since the framework itself could hook up >> the process function to complete when the future completes. >> >>>>> > >> >>>>> > >> >>>>> > One benefit we get by wrapping the input with CompletionStage is >> to mandate[1] users to chain their processing logic to the input future; >> thereby, ensuring asynchrony for the most part. However, it is still >> possible for users to go out of their way and write blocking code. >> >>>>> > >> >>>>> > Although, I am not sure how counter intuitive it is for the >> runners to wrap the input element into a future before passing it to the >> user code. >> >>>>> > >> >>>>> > Bharath >> >>>>> > >> >>>>> > [1] CompletionStage interface does not define methods for >> initially creating, forcibly completing normally or exceptionally, probing >> completion status or results, or awaiting completion of a stage. >> Implementations of CompletionStage may provide means of achieving such >> effects, as appropriate >> >>>>> > >> >>>>> > >> >>>>> > On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <[email protected]> >> wrote: >> >>>>> >> >> >>>>> >> I think your concerns are valid but i want to clarify about >> "first class async APIs". Does "first class" mean that it is a >> well-encapsulated abstraction? or does it mean that the user can more or >> less do whatever they want? These are opposite but both valid meanings for >> "first class", to me. >> >>>>> >> >> >>>>> >> I would not want to encourage users to do explicit >> multi-threaded programming or control parallelism. Part of the point of >> Beam is to gain big data parallelism without explicit multithreading. I see >> asynchronous chaining of futures (or their best-approximation in your >> language of choice) as a highly disciplined way of doing asynchronous >> dependency-driven computation that is nonetheless conceptually, and >> readably, straight-line code. Threads are not required nor the only way to >> execute this code. 
In fact you might often want to execute without >> threading for a reference implementation to provide canonically correct >> results. APIs that leak lower-level details of threads are asking for >> trouble. >> >>>>> >> >> >>>>> >> One of our other ideas was to provide a dynamic parameter of >> type ExecutorService. The SDK harness (pre-portability: the runner) would >> control and observe parallelism while the user could simply register tasks. >> Providing a future/promise API is even more disciplined. >> >>>>> >> >> >>>>> >> Kenn >> >>>>> >> >> >>>>> >> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <[email protected]> >> wrote: >> >>>>> >>> >> >>>>> >>> A related question is how to make execution observable such >> that a runner can make proper scaling decisions. Runners decide how to >> schedule bundles within and across multiple worker instances, and can use >> information about execution to make dynamic scaling decisions. First-class >> async APIs seem like they would encourage DoFn authors to implement their >> own parallelization, rather than deferring to the runner that should be >> more capable of providing the right level of parallelism. >> >>>>> >>> >> >>>>> >>> In the Dataflow worker harness, we estimate execution time to >> PTransform steps by sampling execution time on the execution thread and >> attributing it to the currently invoked method. This approach is fairly >> simple and possible because we assume that execution happens within the >> thread controlled by the runner. Some DoFn's already implement their own >> async logic and break this assumption; I would expect more if we make async >> built into the DoFn APIs. >> >>>>> >>> >> >>>>> >>> So: this isn't an argument against async APIs, but rather: does >> this break execution observability, and are there other lightweight >> mechanisms for attributing execution time of async work? 
>> >>>>> >>> >> >>>>> >>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <[email protected]> >> wrote: >> >>>>> >>>> >> >>>>> >>>> When executed over the portable APIs, it will be primarily the >> Java SDK harness that makes all of these decisions. If we wanted runners to >> have some insight into it we would have to add it to the Beam model protos. >> I don't have any suggestions there, so I would leave it out of this >> discussion until there's good ideas. We could learn a lot by trying it out >> just in the SDK harness. >> >>>>> >>>> >> >>>>> >>>> Kenn >> >>>>> >>>> >> >>>>> >>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu < >> [email protected]> wrote: >> >>>>> >>>>> >> >>>>> >>>>> I don't have a strong opinion on the resolution of the >> futures regarding to @FinishBundle invocation. Leaving it to be unspecified >> does give runners more room to implement it with their own support. >> >>>>> >>>>> >> >>>>> >>>>> Optimization is also another great point. Fuse seems pretty >> complex to me too if we need to find a way to chain the resulting future >> into the next transform, or leave the async transform as a standalone stage >> initially? >> >>>>> >>>>> >> >>>>> >>>>> Btw, I was counting the number of replies before we hit the >> portability. Seems after 4 replies fuse finally showed up :). >> >>>>> >>>>> >> >>>>> >>>>> Thanks, >> >>>>> >>>>> Xinyu >> >>>>> >>>>> >> >>>>> >>>>> >> >>>>> >>>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles < >> [email protected]> wrote: >> >>>>> >>>>>> >> >>>>> >>>>>> >> >>>>> >>>>>> >> >>>>> >>>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <[email protected] >> wrote: >> >>>>> >>>>>>> >> >>>>> >>>>>>> >> >>>>> >>>>>>> >> >>>>> >>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu < >> [email protected]> wrote: >> >>>>> >>>>>>>> >> >>>>> >>>>>>>> @Steve: it's good to see that this is going to be useful >> in your use cases as well. Thanks for sharing the code from Scio! 
I can see >> in your implementation that waiting for the future completion is part of >> the @FinishBundle. We are thinking of taking advantage of the underlying >> runner async support so the user-level code won't need to implement this >> logic, e.g. Samza has an AsyncStreamTask API that provides a callback to >> invoke after future completion[1], and Flink also has AsyncFunction API [2] >> which provides a ResultFuture similar to the API we discussed. >> >>>>> >>>>>>> >> >>>>> >>>>>>> >> >>>>> >>>>>>> Can this be done correctly? What I mean is that if the >> process dies, can you guarantee that no data is lost? Beam currently >> guarantees this for FinishBundle, but if you use an arbitrary async >> framework this might not be true. >> >>>>> >>>>>> >> >>>>> >>>>>> >> >>>>> >>>>>> What a Beam runner guarantees is that *if* the bundle is >> committed, *then* finishbundle has run. So it seems just as easy to say >> *if* a bundle is committed, *then* every async result has been resolved. >> >>>>> >>>>>> >> >>>>> >>>>>> If the process dies the two cases should be naturally >> analogous. >> >>>>> >>>>>> >> >>>>> >>>>>> But it raises the question of whether they should be >> resolved prior to finishbundle, after, or unspecified. I lean toward >> unspecified. >> >>>>> >>>>>> >> >>>>> >>>>>> That's for a single ParDo. Where this could get complex is >> optimizing fused stages for greater asynchrony. >> >>>>> >>>>>> >> >>>>> >>>>>> Kenn >> >>>>> >>>>>> >> >>>>> >>>>>>> >> >>>>> >>>>>>>> >> >>>>> >>>>>>>> A simple use case for this is to execute a Runnable >> asynchronously in user's own executor. 
The following code illustrates >> Kenn's option #2, with a very simple single-thread pool being the executor: >> >>>>> >>>>>>>> >> >>>>> >>>>>>>> new DoFn<InputT, OutputT>() { >> >>>>> >>>>>>>> @ProcessElement >> >>>>> >>>>>>>> public void process(@Element InputT element, @Output >> OutputReceiver<CompletionStage<OutputT>> outputReceiver) { >> >>>>> >>>>>>>> CompletableFuture<OutputT> future = >> CompletableFuture.supplyAsync( >> >>>>> >>>>>>>> () -> someOutput, >> >>>>> >>>>>>>> Executors.newSingleThreadExecutor()); >> >>>>> >>>>>>>> outputReceiver.output(future); >> >>>>> >>>>>>>> } >> >>>>> >>>>>>>> } >> >>>>> >>>>>>>> >> >>>>> >>>>>>>> The neat thing about this API is that the user can choose >> their own async framework and we only expect the output to be a >> CompletionStage. >> >>>>> >>>>>>>> >> >>>>> >>>>>>>> >> >>>>> >>>>>>>> For the implementation of bundling, can we compose a >> CompletableFuture from each element in the bundle, e.g. >> CompletableFuture.allOf(...), and then invoke @FinishBundle when this >> future is complete? Seems this might work. >> >>>>> >>>>>>>> >> >>>>> >>>>>>>> Thanks, >> >>>>> >>>>>>>> Xinyu >> >>>>> >>>>>>>> >> >>>>> >>>>>>>> >> >>>>> >>>>>>>> [1] >> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html >> >>>>> >>>>>>>> [2] >> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html >> >>>>> >>>>>>>> >> >>>>> >>>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz < >> [email protected]> wrote: >> >>>>> >>>>>>>>> >> >>>>> >>>>>>>>> I'd love to see something like this as well. Also +1 to >> process(@Element InputT element, @Output >> OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much >> benefit to passing a future in, since the framework itself could hook up >> the process function to complete when the future completes. 
>> >>>>> >>>>>>>>> >> >>>>> >>>>>>>>> I feel like I've spent a bunch of time writing very >> similar "kick off a future in ProcessElement, join it in FinishBundle" >> code, and looking around beam itself a lot of built-in transforms do it as >> well. Scio provides a few AsyncDoFn implementations [1] but it'd be great >> to see this as a first-class concept in beam itself. Doing error handling, >> concurrency, etc correctly can be tricky. >> >>>>> >>>>>>>>> >> >>>>> >>>>>>>>> [1] >> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java >> >>>>> >>>>>>>>> >> >>>>> >>>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles < >> [email protected]> wrote: >> >>>>> >>>>>>>>>> >> >>>>> >>>>>>>>>> If the input is a CompletionStage<InputT> then the >> output should also be a CompletionStage<OutputT>, since all you should do >> is async chaining. We could enforce this by giving the DoFn an >> OutputReceiver(CompletionStage<OutputT>). >> >>>>> >>>>>>>>>> >> >>>>> >>>>>>>>>> Another possibility that might be even more robust >> against poor future use could be process(@Element InputT element, @Output >> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method >> itself will be async chained, rather than counting on the user to do the >> right thing. >> >>>>> >>>>>>>>>> >> >>>>> >>>>>>>>>> We should see how these look in real use cases. The way >> that processing is split between @ProcessElement and @FinishBundle might >> complicate things. >> >>>>> >>>>>>>>>> >> >>>>> >>>>>>>>>> Kenn >> >>>>> >>>>>>>>>> >> >>>>> >>>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu < >> [email protected]> wrote: >> >>>>> >>>>>>>>>>> >> >>>>> >>>>>>>>>>> Hi, guys, >> >>>>> >>>>>>>>>>> >> >>>>> >>>>>>>>>>> As more users try out Beam running on the SamzaRunner, >> we got a lot of asks for an asynchronous processing API. 
There are a few >> reasons for these asks: >> >>>>> >>>>>>>>>>> >> >>>>> >>>>>>>>>>> The users here are experienced in asynchronous >> programming. With async frameworks such as Netty and ParSeq and libs like >> async jersey client, they are able to make remote calls efficiently and the >> libraries help manage the execution threads underneath. Async remote calls >> are very common in most of our streaming applications today. >> >>>>> >>>>>>>>>>> Many jobs are running on a multi-tenancy cluster. Async >> processing helps for less resource usage and fast computation (less context >> switch). >> >>>>> >>>>>>>>>>> >> >>>>> >>>>>>>>>>> I asked about the async support in a previous email >> thread. The following API was mentioned in the reply: >> >>>>> >>>>>>>>>>> >> >>>>> >>>>>>>>>>> new DoFn<InputT, OutputT>() { >> >>>>> >>>>>>>>>>> @ProcessElement >> >>>>> >>>>>>>>>>> public void process(@Element >> CompletionStage<InputT> element, ...) { >> >>>>> >>>>>>>>>>> element.thenApply(...) >> >>>>> >>>>>>>>>>> } >> >>>>> >>>>>>>>>>> } >> >>>>> >>>>>>>>>>> >> >>>>> >>>>>>>>>>> We are wondering whether there are any discussions on >> this API and related docs. It is awesome that you guys already considered >> having DoFn to process asynchronously. Out of curiosity, this API seems to >> create a CompletionStage out of the input element (probably using >> framework's executor) and then allow user to chain on it. To us, it seems >> more convenient if the DoFn output a CompletionStage<OutputT> or pass in a >> CompletionStage<OutputT> to invoke upon completion. >> >>>>> >>>>>>>>>>> >> >>>>> >>>>>>>>>>> We would like to discuss further on the async API and >> hopefully we will have a great support in Beam. Really appreciate the >> feedback! >> >>>>> >>>>>>>>>>> >> >>>>> >>>>>>>>>>> Thanks, >> >>>>> >>>>>>>>>>> Xinyu >> >>>>> >>> >> >>>>> >>> >> >>>>> >>> >> >>>>> >>> -- >> >>>>> >>> >> >>>>> >>> >> >>>>> >>> >> >>>>> >>> >> >>>>> >>> Got feedback? 
tinyurl.com/swegner-feedback
