Let me start a separate proposal thread and link this conversation.
Sorry about that.

On Fri, Sep 13, 2019 at 9:31 AM Bharath Kumara Subramanian <
[email protected]> wrote:

> I have put together a design document
> <https://docs.google.com/document/d/1t--UYXgaij0ULEoXUnhG3r8OZPBljN9r_WWlwQJBDrI/edit?usp=sharing>
> that consolidates our discussion in this thread.
> Please let me know your thoughts.
>
> Thanks,
> Bharath
>
>
>
> On Wed, Jan 30, 2019 at 10:18 AM Xinyu Liu <[email protected]> wrote:
>
>> I put the asks and email discussions in this JIRA to track the Async API:
>> https://jira.apache.org/jira/browse/BEAM-6550. Bharath on the
>> SamzaRunner side is willing to take a stab at this. He will come up with
>> some design doc based on our discussions. Will update the thread once it's
>> ready. Really appreciate all the suggestions and feedback here.
>>
>> Thanks,
>> Xinyu
>>
>> On Thu, Jan 24, 2019 at 2:41 PM Robert Bradshaw <[email protected]>
>> wrote:
>>
>>> That's a good point that this "IO" time should be tracked differently.
>>>
>>> For a single level, a wrapper/utility that correctly and completely
>>> (and transparently) implements the "naive" bit I sketched above under
>>> the hood may be sufficient and implementable purely in user-space, and
>>> quite useful.
>>>
>>> On Thu, Jan 24, 2019 at 7:38 PM Scott Wegner <[email protected]> wrote:
>>> >
>>> > Makes sense to me. We should make it easier to write DoFn's in this
>>> pattern that has emerged as common among I/O connectors.
>>> >
>>> > Enabling asynchronous task chaining across a fusion tree is more
>>> complicated but not necessary for this scenario.
>>> >
>>> > On Thu, Jan 24, 2019 at 10:13 AM Steve Niemitz <[email protected]>
>>> wrote:
>>> >>
>>> >> It's also important to note that in many (most?) IO frameworks (gRPC,
>>> finagle, etc), asynchronous IO is typically completely non-blocking, so
>>> there generally won't be a large number of threads waiting for IO to
>>> complete.  (netty uses a small pool of threads for the Event Loop Group for
>>> example).
>>> >>
>>> >> But in general I agree with Reuven, runners should not count threads
>>> in use in other thread pools for IO for the purpose of autoscaling (or most
>>> kinds of accounting).
>>> >>
>>> >> On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax <[email protected]> wrote:
>>> >>>
>>> >>> As Steve said, the main rationale for this is so that asynchronous
>>> IOs (or in general, asynchronous remote calls) can be made. To some degree
>>> this addresses Scott's concern: the asynchronous threads should be, for the
>>> most part, simply waiting for IOs to complete; the reason to do the waiting
>>> asynchronously is so that the main threadpool does not become blocked,
>>> causing the pipeline to become IO bound. A runner like Dataflow should not
>>> be tracking these threads for the purpose of autoscaling, as adding more
>>> workers will (usually) not cause these calls to complete any faster.
>>> >>>
>>> >>> Reuven
>>> >>>
>>> >>> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz <[email protected]>
>>> wrote:
>>> >>>>
>>> >>>> I think I agree with a lot of what you said here, I'm just going to
>>> restate my initial use-case to try to make it more clear as well.
>>> >>>>
>>> >>>> From my usage of beam, I feel like the big benefit of async DoFns
>>> would be to allow batched IO to be implemented more simply inside a DoFn.
>>> Even in the Beam SDK itself, there are a lot of IOs that batch up IO
>>> operations in ProcessElement and wait for them to complete in FinishBundle
>>> ([1][2], etc).  From my experience, things like error handling, emitting
>>> outputs as the result of an asynchronous operation completing (in the
>>> correct window, with the correct timestamp, etc) get pretty tricky, and it
>>> would be great for the SDK to provide support natively for it.
>>> >>>>
>>> >>>> It's also probably good to point out that really only DoFns that do
>>> IO should be asynchronous, normal CPU bound DoFns have no reason to be
>>> asynchronous.
>>> >>>>
>>> >>>> A really good example of this is an IO I had written recently for
>>> Bigtable, it takes an input PCollection of ByteStrings representing row
>>> keys, and returns a PCollection of the row data from bigtable.  Naively
>>> this could be implemented by simply blocking on the Bigtable read inside
>>> the ParDo, however this would limit throughput substantially (even assuming
>>> an avg read latency is 1ms, that's still only 1000 QPS / instance of the
>>> ParDo).  My implementation batches many reads together (as they arrive at
>>> the DoFn), executes them once the batch is big enough (or some time
>>> passes), and then emits them once the batch read completes.  Emitting them
>>> in the correct window and handling errors gets tricky, so this is certainly
>>> something I'd love the framework itself to handle.
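
The batching pattern described above could be sketched roughly as follows. This is a minimal sketch, not the Bigtable IO itself: `KeyBatcher` and the `bulkRead` function are hypothetical names, the time-based flush is left out, and the class assumes it is only called from a single thread (as a DoFn instance would be).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper (not a Beam or Bigtable API): buffers keys and issues
// one bulk read per batch, handing each caller a future for its own key.
final class KeyBatcher<K, V> {
  private final int maxBatchSize;
  private final Function<List<K>, CompletableFuture<Map<K, V>>> bulkRead;
  private List<K> keys = new ArrayList<>();
  private List<CompletableFuture<V>> results = new ArrayList<>();

  KeyBatcher(int maxBatchSize, Function<List<K>, CompletableFuture<Map<K, V>>> bulkRead) {
    this.maxBatchSize = maxBatchSize;
    this.bulkRead = bulkRead;
  }

  // Called once per element; flushes automatically when the batch is full.
  CompletableFuture<V> read(K key) {
    CompletableFuture<V> result = new CompletableFuture<>();
    keys.add(key);
    results.add(result);
    if (keys.size() >= maxBatchSize) {
      flush();
    }
    return result;
  }

  // Sends whatever is buffered; a caller would also invoke this at bundle end.
  void flush() {
    if (keys.isEmpty()) {
      return;
    }
    List<K> batchKeys = keys;
    List<CompletableFuture<V>> batchResults = results;
    keys = new ArrayList<>();
    results = new ArrayList<>();
    bulkRead.apply(batchKeys).whenComplete((rows, error) -> {
      for (int i = 0; i < batchKeys.size(); i++) {
        if (error != null) {
          batchResults.get(i).completeExceptionally(error); // fail every key in the batch
        } else {
          batchResults.get(i).complete(rows.get(batchKeys.get(i)));
        }
      }
    });
  }

  public static void main(String[] args) {
    // Stand-in for a remote bulk read: maps each key to its length.
    KeyBatcher<String, Integer> batcher = new KeyBatcher<String, Integer>(2, ks -> {
      Map<String, Integer> rows = new HashMap<>();
      for (String k : ks) {
        rows.put(k, k.length());
      }
      return CompletableFuture.completedFuture(rows);
    });
    CompletableFuture<Integer> first = batcher.read("row-1");
    CompletableFuture<Integer> second = batcher.read("row-22"); // fills the batch, triggers the read
    System.out.println(first.join() + " " + second.join());
  }
}
```

The sketch does not address emitting in the correct window or with the correct timestamp, which is the part the thread suggests the framework should own.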
>>> >>>>
>>> >>>> I also don't see a big benefit of making a DoFn receive a future,
>>> if all a user is ever supposed to do is attach a continuation to it, that
>>> could just as easily be done by the runner itself, basically just invoking
>>> the entire ParDo as a continuation on the future (which then assumes the
>>> runner is even representing these tasks as futures internally).
>>> >>>>
>>> >>>> Making the DoFn itself actually return a future could be an option,
>>> even if the language itself doesn't support something like `await`, you
>>> could still implement it yourself in the DoFn, however, it seems like it'd
>>> be a strange contrast to the non-async version, which returns void.
>>> >>>>
>>> >>>> [1]
>>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
>>> >>>> [2]
>>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080
>>> >>>>
>>> >>>>
>>> >>>> On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw <
>>> [email protected]> wrote:
>>> >>>>>
>>> >>>>> If I understand correctly, the end goal is to process input
>>> elements
>>> >>>>> of a DoFn asynchronously. Were I to do this naively, I would
>>> implement
>>> >>>>> DoFns that simply take and receive [Serializable?]CompletionStages
>>> as
>>> >>>>> element types, followed by a DoFn that adds a callback to emit on
>>> >>>>> completion (possibly via a queue to avoid being-on-the-wrong-thread
>>> >>>>> issues) and whose finalize forces all completions. This would, of
>>> >>>>> course, interact poorly with processing time tracking, fusion
>>> breaks,
>>> >>>>> watermark tracking, counter attribution, window propagation, etc.
>>> so
>>> >>>>> it is desirable to make it part of the system itself.
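
A minimal sketch of the "naive" user-space approach described above, assuming nothing about Beam internals: `QueueingEmitter` is a hypothetical class, `downstream` stands in for the output receiver, and `finishBundle()` stands in for the finalize step that forces all completions. Callbacks only enqueue results, so the downstream consumer is always invoked on the thread that calls `process` or `finishBundle`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical wrapper (not a Beam API): emits async results via a queue.
final class QueueingEmitter<T> {
  private final ConcurrentLinkedQueue<T> completed = new ConcurrentLinkedQueue<>();
  private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();
  private final Consumer<T> downstream;

  QueueingEmitter(Consumer<T> downstream) {
    this.downstream = downstream;
  }

  // Per element: the callback only enqueues, whichever thread it runs on.
  void process(CompletionStage<T> stage) {
    inFlight.add(stage.thenAccept(completed::add).toCompletableFuture());
    drain(); // opportunistically emit anything that has already finished
  }

  // End of bundle: wait for every outstanding stage, then emit the rest.
  // join() throws CompletionException here if any stage failed.
  void finishBundle() {
    CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0])).join();
    inFlight.clear();
    drain();
  }

  private void drain() {
    T value;
    while ((value = completed.poll()) != null) {
      downstream.accept(value);
    }
  }

  public static void main(String[] args) {
    List<String> emitted = new ArrayList<>();
    QueueingEmitter<String> emitter = new QueueingEmitter<>(emitted::add);
    emitter.process(CompletableFuture.supplyAsync(() -> "a"));
    emitter.process(CompletableFuture.supplyAsync(() -> "b"));
    emitter.finishBundle();
    System.out.println(emitted.size()); // prints 2 (emission order is not guaranteed)
  }
}
```

As the message notes, a wrapper like this knows nothing about windows, timestamps, watermarks, or counters, which is the argument for building the feature into the system.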
>>> >>>>>
>>> >>>>> Taking a OutputReceiver<CompletionStage<OutputT>> seems like a
>>> decent
>>> >>>>> API. The invoking of the downstream process could be chained onto
>>> >>>>> this, with all the implicit tracking and tracing set up correctly.
>>> >>>>> Taking a CompletionStage as input means a DoFn would not have to
>>> >>>>> create its output CompletionStage ex nihilo and possibly allow for
>>> >>>>> better chaining (depending on the asynchronous APIs used).
>>> >>>>>
>>> >>>>> Even better might be to simply let the invocation of all
>>> >>>>> DoFn.process() methods be asynchronous, but as Java doesn't offer
>>> an
>>> >>>>> await primitive to relinquish control in the middle of a function
>>> body
>>> >>>>> this might be hard.
>>> >>>>>
>>> >>>>> I think for correctness, completion would have to be forced at the
>>> end
>>> >>>>> of each bundle. If your bundles are large enough, this may not be
>>> that
>>> >>>>> big of a deal. In this case you could also start executing
>>> subsequent
>>> >>>>> bundles while waiting for prior ones to complete.
>>> >>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>> On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian
>>> >>>>> <[email protected]> wrote:
>>> >>>>> >>
>>> >>>>> >> I'd love to see something like this as well.  Also +1 to
>>> process(@Element InputT element, @Output
>>> OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much
>>> benefit to passing a future in, since the framework itself could hook up
>>> the process function to complete when the future completes.
>>> >>>>> >
>>> >>>>> >
>>> >>>>> > One benefit we get by wrapping the input with CompletionStage is
>>> to mandate[1] users to chain their processing logic to the input future;
>>> thereby, ensuring asynchrony for the most part. However, it is still
>>> possible for users to go out of their way and write blocking code.
>>> >>>>> >
>>> >>>>> > Although, I am not sure how counterintuitive it is for the
>>> runners to wrap the input element into a future before passing it to the
>>> user code.
>>> >>>>> >
>>> >>>>> > Bharath
>>> >>>>> >
>>> >>>>> > [1] CompletionStage interface does not define methods for
>>> initially creating, forcibly completing normally or exceptionally, probing
>>> completion status or results, or awaiting completion of a stage.
>>> Implementations of CompletionStage may provide means of achieving such
>>> effects, as appropriate
>>> >>>>> >
>>> >>>>> >
>>> >>>>> > On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <
>>> [email protected]> wrote:
>>> >>>>> >>
>>> >>>>> >> I think your concerns are valid but I want to clarify about
>>> "first class async APIs". Does "first class" mean that it is a
>>> well-encapsulated abstraction? or does it mean that the user can more or
>>> less do whatever they want? These are opposite but both valid meanings for
>>> "first class", to me.
>>> >>>>> >>
>>> >>>>> >> I would not want to encourage users to do explicit
>>> multi-threaded programming or control parallelism. Part of the point of
>>> Beam is to gain big data parallelism without explicit multithreading. I see
>>> asynchronous chaining of futures (or their best-approximation in your
>>> language of choice) as a highly disciplined way of doing asynchronous
>>> dependency-driven computation that is nonetheless conceptually, and
>>> readably, straight-line code. Threads are not required nor the only way to
>>> execute this code. In fact you might often want to execute without
>>> threading for a reference implementation to provide canonically correct
>>> results. APIs that leak lower-level details of threads are asking for
>>> trouble.
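
To illustrate the point that chained futures do not require threads, here is a small sketch using only `java.util.concurrent`. The class and method names are hypothetical. It relies on the documented behavior of `CompletableFuture`'s non-async methods, where dependent actions run on the thread that completes the future; other `CompletionStage` implementations may behave differently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical example: straight-line async chaining with no thread pool involved.
final class ChainWithoutThreads {
  // User logic expressed purely as chaining on the input stage.
  static CompletionStage<String> describe(CompletionStage<Integer> input) {
    return input
        .thenApply(n -> n * 2)
        .thenApply(n -> "value=" + n + " on " + Thread.currentThread().getName());
  }

  public static void main(String[] args) {
    CompletableFuture<Integer> source = new CompletableFuture<>();
    CompletionStage<String> result = describe(source);
    // Completing the source runs both callbacks synchronously on this thread.
    source.complete(21);
    System.out.println(result.toCompletableFuture().join()); // prints "value=42 on main"
  }
}
```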
>>> >>>>> >>
>>> >>>>> >> One of our other ideas was to provide a dynamic parameter of
>>> type ExecutorService. The SDK harness (pre-portability: the runner) would
>>> control and observe parallelism while the user could simply register tasks.
>>> Providing a future/promise API is even more disciplined.
>>> >>>>> >>
>>> >>>>> >> Kenn
>>> >>>>> >>
>>> >>>>> >> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <[email protected]>
>>> wrote:
>>> >>>>> >>>
>>> >>>>> >>> A related question is how to make execution observable such
>>> that a runner can make proper scaling decisions. Runners decide how to
>>> schedule bundles within and across multiple worker instances, and can use
>>> information about execution to make dynamic scaling decisions. First-class
>>> async APIs seem like they would encourage DoFn authors to implement their
>>> own parallelization, rather than deferring to the runner that should be
>>> more capable of providing the right level of parallelism.
>>> >>>>> >>>
>>> >>>>> >>> In the Dataflow worker harness, we estimate execution time to
>>> PTransform steps by sampling execution time on the execution thread and
>>> attributing it to the currently invoked method. This approach is fairly
>>> simple and possible because we assume that execution happens within the
>>> thread controlled by the runner. Some DoFn's already implement their own
>>> async logic and break this assumption; I would expect more if we make async
>>> built into the DoFn APIs.
>>> >>>>> >>>
>>> >>>>> >>> So: this isn't an argument against async APIs, but rather:
>>> does this break execution observability, and are there other lightweight
>>> mechanisms for attributing execution time of async work?
>>> >>>>> >>>
>>> >>>>> >>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <
>>> [email protected]> wrote:
>>> >>>>> >>>>
>>> >>>>> >>>> When executed over the portable APIs, it will be primarily
>>> the Java SDK harness that makes all of these decisions. If we wanted
>>> runners to have some insight into it we would have to add it to the Beam
>>> model protos. I don't have any suggestions there, so I would leave it out
>>> of this discussion until there's good ideas. We could learn a lot by trying
>>> it out just in the SDK harness.
>>> >>>>> >>>>
>>> >>>>> >>>> Kenn
>>> >>>>> >>>>
>>> >>>>> >>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <
>>> [email protected]> wrote:
>>> >>>>> >>>>>
>>> >>>>> >>>>> I don't have a strong opinion on the resolution of the
>>> futures with regard to @FinishBundle invocation. Leaving it unspecified
>>> does give runners more room to implement it with their own support.
>>> >>>>> >>>>>
>>> >>>>> >>>>> Optimization is also another great point. Fuse seems pretty
>>> complex to me too if we need to find a way to chain the resulting future
>>> into the next transform, or leave the async transform as a standalone stage
>>> initially?
>>> >>>>> >>>>>
>>> >>>>> >>>>> Btw, I was counting the number of replies before we hit the
>>> portability. Seems after 4 replies fuse finally showed up :).
>>> >>>>> >>>>>
>>> >>>>> >>>>> Thanks,
>>> >>>>> >>>>> Xinyu
>>> >>>>> >>>>>
>>> >>>>> >>>>>
>>> >>>>> >>>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <
>>> [email protected]> wrote:
>>> >>>>> >>>>>>
>>> >>>>> >>>>>>
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <[email protected]
>>> wrote:
>>> >>>>> >>>>>>>
>>> >>>>> >>>>>>>
>>> >>>>> >>>>>>>
>>> >>>>> >>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <
>>> [email protected]> wrote:
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>> @Steve: it's good to see that this is going to be useful
>>> in your use cases as well. Thanks for sharing the code from Scio! I can see
>>> in your implementation that waiting for the future completion is part of
>>> the @FinishBundle. We are thinking of taking advantage of the underlying
>>> runner async support so the user-level code won't need to implement this
>>> logic, e.g. Samza has an AsyncStreamTask api that provides a callback to
>>> invoke after future completion[1], and Flink also has AsyncFunction api [2]
>>> which provides a ResultFuture similar to the API we discussed.
>>> >>>>> >>>>>>>
>>> >>>>> >>>>>>>
>>> >>>>> >>>>>>> Can this be done correctly? What I mean is that if the
>>> process dies, can you guarantee that no data is lost? Beam currently
>>> guarantees this for FinishBundle, but if you use an arbitrary async
>>> framework this might not be true.
>>> >>>>> >>>>>>
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> What a Beam runner guarantees is that *if* the bundle is
>>> committed, *then* finishbundle has run. So it seems just as easy to say
>>> *if* a bundle is committed, *then* every async result has been resolved.
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> If the process dies the two cases should be naturally
>>> analogous.
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> But it raises the question of whether they should be
>>> resolved prior to finishbundle, after, or unspecified. I lean toward
>>> unspecified.
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> That's for a single ParDo. Where this could get complex is
>>> optimizing fused stages for greater asynchrony.
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> Kenn
>>> >>>>> >>>>>>
>>> >>>>> >>>>>>>
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>> A simple use case for this is to execute a Runnable
>>> asynchronously in user's own executor. The following code illustrates
>>> Kenn's option #2, with a very simple single-thread pool being the executor:
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>> new DoFn<InputT, OutputT>() {
>>> >>>>> >>>>>>>>   @ProcessElement
>>> >>>>> >>>>>>>>   public void process(@Element InputT element, @Output
>>> OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>> >>>>> >>>>>>>>     CompletableFuture<OutputT> future =
>>> CompletableFuture.supplyAsync(
>>> >>>>> >>>>>>>>         () -> someOutput,
>>> >>>>> >>>>>>>>         Executors.newSingleThreadExecutor());
>>> >>>>> >>>>>>>>     outputReceiver.output(future);
>>> >>>>> >>>>>>>>   }
>>> >>>>> >>>>>>>> }
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>> The neat thing about this API is that the user can choose
>>> their own async framework and we only expect the output to be a
>>> CompletionStage.
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>> For the implementation of bundling, can we compose a
>>> CompletableFuture from each element in the bundle, e.g.
>>> CompletableFuture.allOf(...), and then invoke @FinishBundle when this
>>> future is complete? Seems this might work.
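
The bundling idea above could look roughly like this. It is a sketch under stated assumptions, not how any runner implements it: `BundleFutures` is a hypothetical helper, and the stage returned by `whenAllDone()` marks the point at which a runner could invoke @FinishBundle or commit the bundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical helper (not a Beam API): tracks the per-element futures of one bundle.
final class BundleFutures<T> {
  private final List<CompletableFuture<T>> pending = new ArrayList<>();

  // Called once per element with the stage the user code passed to the receiver.
  void add(CompletionStage<T> stage) {
    pending.add(stage.toCompletableFuture());
  }

  // Completes when every element's stage has completed, with results in input order.
  CompletableFuture<List<T>> whenAllDone() {
    List<CompletableFuture<T>> snapshot = new ArrayList<>(pending);
    CompletableFuture<?>[] all = snapshot.toArray(new CompletableFuture<?>[0]);
    return CompletableFuture.allOf(all).thenApply(ignored -> {
      List<T> results = new ArrayList<>();
      for (CompletableFuture<T> f : snapshot) {
        results.add(f.join()); // does not block: allOf guarantees completion
      }
      return results;
    });
  }

  public static void main(String[] args) {
    BundleFutures<Integer> bundle = new BundleFutures<>();
    for (int i = 1; i <= 3; i++) {
      final int n = i;
      bundle.add(CompletableFuture.supplyAsync(() -> n * n));
    }
    // Stand-in for "invoke @FinishBundle when this future is complete".
    System.out.println(bundle.whenAllDone().join()); // prints [1, 4, 9]
  }
}
```

If any element's stage fails, the combined future completes exceptionally, so a single failure would fail the whole bundle in this sketch.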
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>> Thanks,
>>> >>>>> >>>>>>>> Xinyu
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>> [1]
>>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>> >>>>> >>>>>>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <
>>> [email protected]> wrote:
>>> >>>>> >>>>>>>>>
>>> >>>>> >>>>>>>>> I'd love to see something like this as well.  Also +1 to
>>> process(@Element InputT element, @Output
>>> OutputReceiver<CompletionStage<OutputT>>).  I don't know if there's much
>>> benefit to passing a future in, since the framework itself could hook up
>>> the process function to complete when the future completes.
>>> >>>>> >>>>>>>>>
>>> >>>>> >>>>>>>>> I feel like I've spent a bunch of time writing very
>>> similar "kick off a future in ProcessElement, join it in FinishBundle"
>>> code, and looking around beam itself a lot of built-in transforms do it as
>>> well.  Scio provides a few AsyncDoFn implementations [1] but it'd be great
>>> to see this as a first-class concept in beam itself.  Doing error handling,
>>> concurrency, etc correctly can be tricky.
>>> >>>>> >>>>>>>>>
>>> >>>>> >>>>>>>>> [1]
>>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>> >>>>> >>>>>>>>>
>>> >>>>> >>>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <
>>> [email protected]> wrote:
>>> >>>>> >>>>>>>>>>
>>> >>>>> >>>>>>>>>> If the input is a CompletionStage<InputT> then the
>>> output should also be a CompletionStage<OutputT>, since all you should do
>>> is async chaining. We could enforce this by giving the DoFn an
>>> OutputReceiver<CompletionStage<OutputT>>.
>>> >>>>> >>>>>>>>>>
>>> >>>>> >>>>>>>>>> Another possibility that might be even more robust
>>> against poor future use could be process(@Element InputT element, @Output
>>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
>>> itself will be async chained, rather than counting on the user to do the
>>> right thing.
>>> >>>>> >>>>>>>>>>
>>> >>>>> >>>>>>>>>> We should see how these look in real use cases. The way
>>> that processing is split between @ProcessElement and @FinishBundle might
>>> complicate things.
>>> >>>>> >>>>>>>>>>
>>> >>>>> >>>>>>>>>> Kenn
>>> >>>>> >>>>>>>>>>
>>> >>>>> >>>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <
>>> [email protected]> wrote:
>>> >>>>> >>>>>>>>>>>
>>> >>>>> >>>>>>>>>>> Hi, guys,
>>> >>>>> >>>>>>>>>>>
>>> >>>>> >>>>>>>>>>> As more users try out Beam running on the SamzaRunner,
>>> we got a lot of asks for an asynchronous processing API. There are a few
>>> reasons for these asks:
>>> >>>>> >>>>>>>>>>>
>>> >>>>> >>>>>>>>>>> The users here are experienced in asynchronous
>>> programming. With async frameworks such as Netty and ParSeq and libs like
>>> async jersey client, they are able to make remote calls efficiently and the
>>> libraries help manage the execution threads underneath. Async remote calls
>>> are very common in most of our streaming applications today.
>>> >>>>> >>>>>>>>>>> Many jobs are running on a multi-tenancy cluster.
>>> Async processing helps reduce resource usage and speeds up computation
>>> (fewer context switches).
>>> >>>>> >>>>>>>>>>>
>>> >>>>> >>>>>>>>>>> I asked about the async support in a previous email
>>> thread. The following API was mentioned in the reply:
>>> >>>>> >>>>>>>>>>>
>>> >>>>> >>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>> >>>>> >>>>>>>>>>>     @ProcessElement
>>> >>>>> >>>>>>>>>>>     public void process(@Element
>>> CompletionStage<InputT> element, ...) {
>>> >>>>> >>>>>>>>>>>       element.thenApply(...)
>>> >>>>> >>>>>>>>>>>     }
>>> >>>>> >>>>>>>>>>>   }
>>> >>>>> >>>>>>>>>>>
>>> >>>>> >>>>>>>>>>> We are wondering whether there are any discussions on
>>> this API and related docs. It is awesome that you guys already considered
>>> having DoFn to process asynchronously. Out of curiosity, this API seems to
>>> create a CompletionStage out of the input element (probably using
>>> framework's executor) and then allow user to chain on it. To us, it seems
>>> more convenient if the DoFn output a CompletionStage<OutputT> or pass in a
>>> CompletionStage<OutputT> to invoke upon completion.
>>> >>>>> >>>>>>>>>>>
>>> >>>>> >>>>>>>>>>> We would like to discuss further on the async API and
>>> hopefully we will have a great support in Beam. Really appreciate the
>>> feedback!
>>> >>>>> >>>>>>>>>>>
>>> >>>>> >>>>>>>>>>> Thanks,
>>> >>>>> >>>>>>>>>>> Xinyu
>>> >>>>> >>>
>>> >>>>> >>>
>>> >>>>> >>>
>>> >>>>> >>> --
>>> >>>>> >>>
>>> >>>>> >>>
>>> >>>>> >>>
>>> >>>>> >>>
>>> >>>>> >>> Got feedback? tinyurl.com/swegner-feedback
>>> >
>>> >
>>> >
>>> > --
>>> >
>>> >
>>> >
>>> >
>>> > Got feedback? tinyurl.com/swegner-feedback
>>>
>>
