Makes sense to me. We should make it easier to write DoFns in this pattern, which has emerged as common among I/O connectors.
Enabling asynchronous task chaining across a fusion tree is more complicated but not necessary for this scenario. On Thu, Jan 24, 2019 at 10:13 AM Steve Niemitz <sniem...@apache.org> wrote: > It's also important to note that in many (most?) IO frameworks (gRPC, > finagle, etc), asynchronous IO is typically completely non-blocking, so > there generally won't be a large number of threads waiting for IO to > complete. (netty uses a small pool of threads for the Event Loop Group for > example). > > But in general I agree with Reuven, runners should not count threads in > use in other thread pools for IO for the purpose of autoscaling (or most > kinds of accounting). > > On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax <re...@google.com> wrote: > >> As Steve said, the main rationale for this is so that asynchronous IOs >> (or in general, asynchronous remote calls) can be made. To some degree >> this addresses Scott's concern: the asynchronous threads should be, for the >> most part, simply waiting for IOs to complete; the reason to do the waiting >> asynchronously is so that the main threadpool does not become blocked, >> causing the pipeline to become IO bound. A runner like Dataflow should not >> be tracking these threads for the purpose of autoscaling, as adding more >> workers will (usually) not cause these calls to complete any faster. >> >> Reuven >> >> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz <sniem...@apache.org> >> wrote: >> >>> I think I agree with a lot of what you said here, I'm just going to >>> restate my initial use-case to try to make it more clear as well. >>> >>> From my usage of beam, I feel like the big benefit of async DoFns would >>> be to allow batched IO to be implemented more simply inside a DoFn. Even >>> in the Beam SDK itself, there are a lot of IOs that batch up IO operations >>> in ProcessElement and wait for them to complete in FinishBundle ([1][2], >>> etc). 
From my experience, things like error handling, emitting outputs as >>> the result of an asynchronous operation completing (in the correct window, >>> with the correct timestamp, etc) get pretty tricky, and it would be great >>> for the SDK to provide support natively for it. >>> >>> It's also probably good to point out that really only DoFns that do IO >>> should be asynchronous, normal CPU bound DoFns have no reason to be >>> asynchronous. >>> >>> A really good example of this is an IO I had written recently for >>> Bigtable, it takes an input PCollection of ByteStrings representing row >>> keys, and returns a PCollection of the row data from bigtable. Naively >>> this could be implemented by simply blocking on the Bigtable read inside >>> the ParDo, however this would limit throughput substantially (even assuming >>> an avg read latency of 1ms, that's still only 1000 QPS / instance of the >>> ParDo). My implementation batches many reads together (as they arrive at >>> the DoFn), executes them once the batch is big enough (or some time >>> passes), and then emits them once the batch read completes. Emitting them >>> in the correct window and handling errors gets tricky, so this is certainly >>> something I'd love the framework itself to handle. >>> >>> I also don't see a big benefit of making a DoFn receive a future, if all >>> a user is ever supposed to do is attach a continuation to it, that could >>> just as easily be done by the runner itself, basically just invoking the >>> entire ParDo as a continuation on the future (which then assumes the runner >>> is even representing these tasks as futures internally). >>> >>> Making the DoFn itself actually return a future could be an option, even >>> if the language itself doesn't support something like `await`, you could >>> still implement it yourself in the DoFn, however, it seems like it'd be a >>> strange contrast to the non-async version, which returns void. 
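[Editor's sketch] The batch-then-flush shape Steve describes can be shown as a standalone sketch with no Beam or Bigtable dependencies; `BatchingSketch`, `fetchRows`, and the method names here are invented stand-ins for the real connector code, with the two methods mirroring the roles of @ProcessElement and @FinishBundle:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Simplified sketch of "buffer in ProcessElement, flush and join in FinishBundle". */
public class BatchingSketch {
  private final int maxBatchSize;
  // Stand-in for the async client call (e.g. a batched Bigtable read).
  private final Function<List<String>, CompletableFuture<List<String>>> fetchRows;
  private final List<String> buffer = new ArrayList<>();
  private final List<CompletableFuture<List<String>>> inFlight = new ArrayList<>();

  public BatchingSketch(int maxBatchSize,
      Function<List<String>, CompletableFuture<List<String>>> fetchRows) {
    this.maxBatchSize = maxBatchSize;
    this.fetchRows = fetchRows;
  }

  // Analogous to @ProcessElement: buffer row keys, kick off a read when full.
  public void processElement(String rowKey) {
    buffer.add(rowKey);
    if (buffer.size() >= maxBatchSize) {
      flush();
    }
  }

  // Analogous to @FinishBundle: flush the remainder and wait for all reads.
  public List<String> finishBundle() {
    flush();
    List<String> results = new ArrayList<>();
    for (CompletableFuture<List<String>> f : inFlight) {
      results.addAll(f.join()); // blocks until the async read completes
    }
    inFlight.clear();
    return results;
  }

  private void flush() {
    if (!buffer.isEmpty()) {
      inFlight.add(fetchRows.apply(new ArrayList<>(buffer)));
      buffer.clear();
    }
  }
}
```

The tricky parts Steve raises (re-associating each result with its window and timestamp, and surfacing per-element errors) are exactly what this sketch omits and what SDK support would have to handle.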
>>> >>> [1] >>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720 >>> [2] >>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080 >>> >>> >>> On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw <rober...@google.com> >>> wrote: >>> >>>> If I understand correctly, the end goal is to process input elements >>>> of a DoFn asynchronously. Were I to do this naively, I would implement >>>> DoFns that simply take and receive [Serializable?]CompletionStages as >>>> element types, followed by a DoFn that adds a callback to emit on >>>> completion (possibly via a queue to avoid being-on-the-wrong-thread >>>> issues) and whose finalize forces all completions. This would, of >>>> course, interact poorly with processing time tracking, fusion breaks, >>>> watermark tracking, counter attribution, window propagation, etc. so >>>> it is desirable to make it part of the system itself. >>>> >>>> Taking a OutputReceiver<CompletionStage<OutputT>> seems like a decent >>>> API. The invoking of the downstream process could be chained onto >>>> this, with all the implicit tracking and tracing set up correctly. >>>> Taking a CompletionStage as input means a DoFn would not have to >>>> create its output CompletionStage ex nihilo and possibly allow for >>>> better chaining (depending on the asynchronous APIs used). >>>> >>>> Even better might be to simply let the invocation of all >>>> DoFn.process() methods be asynchronous, but as Java doesn't offer an >>>> await primitive to relinquish control in the middle of a function body >>>> this might be hard. >>>> >>>> I think for correctness, completion would have to be forced at the end >>>> of each bundle. If your bundles are large enough, this may not be that >>>> big of a deal. 
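[Editor's sketch] The naive version Robert outlines (futures as element types, each DoFn only chaining onto them, with every completion forced at the end of the bundle) can be approximated in plain Java; `NaiveAsyncChain` and its methods are hypothetical illustrations, not Beam APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** Sketch: elements are CompletionStages; the "DoFn" body is pure async chaining. */
public class NaiveAsyncChain {
  // A "process" step: CompletionStage in, CompletionStage out, no blocking.
  static CompletionStage<String> process(CompletionStage<Integer> element) {
    return element.thenApply(x -> "value-" + (x * 2));
  }

  // End-of-bundle: force every pending completion before the bundle commits.
  static List<String> finishBundle(List<CompletionStage<String>> pending) {
    List<String> out = new ArrayList<>();
    for (CompletionStage<String> s : pending) {
      out.add(s.toCompletableFuture().join());
    }
    return out;
  }
}
```

As Robert notes, doing this outside the framework interacts poorly with watermarks, counters, fusion, and window propagation, which is the argument for building it into the system.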
In this case you could also start executing subsequent >>>> bundles while waiting for prior ones to complete. >>>> >>>> >>>> >>>> >>>> On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian >>>> <codin.mart...@gmail.com> wrote: >>>> >> >>>> >> I'd love to see something like this as well. Also +1 to >>>> process(@Element InputT element, @Output >>>> OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much >>>> benefit to passing a future in, since the framework itself could hook up >>>> the process function to complete when the future completes. >>>> > >>>> > >>>> > One benefit we get by wrapping the input with CompletionStage is to >>>> mandate[1] users to chain their processing logic to the input future; >>>> thereby, ensuring asynchrony for the most part. However, it is still >>>> possible for users to go out of their way and write blocking code. >>>> > >>>> > Although, I am not sure how counter intuitive it is for the runners >>>> to wrap the input element into a future before passing it to the user code. >>>> > >>>> > Bharath >>>> > >>>> > [1] CompletionStage interface does not define methods for initially >>>> creating, forcibly completing normally or exceptionally, probing completion >>>> status or results, or awaiting completion of a stage. Implementations of >>>> CompletionStage may provide means of achieving such effects, as appropriate >>>> > >>>> > >>>> > On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <k...@apache.org> >>>> wrote: >>>> >> >>>> >> I think your concerns are valid but i want to clarify about "first >>>> class async APIs". Does "first class" mean that it is a well-encapsulated >>>> abstraction? or does it mean that the user can more or less do whatever >>>> they want? These are opposite but both valid meanings for "first class", to >>>> me. >>>> >> >>>> >> I would not want to encourage users to do explicit multi-threaded >>>> programming or control parallelism. 
Part of the point of Beam is to gain >>>> big data parallelism without explicit multithreading. I see asynchronous >>>> chaining of futures (or their best-approximation in your language of >>>> choice) as a highly disciplined way of doing asynchronous dependency-driven >>>> computation that is nonetheless conceptually, and readably, straight-line >>>> code. Threads are not required nor the only way to execute this code. In >>>> fact you might often want to execute without threading for a reference >>>> implementation to provide canonically correct results. APIs that leak >>>> lower-level details of threads are asking for trouble. >>>> >> >>>> >> One of our other ideas was to provide a dynamic parameter of type >>>> ExecutorService. The SDK harness (pre-portability: the runner) would >>>> control and observe parallelism while the user could simply register tasks. >>>> Providing a future/promise API is even more disciplined. >>>> >> >>>> >> Kenn >>>> >> >>>> >> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <sc...@apache.org> >>>> wrote: >>>> >>> >>>> >>> A related question is how to make execution observable such that a >>>> runner can make proper scaling decisions. Runners decide how to schedule >>>> bundles within and across multiple worker instances, and can use >>>> information about execution to make dynamic scaling decisions. First-class >>>> async APIs seem like they would encourage DoFn authors to implement their >>>> own parallelization, rather than deferring to the runner that should be >>>> more capable of providing the right level of parallelism. >>>> >>> >>>> >>> In the Dataflow worker harness, we estimate execution time to >>>> PTransform steps by sampling execution time on the execution thread and >>>> attributing it to the currently invoked method. This approach is fairly >>>> simple and possible because we assume that execution happens within the >>>> thread controlled by the runner. 
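[Editor's sketch] Kenn's ExecutorService idea might look roughly like the following plain-Java sketch, where the harness owns the pool and user code only registers tasks; every name here is invented for illustration and nothing is a real Beam API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Sketch of a harness-owned executor: user code registers tasks, never threads. */
public class HarnessExecutorSketch {
  // The harness controls pool size and could observe queue depth for scaling.
  private final ExecutorService harnessPool = Executors.newFixedThreadPool(4);

  // User-facing: register async work without creating threads or pools.
  public CompletableFuture<String> registerTask(Supplier<String> task) {
    return CompletableFuture.supplyAsync(task, harnessPool);
  }

  public void shutdown() {
    harnessPool.shutdown();
  }
}
```

Because the runner hands out the executor, it keeps the visibility into in-flight work that Scott's sampling-based attribution loses when DoFns spin up their own pools.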
Some DoFn's already implement their own >>>> async logic and break this assumption; I would expect more if we make async >>>> built into the DoFn APIs. >>>> >>> >>>> >>> So: this isn't an argument against async APIs, but rather: does >>>> this break execution observability, and are there other lightweight >>>> mechanisms for attributing execution time of async work? >>>> >>> >>>> >>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <k...@google.com> >>>> wrote: >>>> >>>> >>>> >>>> When executed over the portable APIs, it will be primarily the >>>> Java SDK harness that makes all of these decisions. If we wanted runners to >>>> have some insight into it we would have to add it to the Beam model protos. >>>> I don't have any suggestions there, so I would leave it out of this >>>> discussion until there's good ideas. We could learn a lot by trying it out >>>> just in the SDK harness. >>>> >>>> >>>> >>>> Kenn >>>> >>>> >>>> >>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <xinyuliu...@gmail.com> >>>> wrote: >>>> >>>>> >>>> >>>>> I don't have a strong opinion on the resolution of the futures >>>> regarding to @FinishBundle invocation. Leaving it to be unspecified does >>>> give runners more room to implement it with their own support. >>>> >>>>> >>>> >>>>> Optimization is also another great point. Fuse seems pretty >>>> complex to me too if we need to find a way to chain the resulting future >>>> into the next transform, or leave the async transform as a standalone stage >>>> initially? >>>> >>>>> >>>> >>>>> Btw, I was counting the number of replies before we hit the >>>> portability. Seems after 4 replies fuse finally showed up :). 
>>>> >>>>> >>>> >>>>> Thanks, >>>> >>>>> Xinyu >>>> >>>>> >>>> >>>>> >>>> >>>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <k...@google.com> >>>> wrote: >>>> >>>>>> >>>> >>>>>> >>>> >>>>>> >>>> >>>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <re...@google.com wrote: >>>> >>>>>>> >>>> >>>>>>> >>>> >>>>>>> >>>> >>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu < >>>> xinyuliu...@gmail.com> wrote: >>>> >>>>>>>> >>>> >>>>>>>> @Steve: it's good to see that this is going to be useful in >>>> your use cases as well. Thanks for sharing the code from Scio! I can see in >>>> your implementation that waiting for the future completion is part of the >>>> @FinishBundle. We are thinking of taking advantage of the underlying runner >>>> async support so the user-level code won't need to implement this logic, >>>> e.g. Samza has an AsyncSteamTask api that provides a callback to invoke >>>> after future completion[1], and Flink also has AsyncFunction api [2] which >>>> provides a ResultFuture similar to the API we discussed. >>>> >>>>>>> >>>> >>>>>>> >>>> >>>>>>> Can this be done correctly? What I mean is that if the process >>>> dies, can you guarantee that no data is lost? Beam currently guarantees >>>> this for FinishBundle, but if you use an arbitrary async framework this >>>> might not be true. >>>> >>>>>> >>>> >>>>>> >>>> >>>>>> What a Beam runner guarantees is that *if* the bundle is >>>> committed, *then* finishbundle has run. So it seems just as easy to say >>>> *if* a bundle is committed, *then* every async result has been resolved. >>>> >>>>>> >>>> >>>>>> If the process dies the two cases should be naturally analogous. >>>> >>>>>> >>>> >>>>>> But it raises the question of whether they should be resolved >>>> prior to finishbundle, after, or unspecified. I lean toward unspecified. >>>> >>>>>> >>>> >>>>>> That's for a single ParDo. Where this could get complex is >>>> optimizing fused stages for greater asynchrony. 
>>>> >>>>>> >>>> >>>>>> Kenn >>>> >>>>>> >>>> >>>>>>> >>>> >>>>>>>> >>>> >>>>>>>> A simple use case for this is to execute a Runnable >>>> asynchronously in user's own executor. The following code illustrates >>>> Kenn's option #2, with a very simple single-thread pool being the executor: >>>> >>>>>>>> >>>> >>>>>>>> new DoFn<InputT, OutputT>() { >>>> >>>>>>>> @ProcessElement >>>> >>>>>>>> public void process(@Element InputT element, @Output >>>> OutputReceiver<CompletionStage<OutputT>> outputReceiver) { >>>> >>>>>>>> CompletableFuture<OutputT> future = >>>> CompletableFuture.supplyAsync( >>>> >>>>>>>> () -> someOutput, >>>> >>>>>>>> Executors.newSingleThreadExecutor()); >>>> >>>>>>>> outputReceiver.output(future); >>>> >>>>>>>> } >>>> >>>>>>>> } >>>> >>>>>>>> >>>> >>>>>>>> The neat thing about this API is that the user can choose >>>> their own async framework and we only expect the output to be a >>>> CompletionStage. >>>> >>>>>>>> >>>> >>>>>>>> >>>> >>>>>>>> For the implementation of bundling, can we compose a >>>> CompletableFuture from each element in the bundle, e.g. >>>> CompletableFuture.allOf(...), and then invoke @FinishBundle when this >>>> future is complete? Seems this might work. >>>> >>>>>>>> >>>> >>>>>>>> Thanks, >>>> >>>>>>>> Xinyu >>>> >>>>>>>> >>>> >>>>>>>> >>>> >>>>>>>> [1] >>>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html >>>> >>>>>>>> [2] >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html >>>> >>>>>>>> >>>> >>>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz < >>>> sniem...@apache.org> wrote: >>>> >>>>>>>>> >>>> >>>>>>>>> I'd love to see something like this as well. Also +1 to >>>> process(@Element InputT element, @Output >>>> OutputReceiver<CompletionStage<OutputT>>). 
I don't know if there's much >>>> benefit to passing a future in, since the framework itself could hook up >>>> the process function to complete when the future completes. >>>> >>>>>>>>> >>>> >>>>>>>>> I feel like I've spent a bunch of time writing very similar >>>> "kick off a future in ProcessElement, join it in FinishBundle" code, and >>>> looking around beam itself a lot of built-in transforms do it as well. >>>> Scio provides a few AsyncDoFn implementations [1] but it'd be great to see >>>> this as a first-class concept in beam itself. Doing error handling, >>>> concurrency, etc correctly can be tricky. >>>> >>>>>>>>> >>>> >>>>>>>>> [1] >>>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java >>>> >>>>>>>>> >>>> >>>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles < >>>> k...@google.com> wrote: >>>> >>>>>>>>>> >>>> >>>>>>>>>> If the input is a CompletionStage<InputT> then the output >>>> should also be a CompletionStage<OutputT>, since all you should do is async >>>> chaining. We could enforce this by giving the DoFn an >>>> OutputReceiver(CompletionStage<OutputT>). >>>> >>>>>>>>>> >>>> >>>>>>>>>> Another possibility that might be even more robust against >>>> poor future use could be process(@Element InputT element, @Output >>>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method >>>> itself will be async chained, rather than counting on the user to do the >>>> right thing. >>>> >>>>>>>>>> >>>> >>>>>>>>>> We should see how these look in real use cases. The way that >>>> processing is split between @ProcessElement and @FinishBundle might >>>> complicate things. 
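[Editor's sketch] Xinyu's CompletableFuture.allOf(...) question and the "kick off a future in ProcessElement, join it in FinishBundle" pattern both reduce to gating bundle completion on every element's future. A minimal plain-Java sketch (hypothetical names, no Beam types):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Sketch: run "finish bundle" work only after every element's future resolves. */
public class BundleJoin {
  static CompletableFuture<Void> whenBundleDone(
      List<CompletableFuture<String>> elementFutures, Runnable finishBundle) {
    return CompletableFuture
        .allOf(elementFutures.toArray(new CompletableFuture[0]))
        .thenRun(finishBundle); // runs only after every element completes
  }
}
```

The committed-bundle guarantee discussed above would then hold: if the bundle commits, the allOf future (and hence every per-element future) has resolved, whether that happens before or after @FinishBundle is left unspecified.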
>>>> >>>>>>>>>> >>>> >>>>>>>>>> Kenn >>>> >>>>>>>>>> >>>> >>>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu < >>>> xinyuliu...@gmail.com> wrote: >>>> >>>>>>>>>>> >>>> >>>>>>>>>>> Hi, guys, >>>> >>>>>>>>>>> >>>> >>>>>>>>>>> As more users try out Beam running on the SamzaRunner, we >>>> got a lot of asks for an asynchronous processing API. There are a few >>>> reasons for these asks: >>>> >>>>>>>>>>> >>>> >>>>>>>>>>> The users here are experienced in asynchronous programming. >>>> With async frameworks such as Netty and ParSeq and libs like async jersey >>>> client, they are able to make remote calls efficiently and the libraries >>>> help manage the execution threads underneath. Async remote calls are very >>>> common in most of our streaming applications today. >>>> >>>>>>>>>>> Many jobs are running on a multi-tenancy cluster. Async >>>> processing helps for less resource usage and fast computation (less context >>>> switch). >>>> >>>>>>>>>>> >>>> >>>>>>>>>>> I asked about the async support in a previous email thread. >>>> The following API was mentioned in the reply: >>>> >>>>>>>>>>> >>>> >>>>>>>>>>> new DoFn<InputT, OutputT>() { >>>> >>>>>>>>>>> @ProcessElement >>>> >>>>>>>>>>> public void process(@Element CompletionStage<InputT> >>>> element, ...) { >>>> >>>>>>>>>>> element.thenApply(...) >>>> >>>>>>>>>>> } >>>> >>>>>>>>>>> } >>>> >>>>>>>>>>> >>>> >>>>>>>>>>> We are wondering whether there are any discussions on this >>>> API and related docs. It is awesome that you guys already considered having >>>> DoFn to process asynchronously. Out of curiosity, this API seems to create >>>> a CompletionState out of the input element (probably using framework's >>>> executor) and then allow user to chain on it. To us, it seems more >>>> convenient if the DoFn output a CompletionStage<OutputT> or pass in a >>>> CompletionStage<OutputT> to invoke upon completion. 
>>>> >>>>>>>>>>> >>>> >>>>>>>>>>> We would like to discuss further on the async API and >>>> hopefully we will have great support in Beam. Really appreciate the >>>> feedback! >>>> >>>>>>>>>>> >>>> >>>>>>>>>>> Thanks, >>>> >>>>>>>>>>> Xinyu >>> >>>> >>> >>>> >>> >>>> >>> -- >>>> >>> >>>> >>> >>>> >>> >>>> >>> Got feedback? tinyurl.com/swegner-feedback