Let me start a separate proposal thread and link this conversation. Sorry about that.
On Fri, Sep 13, 2019 at 9:31 AM Bharath Kumara Subramanian <[email protected]> wrote:

> I have put together a design document <https://docs.google.com/document/d/1t--UYXgaij0ULEoXUnhG3r8OZPBljN9r_WWlwQJBDrI/edit?usp=sharing> that consolidates our discussion in this thread. Please let me know your thoughts.
>
> Thanks,
> Bharath
>
> On Wed, Jan 30, 2019 at 10:18 AM Xinyu Liu <[email protected]> wrote:
>
>> I put the asks and email discussions in this JIRA to track the Async API: https://jira.apache.org/jira/browse/BEAM-6550. Bharath on the SamzaRunner side is willing to take a stab at this. He will come up with a design doc based on our discussions. Will update the thread once it's ready. Really appreciate all the suggestions and feedback here.
>>
>> Thanks,
>> Xinyu
>>
>> On Thu, Jan 24, 2019 at 2:41 PM Robert Bradshaw <[email protected]> wrote:
>>
>>> That's a good point that this "IO" time should be tracked differently.
>>>
>>> For a single level, a wrapper/utility that correctly and completely (and transparently) implements the "naive" bit I sketched above under the hood may be sufficient, implementable purely in user space, and quite useful.
>>>
>>> On Thu, Jan 24, 2019 at 7:38 PM Scott Wegner <[email protected]> wrote:
>>> >
>>> > Makes sense to me. We should make it easier to write DoFns in this pattern that has emerged as common among I/O connectors.
>>> >
>>> > Enabling asynchronous task chaining across a fusion tree is more complicated, but not necessary for this scenario.
>>> >
>>> > On Thu, Jan 24, 2019 at 10:13 AM Steve Niemitz <[email protected]> wrote:
>>> >>
>>> >> It's also important to note that in many (most?) IO frameworks (gRPC, Finagle, etc.), asynchronous IO is typically completely non-blocking, so there generally won't be a large number of threads waiting for IO to complete. (Netty uses a small pool of threads for the event loop group, for example.)
>>> >>
>>> >> But in general I agree with Reuven: runners should not count threads in use in other thread pools for IO for the purpose of autoscaling (or most kinds of accounting).
>>> >>
>>> >> On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax <[email protected]> wrote:
>>> >>>
>>> >>> As Steve said, the main rationale for this is so that asynchronous IOs (or, in general, asynchronous remote calls) can be made. To some degree this addresses Scott's concern: the asynchronous threads should be, for the most part, simply waiting for IOs to complete; the reason to do the waiting asynchronously is so that the main threadpool does not become blocked, causing the pipeline to become IO bound. A runner like Dataflow should not be tracking these threads for the purpose of autoscaling, as adding more workers will (usually) not cause these calls to complete any faster.
>>> >>>
>>> >>> Reuven
>>> >>>
>>> >>> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz <[email protected]> wrote:
>>> >>>>
>>> >>>> I think I agree with a lot of what you said here; I'm just going to restate my initial use case to try to make it clearer as well.
>>> >>>>
>>> >>>> From my usage of Beam, I feel like the big benefit of async DoFns would be to allow batched IO to be implemented more simply inside a DoFn. Even in the Beam SDK itself, there are a lot of IOs that batch up IO operations in ProcessElement and wait for them to complete in FinishBundle ([1][2], etc.). From my experience, things like error handling and emitting outputs as the result of an asynchronous operation completing (in the correct window, with the correct timestamp, etc.) get pretty tricky, and it would be great for the SDK to provide support for it natively.
>>> >>>>
>>> >>>> It's also probably good to point out that really only DoFns that do IO should be asynchronous; normal CPU-bound DoFns have no reason to be asynchronous.
>>> >>>>
>>> >>>> A really good example of this is an IO I had written recently for Bigtable: it takes an input PCollection of ByteStrings representing row keys and returns a PCollection of the row data from Bigtable. Naively this could be implemented by simply blocking on the Bigtable read inside the ParDo, but that would limit throughput substantially (even assuming an average read latency of 1 ms, that's still only 1,000 QPS per instance of the ParDo). My implementation batches many reads together (as they arrive at the DoFn), executes them once the batch is big enough (or some time passes), and then emits them once the batch read completes. Emitting them in the correct window and handling errors gets tricky, so this is certainly something I'd love the framework itself to handle.
>>> >>>>
>>> >>>> I also don't see a big benefit of making a DoFn receive a future: if all a user is ever supposed to do is attach a continuation to it, that could just as easily be done by the runner itself, basically just invoking the entire ParDo as a continuation on the future (which then assumes the runner is even representing these tasks as futures internally).
>>> >>>>
>>> >>>> Making the DoFn itself actually return a future could be an option. Even if the language itself doesn't support something like `await`, you could still implement it yourself in the DoFn; however, it seems like it'd be a strange contrast to the non-async version, which returns void.
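The batching pattern Steve describes can be sketched with plain java.util.concurrent, independent of the Beam SDK. This is a hedged illustration, not the actual Bigtable connector: the class name, the `fetchBatch` call, and the batch size are all stand-ins, and the `processElement`/`finishBundle` methods merely mimic the roles of Beam's @ProcessElement and @FinishBundle hooks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch of "buffer keys, kick off async batch reads, join before commit".
// Hypothetical stand-in for an async batching DoFn; not Beam API.
class BatchingReadFn {
  static final int BATCH_SIZE = 100;

  private final List<String> buffer = new ArrayList<>();
  private final List<CompletableFuture<List<String>>> pending = new ArrayList<>();

  // Plays the role of @ProcessElement: buffer the key, flush on a full batch.
  public void processElement(String rowKey) {
    buffer.add(rowKey);
    if (buffer.size() >= BATCH_SIZE) {
      flush();
    }
  }

  // Plays the role of @FinishBundle: flush the remainder and block until
  // every outstanding read has completed, so no result is lost on commit.
  public List<String> finishBundle() {
    flush();
    List<String> results = new ArrayList<>();
    for (CompletableFuture<List<String>> f : pending) {
      results.addAll(f.join());
    }
    pending.clear();
    return results;
  }

  private void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    List<String> batch = new ArrayList<>(buffer);
    buffer.clear();
    pending.add(CompletableFuture.supplyAsync(() -> fetchBatch(batch)));
  }

  // Placeholder for the real async multi-row read (e.g. a Bigtable bulk read).
  static List<String> fetchBatch(List<String> keys) {
    List<String> rows = new ArrayList<>();
    for (String k : keys) {
      rows.add("row-for-" + k);
    }
    return rows;
  }
}
```

The tricky parts Steve mentions (windows, timestamps, error propagation) are exactly what this sketch omits, which is the argument for framework support.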
>>> >>>>
>>> >>>> [1] https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
>>> >>>> [2] https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080
>>> >>>>
>>> >>>> On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw <[email protected]> wrote:
>>> >>>>>
>>> >>>>> If I understand correctly, the end goal is to process input elements of a DoFn asynchronously. Were I to do this naively, I would implement DoFns that simply take and receive [Serializable?]CompletionStages as element types, followed by a DoFn that adds a callback to emit on completion (possibly via a queue to avoid being-on-the-wrong-thread issues) and whose finalize forces all completions. This would, of course, interact poorly with processing-time tracking, fusion breaks, watermark tracking, counter attribution, window propagation, etc., so it is desirable to make it part of the system itself.
>>> >>>>>
>>> >>>>> Taking an OutputReceiver<CompletionStage<OutputT>> seems like a decent API. The invoking of the downstream process could be chained onto this, with all the implicit tracking and tracing set up correctly. Taking a CompletionStage as input means a DoFn would not have to create its output CompletionStage ex nihilo, and possibly allows for better chaining (depending on the asynchronous APIs used).
>>> >>>>>
>>> >>>>> Even better might be to simply let the invocation of all DoFn.process() methods be asynchronous, but as Java doesn't offer an await primitive to relinquish control in the middle of a function body, this might be hard.
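Robert's "naive" user-space version can be sketched without any runner support, using plain CompletableFutures as the element type. All names below are illustrative; the Beam plumbing (windows, watermarks, counters) that this approach interacts poorly with is deliberately omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative sketch of the naive approach: intermediate "DoFns" take and
// emit future elements and only attach continuations; a terminal stage
// enqueues results from a callback (to stay off the wrong thread), and its
// bundle-finishing step forces all completions.
class NaiveAsyncChain {
  // An async "DoFn": chain a continuation, never block.
  static CompletableFuture<String> enrich(CompletableFuture<Integer> in) {
    return in.thenApply(x -> "value:" + (x * 2));
  }

  // The terminal stage for one bundle: emit on completion via a queue,
  // then force every element's chain to resolve before returning.
  static List<String> runBundle(List<CompletableFuture<Integer>> inputs) {
    ConcurrentLinkedQueue<String> emitted = new ConcurrentLinkedQueue<>();
    List<CompletableFuture<Void>> tails = new ArrayList<>();
    for (CompletableFuture<Integer> in : inputs) {
      tails.add(enrich(in).thenAccept(emitted::add));
    }
    // The "finalize forces all completions" step.
    CompletableFuture.allOf(tails.toArray(new CompletableFuture[0])).join();
    return new ArrayList<>(emitted);
  }
}
```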
>>> >>>>>
>>> >>>>> I think for correctness, completion would have to be forced at the end of each bundle. If your bundles are large enough, this may not be that big of a deal. In this case you could also start executing subsequent bundles while waiting for prior ones to complete.
>>> >>>>>
>>> >>>>> On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian <[email protected]> wrote:
>>> >>>>> >
>>> >>>>> >> I'd love to see something like this as well. Also +1 to process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much benefit to passing a future in, since the framework itself could hook up the process function to complete when the future completes.
>>> >>>>> >
>>> >>>>> > One benefit we get by wrapping the input with a CompletionStage is to require[1] that users chain their processing logic onto the input future, thereby ensuring asynchrony for the most part. However, it is still possible for users to go out of their way and write blocking code.
>>> >>>>> >
>>> >>>>> > That said, I am not sure how counterintuitive it is for the runners to wrap the input element into a future before passing it to the user code.
>>> >>>>> >
>>> >>>>> > Bharath
>>> >>>>> >
>>> >>>>> > [1] The CompletionStage interface does not define methods for initially creating, forcibly completing normally or exceptionally, probing completion status or results, or awaiting completion of a stage. Implementations of CompletionStage may provide means of achieving such effects, as appropriate.
>>> >>>>> >
>>> >>>>> > On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <[email protected]> wrote:
>>> >>>>> >>
>>> >>>>> >> I think your concerns are valid, but I want to clarify about "first-class async APIs". Does "first class" mean that it is a well-encapsulated abstraction?
>>> >>>>> >> Or does it mean that the user can more or less do whatever they want? These are opposite, but both valid, meanings of "first class" to me.
>>> >>>>> >>
>>> >>>>> >> I would not want to encourage users to do explicit multi-threaded programming or to control parallelism. Part of the point of Beam is to gain big-data parallelism without explicit multithreading. I see asynchronous chaining of futures (or their best approximation in your language of choice) as a highly disciplined way of doing asynchronous dependency-driven computation that is nonetheless conceptually, and readably, straight-line code. Threads are neither required nor the only way to execute this code. In fact, you might often want to execute without threading for a reference implementation that provides canonically correct results. APIs that leak lower-level details of threads are asking for trouble.
>>> >>>>> >>
>>> >>>>> >> One of our other ideas was to provide a dynamic parameter of type ExecutorService. The SDK harness (pre-portability: the runner) would control and observe parallelism while the user could simply register tasks. Providing a future/promise API is even more disciplined.
>>> >>>>> >>
>>> >>>>> >> Kenn
>>> >>>>> >>
>>> >>>>> >> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <[email protected]> wrote:
>>> >>>>> >>>
>>> >>>>> >>> A related question is how to make execution observable so that a runner can make proper scaling decisions. Runners decide how to schedule bundles within and across multiple worker instances, and can use information about execution to make dynamic scaling decisions. First-class async APIs seem like they would encourage DoFn authors to implement their own parallelization, rather than deferring to the runner, which should be more capable of providing the right level of parallelism.
>>> >>>>> >>>
>>> >>>>> >>> In the Dataflow worker harness, we estimate execution time of PTransform steps by sampling execution time on the execution thread and attributing it to the currently invoked method. This approach is fairly simple, and possible because we assume that execution happens within the thread controlled by the runner. Some DoFns already implement their own async logic and break this assumption; I would expect more if we make async built into the DoFn APIs.
>>> >>>>> >>>
>>> >>>>> >>> So: this isn't an argument against async APIs, but rather: does this break execution observability, and are there other lightweight mechanisms for attributing execution time of async work?
>>> >>>>> >>>
>>> >>>>> >>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <[email protected]> wrote:
>>> >>>>> >>>>
>>> >>>>> >>>> When executed over the portable APIs, it will be primarily the Java SDK harness that makes all of these decisions. If we wanted runners to have some insight into it, we would have to add it to the Beam model protos. I don't have any suggestions there, so I would leave it out of this discussion until there are good ideas. We could learn a lot by trying it out just in the SDK harness.
>>> >>>>> >>>>
>>> >>>>> >>>> Kenn
>>> >>>>> >>>>
>>> >>>>> >>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <[email protected]> wrote:
>>> >>>>> >>>>>
>>> >>>>> >>>>> I don't have a strong opinion on the resolution of the futures with regard to the @FinishBundle invocation. Leaving it unspecified does give runners more room to implement it with their own support.
>>> >>>>> >>>>>
>>> >>>>> >>>>> Optimization is also another great point. Fusion seems pretty complex to me too if we need to find a way to chain the resulting future into the next transform; or should we leave the async transform as a standalone stage initially?
>>> >>>>> >>>>>
>>> >>>>> >>>>> By the way, I was counting the number of replies before we hit portability. Seems after 4 replies, fusion finally showed up :).
>>> >>>>> >>>>>
>>> >>>>> >>>>> Thanks,
>>> >>>>> >>>>> Xinyu
>>> >>>>> >>>>>
>>> >>>>> >>>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <[email protected]> wrote:
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <[email protected]> wrote:
>>> >>>>> >>>>>>>
>>> >>>>> >>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <[email protected]> wrote:
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>> @Steve: it's good to see that this is going to be useful in your use cases as well. Thanks for sharing the code from Scio! I can see in your implementation that waiting for the future completion is part of @FinishBundle. We are thinking of taking advantage of the underlying runner's async support so the user-level code won't need to implement this logic; e.g., Samza has an AsyncStreamTask API that provides a callback to invoke after future completion [1], and Flink also has an AsyncFunction API [2] which provides a ResultFuture similar to the API we discussed.
>>> >>>>> >>>>>>>
>>> >>>>> >>>>>>> Can this be done correctly? What I mean is that if the process dies, can you guarantee that no data is lost? Beam currently guarantees this for FinishBundle, but if you use an arbitrary async framework this might not be true.
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> What a Beam runner guarantees is that *if* the bundle is committed, *then* FinishBundle has run. So it seems just as easy to say: *if* a bundle is committed, *then* every async result has been resolved.
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> If the process dies, the two cases should be naturally analogous.
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> But it raises the question of whether they should be resolved prior to FinishBundle, after it, or at an unspecified point. I lean toward unspecified.
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> That's for a single ParDo. Where this could get complex is optimizing fused stages for greater asynchrony.
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> Kenn
>>> >>>>> >>>>>>
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>> A simple use case for this is to execute a Runnable asynchronously in the user's own executor. The following code illustrates Kenn's option #2, with a very simple single-thread pool as the executor:
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>> new DoFn<InputT, OutputT>() {
>>> >>>>> >>>>>>>>   @ProcessElement
>>> >>>>> >>>>>>>>   public void process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>> >>>>> >>>>>>>>     CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
>>> >>>>> >>>>>>>>         () -> someOutput,
>>> >>>>> >>>>>>>>         Executors.newSingleThreadExecutor());
>>> >>>>> >>>>>>>>     outputReceiver.output(future);
>>> >>>>> >>>>>>>>   }
>>> >>>>> >>>>>>>> }
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>> The neat thing about this API is that the user can choose their own async framework, and we only expect the output to be a CompletionStage.
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>> For the implementation of bundling, can we compose a CompletableFuture from each element in the bundle, e.g. CompletableFuture.allOf(...), and then invoke @FinishBundle when this future is complete? Seems this might work.
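The allOf composition Xinyu suggests can be sketched in isolation. This is a speculative illustration of what a runner might do internally, not Beam runner code: `BundleGate`, `track`, and `finishWhenDone` are hypothetical names, and the real runner would also have to handle failures, windows, and commit semantics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: track the future produced for each element of a bundle, compose
// them with allOf, and only run the finish-bundle hook (and commit) once
// the composite future completes, instead of blocking per element.
class BundleGate {
  final List<CompletableFuture<?>> elementFutures = new ArrayList<>();
  final AtomicBoolean finished = new AtomicBoolean(false);

  // Called once per element as its async work is kicked off.
  void track(CompletableFuture<?> f) {
    elementFutures.add(f);
  }

  // Returns a future that completes after every element future has resolved
  // and the finish-bundle hook has run; a runner would commit on this.
  CompletableFuture<Void> finishWhenDone(Runnable finishBundleHook) {
    return CompletableFuture
        .allOf(elementFutures.toArray(new CompletableFuture[0]))
        .thenRun(() -> {
          finishBundleHook.run();
          finished.set(true);
        });
  }
}
```

Note the gating itself is non-blocking: nothing joins until the runner chooses to wait on the returned future, which matches the "unspecified resolution point" Kenn leans toward.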
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>> Thanks,
>>> >>>>> >>>>>>>> Xinyu
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>> [1] https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>> >>>>> >>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <[email protected]> wrote:
>>> >>>>> >>>>>>>>>
>>> >>>>> >>>>>>>>> I'd love to see something like this as well. Also +1 to process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much benefit to passing a future in, since the framework itself could hook up the process function to complete when the future completes.
>>> >>>>> >>>>>>>>>
>>> >>>>> >>>>>>>>> I feel like I've spent a bunch of time writing very similar "kick off a future in ProcessElement, join it in FinishBundle" code, and looking around Beam itself, a lot of built-in transforms do it as well. Scio provides a few AsyncDoFn implementations [1], but it'd be great to see this as a first-class concept in Beam itself. Doing error handling, concurrency, etc. correctly can be tricky.
>>> >>>>> >>>>>>>>>
>>> >>>>> >>>>>>>>> [1] https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>> >>>>> >>>>>>>>>
>>> >>>>> >>>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <[email protected]> wrote:
>>> >>>>> >>>>>>>>>>
>>> >>>>> >>>>>>>>>> If the input is a CompletionStage<InputT>, then the output should also be a CompletionStage<OutputT>, since all you should do is async chaining. We could enforce this by giving the DoFn an OutputReceiver<CompletionStage<OutputT>>.
>>> >>>>> >>>>>>>>>>
>>> >>>>> >>>>>>>>>> Another possibility that might be even more robust against poor future use could be process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). In this way, the process method itself will be async chained, rather than counting on the user to do the right thing.
>>> >>>>> >>>>>>>>>>
>>> >>>>> >>>>>>>>>> We should see how these look in real use cases. The way that processing is split between @ProcessElement and @FinishBundle might complicate things.
>>> >>>>> >>>>>>>>>>
>>> >>>>> >>>>>>>>>> Kenn
>>> >>>>> >>>>>>>>>>
>>> >>>>> >>>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <[email protected]> wrote:
>>> >>>>> >>>>>>>>>>>
>>> >>>>> >>>>>>>>>>> Hi, guys,
>>> >>>>> >>>>>>>>>>>
>>> >>>>> >>>>>>>>>>> As more users try out Beam running on the SamzaRunner, we got a lot of asks for an asynchronous processing API. There are a few reasons for these asks:
>>> >>>>> >>>>>>>>>>>
>>> >>>>> >>>>>>>>>>> - The users here are experienced in asynchronous programming. With async frameworks such as Netty and ParSeq and libs like the async Jersey client, they are able to make remote calls efficiently, and the libraries help manage the execution threads underneath. Async remote calls are very common in most of our streaming applications today.
>>> >>>>> >>>>>>>>>>> - Many jobs are running on a multi-tenancy cluster. Async processing helps reduce resource usage and speeds up computation (fewer context switches).
>>> >>>>> >>>>>>>>>>>
>>> >>>>> >>>>>>>>>>> I asked about async support in a previous email thread. The following API was mentioned in the reply:
>>> >>>>> >>>>>>>>>>>
>>> >>>>> >>>>>>>>>>> new DoFn<InputT, OutputT>() {
>>> >>>>> >>>>>>>>>>>   @ProcessElement
>>> >>>>> >>>>>>>>>>>   public void process(@Element CompletionStage<InputT> element, ...) {
>>> >>>>> >>>>>>>>>>>     element.thenApply(...)
>>> >>>>> >>>>>>>>>>>   }
>>> >>>>> >>>>>>>>>>> }
>>> >>>>> >>>>>>>>>>>
>>> >>>>> >>>>>>>>>>> We are wondering whether there have been any discussions on this API and whether there are related docs. It is awesome that you guys have already considered having DoFns process asynchronously. Out of curiosity, this API seems to create a CompletionStage out of the input element (probably using the framework's executor) and then allow the user to chain on it. To us, it seems more convenient if the DoFn outputs a CompletionStage<OutputT>, or passes in a CompletionStage<OutputT> to invoke upon completion.
>>> >>>>> >>>>>>>>>>>
>>> >>>>> >>>>>>>>>>> We would like to discuss the async API further, and hopefully we will have great support for it in Beam. Really appreciate the feedback!
>>> >>>>> >>>>>>>>>>>
>>> >>>>> >>>>>>>>>>> Thanks,
>>> >>>>> >>>>>>>>>>> Xinyu
>>> >>>>> >>>
>>> >>>>> >>> --
>>> >>>>> >>> Got feedback? tinyurl.com/swegner-feedback
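The two signature shapes debated in the thread (wrapping the input versus wrapping the output) can be put side by side with plain java.util.concurrent types. The `OutputReceiver` interface below is a minimal stand-in for Beam's, and the method bodies are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Side-by-side sketch of the two proposed process() shapes, using a
// stand-in OutputReceiver (the real one lives in the Beam SDK).
class AsyncSignatures {
  interface OutputReceiver<T> {
    void output(T value);
  }

  // Shape 1: the element arrives pre-wrapped, so the user can only chain
  // continuations onto it (CompletionStage exposes no blocking methods).
  static void processWrappedInput(
      CompletionStage<Integer> element,
      OutputReceiver<CompletionStage<String>> out) {
    out.output(element.thenApply(x -> "got:" + x));
  }

  // Shape 2: plain input, future-typed output; the runner would chain the
  // downstream transform onto whatever future the user emits.
  static void processWrappedOutput(
      Integer element, OutputReceiver<CompletionStage<String>> out) {
    out.output(CompletableFuture.supplyAsync(() -> "got:" + element));
  }
}
```

Either way, the user code stays non-blocking; the difference is only whether the framework or the user creates the first future in the chain.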
