I have put together a design document
<https://docs.google.com/document/d/1t--UYXgaij0ULEoXUnhG3r8OZPBljN9r_WWlwQJBDrI/edit?usp=sharing>
that consolidates our discussion in this thread.
Please let me know your thoughts.

Thanks,
Bharath



On Wed, Jan 30, 2019 at 10:18 AM Xinyu Liu <[email protected]> wrote:

> I put the asks and email discussions in this JIRA to track the Async API:
> https://jira.apache.org/jira/browse/BEAM-6550. Bharath on the SamzaRunner
> side is willing to take a stab at this. He will come up with some design
> doc based on our discussions. Will update the thread once it's ready.
> Really appreciate all the suggestions and feedback here.
>
> Thanks,
> Xinyu
>
> On Thu, Jan 24, 2019 at 2:41 PM Robert Bradshaw <[email protected]>
> wrote:
>
>> That's a good point that this "IO" time should be tracked differently.
>>
>> For a single level, a wrapper/utility that correctly and completely
>> (and transparently) implements the "naive" bit I sketched above under
>> the hood may be sufficient and implementable purely in user-space, and
>> quite useful.
>>
>> On Thu, Jan 24, 2019 at 7:38 PM Scott Wegner <[email protected]> wrote:
>> >
>> > Makes sense to me. We should make it easier to write DoFn's in this
>> pattern that has emerged as common among I/O connectors.
>> >
>> > Enabling asynchronous task chaining across a fusion tree is more
>> complicated but not necessary for this scenario.
>> >
>> > On Thu, Jan 24, 2019 at 10:13 AM Steve Niemitz <[email protected]>
>> wrote:
>> >>
>> >> It's also important to note that in many (most?) IO frameworks (gRPC,
>> finagle, etc), asynchronous IO is typically completely non-blocking, so
>> there generally won't be a large number of threads waiting for IO to
>> complete.  (netty uses a small pool of threads for the Event Loop Group for
>> example).
>> >>
>> >> But in general I agree with Reuven, runners should not count threads
>> in use in other thread pools for IO for the purpose of autoscaling (or most
>> kinds of accounting).
>> >>
>> >> On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax <[email protected]> wrote:
>> >>>
>> >>> As Steve said, the main rationale for this is so that asynchronous
>> IOs (or in general, asynchronous remote calls) call be made. To some degree
>> this addresses Scott's concern: the asynchronous threads should be, for the
>> most part, simply waiting for IOs to complete; the reason to do the waiting
>> asynchronously is so that the main threadpool does not become blocked,
>> causing the pipeline to become IO bound. A runner like Dataflow should not
>> be tracking these threads for the purpose of autoscaling, as adding more
>> workers will (usually) not cause these calls to complete any faster.
>> >>>
>> >>> Reuven
>> >>>
>> >>> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz <[email protected]>
>> wrote:
>> >>>>
>> >>>> I think I agree with a lot of what you said here, I'm just going to
>> restate my initial use-case to try to make it more clear as well.
>> >>>>
>> >>>> From my usage of beam, I feel like the big benefit of async DoFns
>> would be to allow batched IO to be implemented more simply inside a DoFn.
>> Even in the Beam SDK itself, there are a lot of IOs that batch up IO
>> operations in ProcessElement and wait for them to complete in FinishBundle
>> ([1][2], etc).  From my experience, things like error handling, emitting
>> outputs as the result of an asynchronous operation completing (in the
>> correct window, with the correct timestamp, etc) get pretty tricky, and it
>> would be great for the SDK to provide support natively for it.
>> >>>>
>> >>>> It's also probably good to point out that really only DoFns that do
>> IO should be asynchronous, normal CPU bound DoFns have no reason to be
>> asynchronous.
>> >>>>
>> >>>> A really good example of this is an IO I had written recently for
>> Bigtable, it takes an input PCollection of ByteStrings representing row
>> keys, and returns a PCollection of the row data from bigtable.  Naively
>> this could be implemented by simply blocking on the Bigtable read inside
>> the ParDo, however this would limit throughput substantially (even assuming
>> an avg read latency is 1ms, thats still only 1000 QPS / instance of the
>> ParDo).  My implementation batches many reads together (as they arrive at
>> the DoFn), executes them once the batch is big enough (or some time
>> passes), and then emits them once the batch read completes.  Emitting them
>> in the correct window and handling errors gets tricky, so this is certainly
>> something I'd love the framework itself to handle.
>> >>>>
>> >>>> I also don't see a big benefit of making a DoFn receive a future, if
>> all a user is ever supposed to do is attach a continuation to it, that
>> could just as easily be done by the runner itself, basically just invoking
>> the entire ParDo as a continuation on the future (which then assumes the
>> runner is even representing these tasks as futures internally).
>> >>>>
>> >>>> Making the DoFn itself actually return a future could be an option,
>> even if the language itself doesn't support something like `await`, you
>> could still implement it yourself in the DoFn, however, it seems like it'd
>> be a strange contrast to the non-async version, which returns void.
>> >>>>
>> >>>> [1]
>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
>> >>>> [2]
>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080
>> >>>>
>> >>>>
>> >>>> On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw <[email protected]>
>> wrote:
>> >>>>>
>> >>>>> If I understand correctly, the end goal is to process input elements
>> >>>>> of a DoFn asynchronously. Were I to do this naively, I would
>> implement
>> >>>>> DoFns that simply take and receive [Serializable?]CompletionStages
>> as
>> >>>>> element types, followed by a DoFn that adds a callback to emit on
>> >>>>> completion (possibly via a queue to avoid being-on-the-wrong-thread
>> >>>>> issues) and whose finalize forces all completions. This would, of
>> >>>>> course, interact poorly with processing time tracking, fusion
>> breaks,
>> >>>>> watermark tracking, counter attribution, window propagation, etc. so
>> >>>>> it is desirable to make it part of the system itself.
>> >>>>>
>> >>>>> Taking a OutputReceiver<CompletionStage<OutputT>> seems like a
>> decent
>> >>>>> API. The invoking of the downstream process could be chained onto
>> >>>>> this, with all the implicit tracking and tracing set up correctly.
>> >>>>> Taking a CompletionStage as input means a DoFn would not have to
>> >>>>> create its output CompletionStage ex nihilo and possibly allow for
>> >>>>> better chaining (depending on the asynchronous APIs used).
>> >>>>>
>> >>>>> Even better might be to simply let the invocation of all
>> >>>>> DoFn.process() methods be asynchronous, but as Java doesn't offer an
>> >>>>> await primitive to relinquish control in the middle of a function
>> body
>> >>>>> this might be hard.
>> >>>>>
>> >>>>> I think for correctness, completion would have to be forced at the
>> end
>> >>>>> of each bundle. If your bundles are large enough, this may not be
>> that
>> >>>>> big of a deal. In this case you could also start executing
>> subsequent
>> >>>>> bundles while waiting for prior ones to complete.
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian
>> >>>>> <[email protected]> wrote:
>> >>>>> >>
>> >>>>> >> I'd love to see something like this as well.  Also +1 to
>> process(@Element InputT element, @Output
>> OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much
>> benefit to passing a future in, since the framework itself could hook up
>> the process function to complete when the future completes.
>> >>>>> >
>> >>>>> >
>> >>>>> > One benefit we get by wrapping the input with CompletionStage is
>> to mandate[1] users to chain their processing logic to the input future;
>> thereby, ensuring asynchrony for the most part. However, it is still
>> possible for users to go out of their way and write blocking code.
>> >>>>> >
>> >>>>> > Although, I am not sure how counter intuitive it is for the
>> runners to wrap the input element into a future before passing it to the
>> user code.
>> >>>>> >
>> >>>>> > Bharath
>> >>>>> >
>> >>>>> > [1] CompletionStage interface does not define methods for
>> initially creating, forcibly completing normally or exceptionally, probing
>> completion status or results, or awaiting completion of a stage.
>> Implementations of CompletionStage may provide means of achieving such
>> effects, as appropriate
>> >>>>> >
>> >>>>> >
>> >>>>> > On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <[email protected]>
>> wrote:
>> >>>>> >>
>> >>>>> >> I think your concerns are valid but i want to clarify about
>> "first class async APIs". Does "first class" mean that it is a
>> well-encapsulated abstraction? or does it mean that the user can more or
>> less do whatever they want? These are opposite but both valid meanings for
>> "first class", to me.
>> >>>>> >>
>> >>>>> >> I would not want to encourage users to do explicit
>> multi-threaded programming or control parallelism. Part of the point of
>> Beam is to gain big data parallelism without explicit multithreading. I see
>> asynchronous chaining of futures (or their best-approximation in your
>> language of choice) as a highly disciplined way of doing asynchronous
>> dependency-driven computation that is nonetheless conceptually, and
>> readably, straight-line code. Threads are not required nor the only way to
>> execute this code. In fact you might often want to execute without
>> threading for a reference implementation to provide canonically correct
>> results. APIs that leak lower-level details of threads are asking for
>> trouble.
>> >>>>> >>
>> >>>>> >> One of our other ideas was to provide a dynamic parameter of
>> type ExecutorService. The SDK harness (pre-portability: the runner) would
>> control and observe parallelism while the user could simply register tasks.
>> Providing a future/promise API is even more disciplined.
>> >>>>> >>
>> >>>>> >> Kenn
>> >>>>> >>
>> >>>>> >> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <[email protected]>
>> wrote:
>> >>>>> >>>
>> >>>>> >>> A related question is how to make execution observable such
>> that a runner can make proper scaling decisions. Runners decide how to
>> schedule bundles within and across multiple worker instances, and can use
>> information about execution to make dynamic scaling decisions. First-class
>> async APIs seem like they would encourage DoFn authors to implement their
>> own parallelization, rather than deferring to the runner that should be
>> more capable of providing the right level of parallelism.
>> >>>>> >>>
>> >>>>> >>> In the Dataflow worker harness, we estimate execution time to
>> PTransform steps by sampling execution time on the execution thread and
>> attributing it to the currently invoked method. This approach is fairly
>> simple and possible because we assume that execution happens within the
>> thread controlled by the runner. Some DoFn's already implement their own
>> async logic and break this assumption; I would expect more if we make async
>> built into the DoFn APIs.
>> >>>>> >>>
>> >>>>> >>> So: this isn't an argument against async APIs, but rather: does
>> this break execution observability, and are there other lightweight
>> mechanisms for attributing execution time of async work?
>> >>>>> >>>
>> >>>>> >>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <[email protected]>
>> wrote:
>> >>>>> >>>>
>> >>>>> >>>> When executed over the portable APIs, it will be primarily the
>> Java SDK harness that makes all of these decisions. If we wanted runners to
>> have some insight into it we would have to add it to the Beam model protos.
>> I don't have any suggestions there, so I would leave it out of this
>> discussion until there's good ideas. We could learn a lot by trying it out
>> just in the SDK harness.
>> >>>>> >>>>
>> >>>>> >>>> Kenn
>> >>>>> >>>>
>> >>>>> >>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <
>> [email protected]> wrote:
>> >>>>> >>>>>
>> >>>>> >>>>> I don't have a strong opinion on the resolution of the
>> futures regarding to @FinishBundle invocation. Leaving it to be unspecified
>> does give runners more room to implement it with their own support.
>> >>>>> >>>>>
>> >>>>> >>>>> Optimization is also another great point. Fuse seems pretty
>> complex to me too if we need to find a way to chain the resulting future
>> into the next transform, or leave the async transform as a standalone stage
>> initially?
>> >>>>> >>>>>
>> >>>>> >>>>> Btw, I was counting the number of replies before we hit the
>> portability. Seems after 4 replies fuse finally showed up :).
>> >>>>> >>>>>
>> >>>>> >>>>> Thanks,
>> >>>>> >>>>> Xinyu
>> >>>>> >>>>>
>> >>>>> >>>>>
>> >>>>> >>>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <
>> [email protected]> wrote:
>> >>>>> >>>>>>
>> >>>>> >>>>>>
>> >>>>> >>>>>>
>> >>>>> >>>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <[email protected]
>> wrote:
>> >>>>> >>>>>>>
>> >>>>> >>>>>>>
>> >>>>> >>>>>>>
>> >>>>> >>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <
>> [email protected]> wrote:
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>> @Steve: it's good to see that this is going to be useful
>> in your use cases as well. Thanks for sharing the code from Scio! I can see
>> in your implementation that waiting for the future completion is part of
>> the @FinishBundle. We are thinking of taking advantage of the underlying
>> runner async support so the user-level code won't need to implement this
>> logic, e.g. Samza has an AsyncSteamTask api that provides a callback to
>> invoke after future completion[1], and Flink also has AsyncFunction api [2]
>> which provides a ResultFuture similar to the API we discussed.
>> >>>>> >>>>>>>
>> >>>>> >>>>>>>
>> >>>>> >>>>>>> Can this be done correctly? What I mean is that if the
>> process dies, can you guarantee that no data is lost? Beam currently
>> guarantees this for FinishBundle, but if you use an arbitrary async
>> framework this might not be true.
>> >>>>> >>>>>>
>> >>>>> >>>>>>
>> >>>>> >>>>>> What a Beam runner guarantees is that *if* the bundle is
>> committed, *then* finishbundle has run. So it seems just as easy to say
>> *if* a bundle is committed, *then* every async result has been resolved.
>> >>>>> >>>>>>
>> >>>>> >>>>>> If the process dies the two cases should be naturally
>> analogous.
>> >>>>> >>>>>>
>> >>>>> >>>>>> But it raises the question of whether they should be
>> resolved prior to finishbundle, after, or unspecified. I lean toward
>> unspecified.
>> >>>>> >>>>>>
>> >>>>> >>>>>> That's for a single ParDo. Where this could get complex is
>> optimizing fused stages for greater asynchrony.
>> >>>>> >>>>>>
>> >>>>> >>>>>> Kenn
>> >>>>> >>>>>>
>> >>>>> >>>>>>>
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>> A simple use case for this is to execute a Runnable
>> asynchronously in user's own executor. The following code illustrates
>> Kenn's option #2, with a very simple single-thread pool being the executor:
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>> new DoFn<InputT, OutputT>() {
>> >>>>> >>>>>>>>   @ProcessElement
>> >>>>> >>>>>>>>   public void process(@Element InputT element, @Output
>> OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>> >>>>> >>>>>>>>     CompletableFuture<OutputT> future =
>> CompletableFuture.supplyAsync(
>> >>>>> >>>>>>>>         () -> someOutput,
>> >>>>> >>>>>>>>         Executors.newSingleThreadExecutor());
>> >>>>> >>>>>>>>     outputReceiver.output(future);
>> >>>>> >>>>>>>>   }
>> >>>>> >>>>>>>> }
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>> The neat thing about this API is that the user can choose
>> their own async framework and we only expect the output to be a
>> CompletionStage.
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>> For the implementation of bundling, can we compose a
>> CompletableFuture from each element in the bundle, e.g.
>> CompletableFuture.allOf(...), and then invoke @FinishBundle when this
>> future is complete? Seems this might work.
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>> Thanks,
>> >>>>> >>>>>>>> Xinyu
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>> [1]
>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>> >>>>> >>>>>>>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <
>> [email protected]> wrote:
>> >>>>> >>>>>>>>>
>> >>>>> >>>>>>>>> I'd love to see something like this as well.  Also +1 to
>> process(@Element InputT element, @Output
>> OutputReceiver<CompletionStage<OutputT>>).  I don't know if there's much
>> benefit to passing a future in, since the framework itself could hook up
>> the process function to complete when the future completes.
>> >>>>> >>>>>>>>>
>> >>>>> >>>>>>>>> I feel like I've spent a bunch of time writing very
>> similar "kick off a future in ProcessElement, join it in FinishBundle"
>> code, and looking around beam itself a lot of built-in transforms do it as
>> well.  Scio provides a few AsyncDoFn implementations [1] but it'd be great
>> to see this as a first-class concept in beam itself.  Doing error handling,
>> concurrency, etc correctly can be tricky.
>> >>>>> >>>>>>>>>
>> >>>>> >>>>>>>>> [1]
>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>> >>>>> >>>>>>>>>
>> >>>>> >>>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <
>> [email protected]> wrote:
>> >>>>> >>>>>>>>>>
>> >>>>> >>>>>>>>>> If the input is a CompletionStage<InputT> then the
>> output should also be a CompletionStage<OutputT>, since all you should do
>> is async chaining. We could enforce this by giving the DoFn an
>> OutputReceiver(CompletionStage<OutputT>).
>> >>>>> >>>>>>>>>>
>> >>>>> >>>>>>>>>> Another possibility that might be even more robust
>> against poor future use could be process(@Element InputT element, @Output
>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
>> itself will be async chained, rather than counting on the user to do the
>> right thing.
>> >>>>> >>>>>>>>>>
>> >>>>> >>>>>>>>>> We should see how these look in real use cases. The way
>> that processing is split between @ProcessElement and @FinishBundle might
>> complicate things.
>> >>>>> >>>>>>>>>>
>> >>>>> >>>>>>>>>> Kenn
>> >>>>> >>>>>>>>>>
>> >>>>> >>>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <
>> [email protected]> wrote:
>> >>>>> >>>>>>>>>>>
>> >>>>> >>>>>>>>>>> Hi, guys,
>> >>>>> >>>>>>>>>>>
>> >>>>> >>>>>>>>>>> As more users try out Beam running on the SamzaRunner,
>> we got a lot of asks for an asynchronous processing API. There are a few
>> reasons for these asks:
>> >>>>> >>>>>>>>>>>
>> >>>>> >>>>>>>>>>> The users here are experienced in asynchronous
>> programming. With async frameworks such as Netty and ParSeq and libs like
>> async jersey client, they are able to make remote calls efficiently and the
>> libraries help manage the execution threads underneath. Async remote calls
>> are very common in most of our streaming applications today.
>> >>>>> >>>>>>>>>>> Many jobs are running on a multi-tenancy cluster. Async
>> processing helps for less resource usage and fast computation (less context
>> switch).
>> >>>>> >>>>>>>>>>>
>> >>>>> >>>>>>>>>>> I asked about the async support in a previous email
>> thread. The following API was mentioned in the reply:
>> >>>>> >>>>>>>>>>>
>> >>>>> >>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>> >>>>> >>>>>>>>>>>     @ProcessElement
>> >>>>> >>>>>>>>>>>     public void process(@Element
>> CompletionStage<InputT> element, ...) {
>> >>>>> >>>>>>>>>>>       element.thenApply(...)
>> >>>>> >>>>>>>>>>>     }
>> >>>>> >>>>>>>>>>>   }
>> >>>>> >>>>>>>>>>>
>> >>>>> >>>>>>>>>>> We are wondering whether there are any discussions on
>> this API and related docs. It is awesome that you guys already considered
>> having DoFn to process asynchronously. Out of curiosity, this API seems to
>> create a CompletionState out of the input element (probably using
>> framework's executor) and then allow user to chain on it. To us, it seems
>> more convenient if the DoFn output a CompletionStage<OutputT> or pass in a
>> CompletionStage<OutputT> to invoke upon completion.
>> >>>>> >>>>>>>>>>>
>> >>>>> >>>>>>>>>>> We would like to discuss further on the async API and
>> hopefully we will have a great support in Beam. Really appreciate the
>> feedback!
>> >>>>> >>>>>>>>>>>
>> >>>>> >>>>>>>>>>> Thanks,
>> >>>>> >>>>>>>>>>> Xinyu
>> >>>>> >>>
>> >>>>> >>>
>> >>>>> >>>
>> >>>>> >>> --
>> >>>>> >>>
>> >>>>> >>>
>> >>>>> >>>
>> >>>>> >>>
>> >>>>> >>> Got feedback? tinyurl.com/swegner-feedback
>> >
>> >
>> >
>> > --
>> >
>> >
>> >
>> >
>> > Got feedback? tinyurl.com/swegner-feedback
>>
>

Reply via email to