Yep. Introduce OutputEmitter, and ProcessContext no longer has much use.

On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> Which is still a key feature for SDF, but I agree it can be dropped in favor of an OutputEmitter pattern, with the DoFn moved to a plain parameter-injection-based pattern. Both (with CompletionStage) stay compatible :).
>
> On 11 Mar 2018 13:12, "Reuven Lax" <re...@google.com> wrote:
>
>> I think ProcessContext should go away completely. At that point it has little use except as a way to send output downstream.
>>
>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>
>>> Hmm, thinking out loud, but CompletionStage should/could be extended to replace ProcessContext, since it represents the element and the output at the same time, no?
>>>
>>> On 11 Mar 2018 00:57, "Kenneth Knowles" <k...@google.com> wrote:
>>>
>>>> Yeah, I think it could. But it is probably more readable not to overload the term, and certainly a bit simpler to implement. So perhaps @AsyncElement, to make it very clear.
>>>>
>>>> Kenn
>>>>
>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Kenn, can NewDoFn distinguish at generation time between:
>>>>>
>>>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>
>>>>> and
>>>>>
>>>>>     public void process(@Element Input element, ...) {
>>>>>
>>>>> If not, then we would probably need separate annotations....
>>>>>
>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <k...@google.com> wrote:
>>>>>
>>>>>> Nice! I agree that providing a CompletionStage for chaining is much better than an ExecutorService, and very clear.
>>>>>>
>>>>>> It is very feasible to add support that looks like
>>>>>>
>>>>>>     new DoFn<InputT, OutputT>() {
>>>>>>       @ProcessElement
>>>>>>       public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>         element.thenApply(...)
>>>>>>       }
>>>>>>     }
>>>>>>
>>>>>> If we had this available, I think users could even experiment with it often, as it might help even where it isn't obvious.
>>>>>>
>>>>>> My main hesitation is that a big part of Beam is giving a basic/imperative style of programming a DoFn that executes in a very smart functional/parallel way. Fully future-oriented programming is not explored much outside of JavaScript (and maybe Haskell) and requires greater discipline in programming in a functional manner: if you are mutating stuff in your callback you are going to have bugs, and then when you add concurrency control you are going to have bad performance and deadlocks. So I definitely wouldn't make it the default or want to spend all our support effort on teaching advanced programming techniques.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>
>>>>>>>> Have you considered drafting in detail what you think this API might look like?
>>>>>>>
>>>>>>> Yes, but it comes after the "enhancements" (for my use cases) and "bugs" lists, so I haven't started working on it much.
>>>>>>>
>>>>>>>> If it's a radically different API, it might be more appropriate as an alternative parallel Beam API rather than a replacement for the current API (there is also one such fluent API in the works).
>>>>>>>
>>>>>>> What I plan is to draft it on top of Beam (so the "useless" case I spoke about before), then propose implementing it ~natively and making it the main API in another major version.
>>>>>>>
>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>
>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic one?) of what Kenn suggested.
>>>>>>>>>>
>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so it might not require waiting until Beam 3.0). We can recognize new parameters to processElement and populate them as needed.
>>>>>>>>>
>>>>>>>>> This is right; however, in my head it was a one-way move, to enforce a reactive design rather than fake a reactive API on top of a synchronous, non-reactive implementation, which I fear is what would happen today if both were supported.
>>>>>>>>>
>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Yes. For the DoFn, for instance, instead of processContext.element() returning a <T>, you get a CompletionStage<T>, and output gets one as well.
>>>>>>>>>>>
>>>>>>>>>>> This way you register an execution chain. Mixed with streams, you get a big data Java 8/9/10 API that enables any connectivity in a well-performing manner ;).
>>>>>>>>>>>
>>>>>>>>>>> On 10 Mar 2018 13:56, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> So you mean the user should have a way of registering asynchronous activity with a callback (the callback must be registered with Beam, because Beam needs to know not to mark the element as done until all associated callbacks have completed). I think that's basically what Kenn was suggesting, unless I'm missing something.
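To make the callback idea above concrete, here is a plain-Java sketch (no Beam dependency) of a CompletionStage-based element handler plus a toy harness that only treats a bundle as done once every registered stage has completed, which is the guarantee Reuven describes. `OutputEmitter`, `AsyncFn`, `MiniHarness` and `AsyncBundleDemo` are hypothetical names chosen for illustration; the single `output` method on `OutputEmitter` is only a guess at the shape of the proposal, and none of this is an existing Beam API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical output sink; an assumed shape for the "OutputEmitter" idea, not a Beam API. */
interface OutputEmitter<T> {
  void output(T value);
}

/** Hypothetical async handler: the returned stage completes when all work for the element is done. */
interface AsyncFn<InputT, OutputT> {
  CompletionStage<Void> process(CompletionStage<InputT> element, OutputEmitter<OutputT> out);
}

final class MiniHarness {
  private MiniHarness() {}

  /** Runs one toy "bundle" and returns its outputs only after every element's stage has completed. */
  static <InputT, OutputT> List<OutputT> runBundle(List<InputT> bundle, AsyncFn<InputT, OutputT> fn) {
    // Callbacks may run on pool threads, so collect outputs in a synchronized list.
    List<OutputT> outputs = Collections.synchronizedList(new ArrayList<>());
    List<CompletableFuture<Void>> pending = new ArrayList<>();
    for (InputT element : bundle) {
      CompletionStage<Void> done = fn.process(CompletableFuture.completedFuture(element), outputs::add);
      pending.add(done.toCompletableFuture());
    }
    // The bundle is not "done" until all registered callbacks have finished.
    CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
    synchronized (outputs) {
      return new ArrayList<>(outputs);
    }
  }
}

final class AsyncBundleDemo {
  private AsyncBundleDemo() {}

  static List<Integer> run() {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      // thenApplyAsync stands in for a non-blocking remote call; thenAccept emits the result.
      AsyncFn<String, Integer> lengthFn =
          (element, out) -> element.thenApplyAsync(String::length, pool).thenAccept(out::output);
      return MiniHarness.runBundle(Arrays.asList("a", "bb", "ccc"), lengthFn);
    } finally {
      pool.shutdown();
    }
  }
}
```

Outputs can arrive in any order because the callbacks run on pool threads; the harness only promises that all of them have arrived before `runBundle` returns.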
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous, and until bundles and combines are reactive-friendly, Beam will be synchronous whatever other parts do. Becoming reactive would make it possible to manage the threading issues properly and to get better overall scalability when remote I/O is involved.
>>>>>>>>>>>>>
>>>>>>>>>>>>> However, it requires breaking the Source and SDF designs to use CompletionStage (or an equivalent) to chain the processing properly and in a unified fashion.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 9 Mar 2018 23:48, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> If you're talking about reactive programming, at a certain level Beam is already reactive. Are you referring to a specific way of writing the code?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Kenn: why not make Beam reactive instead? It would allow scaling much further without having to heavily synchronize multithreaded code. Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 9 Mar 2018 22:49, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer, which is that we envision the new DoFn being able to provide an automatic ExecutorService parameter that you can use as you wish.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx, ExecutorService executorService) {
>>>>>>>>>>>>>>>>         ... launch some futures, put them in instance vars ...
>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>         ... block on futures, output results if appropriate ...
>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching knowledge of what is going on in a computation to, for example, share a thread pool between different bits. This was one reason to delete IntraBundleParallelization: it didn't allow the runner and user code to properly manage how many things were going on concurrently. Mostly, the runner should own parallelizing to max out cores, and what user code needs is asynchrony hooks that can interact with that. However, this feature has not been thoroughly considered. It is TBD how much the harness itself manages blocking on outstanding requests versus it being your responsibility in FinishBundle, etc.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are willing to do the knob tuning to get the threading acceptable for your particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <josh.fe...@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Our team has a pipeline that makes external network calls. These pipelines are currently super slow, and the hypothesis is that they are slow because we are not threading our network calls. The GitHub pull request below provides some discussion around this:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In Beam 1.0, there was IntraBundleParallelization, which helped with this. However, it was removed because it didn't comply with a few Beam paradigms.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network calls? It seems bundling the elements into groups of size X prior to passing them to the DoFn, and managing the threading within the function, might work. Thoughts?
>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for Beam?
>>>>>>>>>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
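Josh's first question (batch the elements, manage the threading inside the function) and Kenn's ExecutorService/FinishBundle outline can be sketched in plain Java as below, without any Beam dependency. `BatchedBlockingCaller` and `BatchedCallerDemo` are hypothetical names, not Beam classes: `processElement` submits each blocking call to a bounded thread pool and drains results once `maxInFlight` calls are outstanding, while `finishBundle` blocks on whatever remains so that nothing is still pending once the bundle is considered done. The `blockingCall` function is an assumed stand-in for the external network call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical helper, not a Beam API; mirrors the @ProcessElement / @FinishBundle split. */
final class BatchedBlockingCaller<I, O> implements AutoCloseable {
  private final ExecutorService pool;
  private final Function<I, O> blockingCall; // stands in for the external network call
  private final int maxInFlight;             // the "groups of size X" knob
  private final List<Future<O>> inFlight = new ArrayList<>();

  BatchedBlockingCaller(Function<I, O> blockingCall, int threads, int maxInFlight) {
    this.pool = Executors.newFixedThreadPool(threads);
    this.blockingCall = blockingCall;
    this.maxInFlight = maxInFlight;
  }

  /** Like @ProcessElement: submit the call and return, unless maxInFlight calls are pending. */
  void processElement(I element, Consumer<O> out) throws InterruptedException, ExecutionException {
    Callable<O> task = () -> blockingCall.apply(element);
    inFlight.add(pool.submit(task));
    if (inFlight.size() >= maxInFlight) {
      drain(out);
    }
  }

  /** Like @FinishBundle: block on every outstanding call before the bundle is considered done. */
  void finishBundle(Consumer<O> out) throws InterruptedException, ExecutionException {
    drain(out);
  }

  // Waits for pending calls and emits results in submission order, on the caller's thread.
  private void drain(Consumer<O> out) throws InterruptedException, ExecutionException {
    for (Future<O> f : inFlight) {
      out.accept(f.get()); // a failed call surfaces here as an ExecutionException
    }
    inFlight.clear();
  }

  @Override
  public void close() {
    pool.shutdown();
  }
}

final class BatchedCallerDemo {
  private BatchedCallerDemo() {}

  static List<String> run() {
    List<String> outputs = new ArrayList<>(); // only touched on this thread
    try (BatchedBlockingCaller<Integer, String> caller =
        new BatchedBlockingCaller<Integer, String>(i -> "result-" + i, 4, 3)) {
      for (int i = 0; i < 5; i++) {
        caller.processElement(i, outputs::add);
      }
      caller.finishBundle(outputs::add);
    } catch (InterruptedException | ExecutionException e) {
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
      throw new IllegalStateException(e);
    }
    return outputs;
  }
}
```

Here `threads` and `maxInFlight` are the knobs Kenn mentions having to tune. Note that a private pool per instance is the kind of concurrency the runner cannot see, which is the concern he raises about IntraBundleParallelization; sharing one pool across instances would be one possible refinement.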