Makes sense to me. We should make it easier to write DoFns in this pattern,
which has emerged as common among I/O connectors.

Enabling asynchronous task chaining across a fusion tree is more
complicated but not necessary for this scenario.
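
For concreteness, here is a minimal plain-Java sketch of the pattern I mean:
start the call per element, then join everything at the end of the bundle. It
is only a stand-in, not Beam code: AsyncBundleSketch, processElement and
finishBundle are hypothetical names that mirror @ProcessElement and
@FinishBundle, and windowing, timestamps and error handling are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Plain-Java stand-in for a DoFn; none of these names are Beam APIs. */
final class AsyncBundleSketch<InputT, OutputT> {
  // Hypothetical async client call supplied by the user.
  private final Function<InputT, CompletableFuture<OutputT>> asyncCall;
  private final List<CompletableFuture<OutputT>> pending = new ArrayList<>();

  AsyncBundleSketch(Function<InputT, CompletableFuture<OutputT>> asyncCall) {
    this.asyncCall = asyncCall;
  }

  /** Analogue of @ProcessElement: start the call and return without blocking. */
  void processElement(InputT element) {
    pending.add(asyncCall.apply(element));
  }

  /** Analogue of @FinishBundle: wait for every outstanding call, emit in input order. */
  List<OutputT> finishBundle() {
    List<OutputT> outputs = new ArrayList<>();
    for (CompletableFuture<OutputT> call : pending) {
      // join() rethrows a failed call wrapped in CompletionException.
      outputs.add(call.join());
    }
    pending.clear();
    return outputs;
  }
}
```

Every connector that does this today rewrites roughly the same bookkeeping,
which is the part an SDK-level API could own.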

On Thu, Jan 24, 2019 at 10:13 AM Steve Niemitz <sniem...@apache.org> wrote:

> It's also important to note that in many (most?) IO frameworks (gRPC,
> finagle, etc), asynchronous IO is typically completely non-blocking, so
> there generally won't be a large number of threads waiting for IO to
> complete.  (netty uses a small pool of threads for the Event Loop Group for
> example).
>
> But in general I agree with Reuven, runners should not count threads in
> use in other thread pools for IO for the purpose of autoscaling (or most
> kinds of accounting).
>
> On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax <re...@google.com> wrote:
>
>> As Steve said, the main rationale for this is so that asynchronous IOs
>> (or in general, asynchronous remote calls) can be made. To some degree
>> this addresses Scott's concern: the asynchronous threads should be, for the
>> most part, simply waiting for IOs to complete; the reason to do the waiting
>> asynchronously is so that the main threadpool does not become blocked,
>> causing the pipeline to become IO bound. A runner like Dataflow should not
>> be tracking these threads for the purpose of autoscaling, as adding more
>> workers will (usually) not cause these calls to complete any faster.
>>
>> Reuven
>>
>> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz <sniem...@apache.org>
>> wrote:
>>
>>> I think I agree with a lot of what you said here, I'm just going to
>>> restate my initial use-case to try to make it more clear as well.
>>>
>>> From my usage of beam, I feel like the big benefit of async DoFns would
>>> be to allow batched IO to be implemented more simply inside a DoFn.  Even
>>> in the Beam SDK itself, there are a lot of IOs that batch up IO operations
>>> in ProcessElement and wait for them to complete in FinishBundle ([1][2],
>>> etc).  From my experience, things like error handling, emitting outputs as
>>> the result of an asynchronous operation completing (in the correct window,
>>> with the correct timestamp, etc) get pretty tricky, and it would be great
>>> for the SDK to provide support natively for it.
>>>
>>> It's also probably good to point out that really only DoFns that do IO
>>> should be asynchronous, normal CPU bound DoFns have no reason to be
>>> asynchronous.
>>>
>>> A really good example of this is an IO I had written recently for
>>> Bigtable, it takes an input PCollection of ByteStrings representing row
>>> keys, and returns a PCollection of the row data from bigtable.  Naively
>>> this could be implemented by simply blocking on the Bigtable read inside
>>> the ParDo, however this would limit throughput substantially (even assuming
>>> an avg read latency is 1ms, that's still only 1000 QPS / instance of the
>>> ParDo).  My implementation batches many reads together (as they arrive at
>>> the DoFn), executes them once the batch is big enough (or some time
>>> passes), and then emits them once the batch read completes.  Emitting them
>>> in the correct window and handling errors gets tricky, so this is certainly
>>> something I'd love the framework itself to handle.
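
A rough plain-Java sketch of the batching described above, under stated
assumptions: BatchingLookupSketch and its batchRead function are hypothetical
(not the Bigtable client or a Beam API), only the size trigger is shown (the
time-based flush is omitted), and window/timestamp bookkeeping is left out. The
point of batching is the arithmetic: at 1 ms per blocking read one thread tops
out near 1000 reads/s, whereas a batch of N keys per round trip raises that
ceiling by roughly N if the batch latency stays similar.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Plain-Java stand-in for a batching lookup DoFn; not a Beam or Bigtable API. */
final class BatchingLookupSketch<K, V> {
  private final int maxBatchSize;
  // Hypothetical async client call: one round trip for a whole batch of keys.
  private final Function<List<K>, CompletableFuture<Map<K, V>>> batchRead;
  private final List<K> buffer = new ArrayList<>();
  private final List<CompletableFuture<Map<K, V>>> inFlight = new ArrayList<>();

  BatchingLookupSketch(
      int maxBatchSize, Function<List<K>, CompletableFuture<Map<K, V>>> batchRead) {
    this.maxBatchSize = maxBatchSize;
    this.batchRead = batchRead;
  }

  /** Analogue of @ProcessElement: buffer the key, send a batch once it is full. */
  void processElement(K key) {
    buffer.add(key);
    if (buffer.size() >= maxBatchSize) {
      flush();
    }
  }

  /** Analogue of @FinishBundle: send the partial batch, then wait for all reads. */
  List<Map<K, V>> finishBundle() {
    flush();
    List<Map<K, V>> results = new ArrayList<>();
    for (CompletableFuture<Map<K, V>> read : inFlight) {
      results.add(read.join());
    }
    inFlight.clear();
    return results;
  }

  private void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    // Copy the buffer so the in-flight request is not affected by clear().
    inFlight.add(batchRead.apply(new ArrayList<>(buffer)));
    buffer.clear();
  }
}
```
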
>>>
>>> I also don't see a big benefit of making a DoFn receive a future, if all
>>> a user is ever supposed to do is attach a continuation to it, that could
>>> just as easily be done by the runner itself, basically just invoking the
>>> entire ParDo as a continuation on the future (which then assumes the runner
>>> is even representing these tasks as futures internally).
>>>
>>> Making the DoFn itself actually return a future could be an option, even
>>> if the language itself doesn't support something like `await`, you could
>>> still implement it yourself in the DoFn, however, it seems like it'd be a
>>> strange contrast to the non-async version, which returns void.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
>>> [2]
>>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080
>>>
>>>
>>> On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> If I understand correctly, the end goal is to process input elements
>>>> of a DoFn asynchronously. Were I to do this naively, I would implement
>>>> DoFns that simply take and receive [Serializable?]CompletionStages as
>>>> element types, followed by a DoFn that adds a callback to emit on
>>>> completion (possibly via a queue to avoid being-on-the-wrong-thread
>>>> issues) and whose finalize forces all completions. This would, of
>>>> course, interact poorly with processing time tracking, fusion breaks,
>>>> watermark tracking, counter attribution, window propagation, etc. so
>>>> it is desirable to make it part of the system itself.
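
A small sketch of the naive approach described above, assuming plain
java.util.concurrent and nothing from Beam: completion callbacks only enqueue
results, and the calling thread drains the queue, so emission never happens on
the wrong thread. QueueEmitSketch, process and finish are hypothetical names,
and the emit callback stands in for an output receiver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Plain-Java stand-in; none of these names are Beam APIs. */
final class QueueEmitSketch<T> {
  // Results are handed from completion threads to the processing thread here.
  // ConcurrentLinkedQueue rejects null, so null results are not supported.
  private final ConcurrentLinkedQueue<T> completed = new ConcurrentLinkedQueue<>();
  private final List<CompletableFuture<Void>> pending = new ArrayList<>();

  /** Registers a stage; its callback may run on any thread, so it only enqueues. */
  void process(CompletionStage<T> stage, Consumer<T> emit) {
    pending.add(stage.thenAccept(completed::add).toCompletableFuture());
    drain(emit); // opportunistically emit whatever has already completed
  }

  /** Forces all completions, then emits the remainder on the calling thread. */
  void finish(Consumer<T> emit) {
    CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
    pending.clear();
    drain(emit);
  }

  private void drain(Consumer<T> emit) {
    T value;
    while ((value = completed.poll()) != null) {
      emit.accept(value);
    }
  }
}
```

Outputs come out in completion order rather than input order, which is one
more reason the window and timestamp of each element would need to travel with
its future.
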
>>>>
>>>> Taking a OutputReceiver<CompletionStage<OutputT>> seems like a decent
>>>> API. The invoking of the downstream process could be chained onto
>>>> this, with all the implicit tracking and tracing set up correctly.
>>>> Taking a CompletionStage as input means a DoFn would not have to
>>>> create its output CompletionStage ex nihilo and possibly allow for
>>>> better chaining (depending on the asynchronous APIs used).
>>>>
>>>> Even better might be to simply let the invocation of all
>>>> DoFn.process() methods be asynchronous, but as Java doesn't offer an
>>>> await primitive to relinquish control in the middle of a function body
>>>> this might be hard.
>>>>
>>>> I think for correctness, completion would have to be forced at the end
>>>> of each bundle. If your bundles are large enough, this may not be that
>>>> big of a deal. In this case you could also start executing subsequent
>>>> bundles while waiting for prior ones to complete.
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian
>>>> <codin.mart...@gmail.com> wrote:
>>>> >>
>>>> >> I'd love to see something like this as well.  Also +1 to
>>>> process(@Element InputT element, @Output
>>>> OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much
>>>> benefit to passing a future in, since the framework itself could hook up
>>>> the process function to complete when the future completes.
>>>> >
>>>> >
>>>> > One benefit we get by wrapping the input with CompletionStage is to
>>>> mandate[1] users to chain their processing logic to the input future;
>>>> thereby, ensuring asynchrony for the most part. However, it is still
>>>> possible for users to go out of their way and write blocking code.
>>>> >
>>>> > Although, I am not sure how counterintuitive it is for the runners
>>>> to wrap the input element into a future before passing it to the user code.
>>>> >
>>>> > Bharath
>>>> >
>>>> > [1] CompletionStage interface does not define methods for initially
>>>> creating, forcibly completing normally or exceptionally, probing completion
>>>> status or results, or awaiting completion of a stage. Implementations of
>>>> CompletionStage may provide means of achieving such effects, as appropriate
>>>> >
>>>> >
>>>> > On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>> >>
>>>> >> I think your concerns are valid but I want to clarify about "first
>>>> class async APIs". Does "first class" mean that it is a well-encapsulated
>>>> abstraction? or does it mean that the user can more or less do whatever
>>>> they want? These are opposite but both valid meanings for "first class", to
>>>> me.
>>>> >>
>>>> >> I would not want to encourage users to do explicit multi-threaded
>>>> programming or control parallelism. Part of the point of Beam is to gain
>>>> big data parallelism without explicit multithreading. I see asynchronous
>>>> chaining of futures (or their best-approximation in your language of
>>>> choice) as a highly disciplined way of doing asynchronous dependency-driven
>>>> computation that is nonetheless conceptually, and readably, straight-line
>>>> code. Threads are not required nor the only way to execute this code. In
>>>> fact you might often want to execute without threading for a reference
>>>> implementation to provide canonically correct results. APIs that leak
>>>> lower-level details of threads are asking for trouble.
>>>> >>
>>>> >> One of our other ideas was to provide a dynamic parameter of type
>>>> ExecutorService. The SDK harness (pre-portability: the runner) would
>>>> control and observe parallelism while the user could simply register tasks.
>>>> Providing a future/promise API is even more disciplined.
>>>> >>
>>>> >> Kenn
>>>> >>
>>>> >> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <sc...@apache.org>
>>>> wrote:
>>>> >>>
>>>> >>> A related question is how to make execution observable such that a
>>>> runner can make proper scaling decisions. Runners decide how to schedule
>>>> bundles within and across multiple worker instances, and can use
>>>> information about execution to make dynamic scaling decisions. First-class
>>>> async APIs seem like they would encourage DoFn authors to implement their
>>>> own parallelization, rather than deferring to the runner that should be
>>>> more capable of providing the right level of parallelism.
>>>> >>>
>>>> >>> In the Dataflow worker harness, we estimate the execution time of
>>>> PTransform steps by sampling execution time on the execution thread and
>>>> attributing it to the currently invoked method. This approach is fairly
>>>> simple and possible because we assume that execution happens within the
>>>> thread controlled by the runner. Some DoFns already implement their own
>>>> async logic and break this assumption; I would expect more if we build async
>>>> into the DoFn APIs.
>>>> >>>
>>>> >>> So: this isn't an argument against async APIs, but rather: does
>>>> this break execution observability, and are there other lightweight
>>>> mechanisms for attributing execution time of async work?
>>>> >>>
>>>> >>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <k...@google.com>
>>>> wrote:
>>>> >>>>
>>>> >>>> When executed over the portable APIs, it will be primarily the
>>>> Java SDK harness that makes all of these decisions. If we wanted runners to
>>>> have some insight into it we would have to add it to the Beam model protos.
>>>> I don't have any suggestions there, so I would leave it out of this
>>>> discussion until there are good ideas. We could learn a lot by trying it out
>>>> just in the SDK harness.
>>>> >>>>
>>>> >>>> Kenn
>>>> >>>>
>>>> >>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>> wrote:
>>>> >>>>>
>>>> >>>>> I don't have a strong opinion on the resolution of the futures
>>>> with regard to @FinishBundle invocation. Leaving it unspecified does
>>>> give runners more room to implement it with their own support.
>>>> >>>>>
>>>> >>>>> Optimization is another great point. Fusion seems pretty
>>>> complex to me too if we need to find a way to chain the resulting future
>>>> into the next transform. Or should we leave the async transform as a
>>>> standalone stage initially?
>>>> >>>>>
>>>> >>>>> Btw, I was counting the number of replies before we hit the
>>>> portability. Seems after 4 replies fuse finally showed up :).
>>>> >>>>>
>>>> >>>>> Thanks,
>>>> >>>>> Xinyu
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <k...@google.com>
>>>> wrote:
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <re...@google.com> wrote:
>>>> >>>>>>>
>>>> >>>>>>>
>>>> >>>>>>>
>>>> >>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <
>>>> xinyuliu...@gmail.com> wrote:
>>>> >>>>>>>>
>>>> >>>>>>>> @Steve: it's good to see that this is going to be useful in
>>>> your use cases as well. Thanks for sharing the code from Scio! I can see in
>>>> your implementation that waiting for the future completion is part of the
>>>> @FinishBundle. We are thinking of taking advantage of the underlying runner
>>>> async support so the user-level code won't need to implement this logic,
>>>> e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
>>>> after future completion [1], and Flink also has an AsyncFunction API [2],
>>>> which provides a ResultFuture similar to the API we discussed.
>>>> >>>>>>>
>>>> >>>>>>>
>>>> >>>>>>> Can this be done correctly? What I mean is that if the process
>>>> dies, can you guarantee that no data is lost? Beam currently guarantees
>>>> this for FinishBundle, but if you use an arbitrary async framework this
>>>> might not be true.
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>> What a Beam runner guarantees is that *if* the bundle is
>>>> committed, *then* finishbundle has run. So it seems just as easy to say
>>>> *if* a bundle is committed, *then* every async result has been resolved.
>>>> >>>>>>
>>>> >>>>>> If the process dies the two cases should be naturally analogous.
>>>> >>>>>>
>>>> >>>>>> But it raises the question of whether they should be resolved
>>>> prior to finishbundle, after, or unspecified. I lean toward unspecified.
>>>> >>>>>>
>>>> >>>>>> That's for a single ParDo. Where this could get complex is
>>>> optimizing fused stages for greater asynchrony.
>>>> >>>>>>
>>>> >>>>>> Kenn
>>>> >>>>>>
>>>> >>>>>>>
>>>> >>>>>>>>
>>>> >>>>>>>> A simple use case for this is to execute a Runnable
>>>> asynchronously in user's own executor. The following code illustrates
>>>> Kenn's option #2, with a very simple single-thread pool being the executor:
>>>> >>>>>>>>
>>>> >>>>>>>> new DoFn<InputT, OutputT>() {
>>>> >>>>>>>>   @ProcessElement
>>>> >>>>>>>>   public void process(
>>>> >>>>>>>>       @Element InputT element,
>>>> >>>>>>>>       @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>>> >>>>>>>>     CompletableFuture<OutputT> future =
>>>> >>>>>>>>         CompletableFuture.supplyAsync(
>>>> >>>>>>>>             () -> someOutput,
>>>> >>>>>>>>             Executors.newSingleThreadExecutor());
>>>> >>>>>>>>     outputReceiver.output(future);
>>>> >>>>>>>>   }
>>>> >>>>>>>> }
>>>> >>>>>>>>
>>>> >>>>>>>> The neat thing about this API is that the user can choose
>>>> their own async framework and we only expect the output to be a
>>>> CompletionStage.
>>>> >>>>>>>>
>>>> >>>>>>>>
>>>> >>>>>>>> For the implementation of bundling, can we compose a
>>>> CompletableFuture from each element in the bundle, e.g.
>>>> CompletableFuture.allOf(...), and then invoke @FinishBundle when this
>>>> future is complete? Seems this might work.
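
The allOf idea above could look roughly like this. It is a sketch using only
java.util.concurrent; AllOfBundleSketch and whenBundleComplete are hypothetical
names, and how a runner would actually hook @FinishBundle onto the returned
future is an assumption, not an existing Beam mechanism.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** Plain-Java stand-in; not a Beam API. */
final class AllOfBundleSketch {
  private AllOfBundleSketch() {}

  /**
   * Returns a future that completes with all element results, in input order,
   * once every per-element stage in the bundle has completed. If any stage
   * fails, the returned future completes exceptionally instead.
   */
  static <T> CompletableFuture<List<T>> whenBundleComplete(
      List<? extends CompletionStage<T>> elementOutputs) {
    List<CompletableFuture<T>> futures = new ArrayList<>();
    for (CompletionStage<T> stage : elementOutputs) {
      futures.add(stage.toCompletableFuture());
    }
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
        .thenApply(
            ignored -> {
              List<T> results = new ArrayList<>();
              for (CompletableFuture<T> future : futures) {
                results.add(future.join()); // already complete, so this does not block
              }
              return results;
            });
  }
}
```

A finish-bundle step would then be chained onto the returned future (for
example with thenRun) instead of blocking a processing thread.
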
>>>> >>>>>>>>
>>>> >>>>>>>> Thanks,
>>>> >>>>>>>> Xinyu
>>>> >>>>>>>>
>>>> >>>>>>>>
>>>> >>>>>>>> [1]
>>>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>> >>>>>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>>> >>>>>>>>
>>>> >>>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <
>>>> sniem...@apache.org> wrote:
>>>> >>>>>>>>>
>>>> >>>>>>>>> I'd love to see something like this as well.  Also +1 to
>>>> process(@Element InputT element, @Output
>>>> OutputReceiver<CompletionStage<OutputT>>).  I don't know if there's much
>>>> benefit to passing a future in, since the framework itself could hook up
>>>> the process function to complete when the future completes.
>>>> >>>>>>>>>
>>>> >>>>>>>>> I feel like I've spent a bunch of time writing very similar
>>>> "kick off a future in ProcessElement, join it in FinishBundle" code, and
>>>> looking around beam itself a lot of built-in transforms do it as well.
>>>> Scio provides a few AsyncDoFn implementations [1] but it'd be great to see
>>>> this as a first-class concept in beam itself.  Doing error handling,
>>>> concurrency, etc correctly can be tricky.
>>>> >>>>>>>>>
>>>> >>>>>>>>> [1]
>>>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>>> >>>>>>>>>
>>>> >>>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <
>>>> k...@google.com> wrote:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> If the input is a CompletionStage<InputT> then the output
>>>> should also be a CompletionStage<OutputT>, since all you should do is async
>>>> chaining. We could enforce this by giving the DoFn an
>>>> OutputReceiver<CompletionStage<OutputT>>.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Another possibility that might be even more robust against
>>>> poor future use could be process(@Element InputT element, @Output
>>>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
>>>> itself will be async chained, rather than counting on the user to do the
>>>> right thing.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> We should see how these look in real use cases. The way that
>>>> processing is split between @ProcessElement and @FinishBundle might
>>>> complicate things.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Kenn
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <
>>>> xinyuliu...@gmail.com> wrote:
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> Hi, guys,
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> As more users try out Beam running on the SamzaRunner, we
>>>> got a lot of asks for an asynchronous processing API. There are a few
>>>> reasons for these asks:
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> The users here are experienced in asynchronous programming.
>>>> With async frameworks such as Netty and ParSeq and libs like async jersey
>>>> client, they are able to make remote calls efficiently and the libraries
>>>> help manage the execution threads underneath. Async remote calls are very
>>>> common in most of our streaming applications today.
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> Many jobs run on a multi-tenant cluster. Async
>>>> processing helps reduce resource usage and speeds up computation (fewer
>>>> context switches).
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> I asked about the async support in a previous email thread.
>>>> The following API was mentioned in the reply:
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>> >>>>>>>>>>>     @ProcessElement
>>>> >>>>>>>>>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>> >>>>>>>>>>>       element.thenApply(...)
>>>> >>>>>>>>>>>     }
>>>> >>>>>>>>>>>   }
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> We are wondering whether there are any discussions on this
>>>> API and related docs. It is awesome that you guys already considered having
>>>> DoFns process asynchronously. Out of curiosity, this API seems to create
>>>> a CompletionStage out of the input element (probably using the framework's
>>>> executor) and then allows the user to chain on it. To us, it seems more
>>>> convenient if the DoFn outputs a CompletionStage<OutputT> or is passed a
>>>> CompletionStage<OutputT> to invoke upon completion.
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> We would like to discuss the async API further, and
>>>> hopefully we will have great support for it in Beam. Really appreciate the
>>>> feedback!
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> Thanks,
>>>> >>>>>>>>>>> Xinyu
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> Got feedback? tinyurl.com/swegner-feedback
>>>>
>>>

-- 




Got feedback? tinyurl.com/swegner-feedback
