Yep. Introduce OutputEmitter, and Process context no longer has much use.

On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

> Which is still a key feature for sdf but agree it can be dropped for an
> outputemitter pattern and the dofn moved to a plain parameters injection
> based pattern. Both (which completionstage) stays compatible :).
>
> Le 11 mars 2018 13:12, "Reuven Lax" <re...@google.com> a écrit :
>
>> I think process context should go away completely. At that point it has
>> little use except for a way to send output downstream.
>>
>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <rmannibu...@gmail.com>
>> wrote:
>>
>>> Hmm, thinking out loud but completionstage should/could be extended to
>>> replace processcontext since it represents element and output at the same
>>> time no?
>>>
>>> Le 11 mars 2018 00:57, "Kenneth Knowles" <k...@google.com> a écrit :
>>>
>>>> Yea, I think it could. But it is probably more readable to not overload
>>>> the term, plus certainly a bit simpler in implementation. So perhaps
>>>> @AsyncElement to make it very clear.
>>>>
>>>> Kenn
>>>>
>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Ken, can NewDoFn distinguish at generation time the difference between:
>>>>>
>>>>>     public void process(@Element CompletionStage<InputT> element, ...)
>>>>> {
>>>>>
>>>>> and
>>>>>
>>>>>     public void process(@Element Input element, ...) {
>>>>>
>>>>> If not, then we would probably need separate annotations....
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <k...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Nice! I agree that providing a CompletionStage for chaining is much
>>>>>> better than an ExecutorService, and very clear.
>>>>>>
>>>>>> It is very feasible to add support that looks like
>>>>>>
>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>     @ProcessElement
>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>> ...) {
>>>>>>       element.thenApply(...)
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>> If we had this available, I think users could even experiment with
>>>>>> this often as it might help even where it isn't obvious.
>>>>>>
>>>>>> My main hesitation is that big part of Beam is giving a
>>>>>> basic/imperative style of programming a DoFn that executes in a very 
>>>>>> smart
>>>>>> functional/parallel way. Full future-oriented programming is not
>>>>>> explored much outside of Javascript (and maybe Haskell) and requires
>>>>>> greater discipline in programming in a functional manner - if you are
>>>>>> mutating stuff in your callback you are going to have bugs, and then when
>>>>>> you add concurrency control you are going to have bad performance and
>>>>>> deadlocks. So I definitely wouldn't make it the default or want to spend
>>>>>> all our support effort on teaching advanced programming technique.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>
>>>>>>>> Have you considered drafting in detail what you think this API
>>>>>>>> might look like?
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Yes, but it is after the "enhancements" - for my use cases - and
>>>>>>> "bugs" list so didn't started to work on it much.
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> If it's a radically different API, it might be more appropriate as
>>>>>>>> an alternative parallel Beam API rather than a replacement for the 
>>>>>>>> current
>>>>>>>> API (there is also one such fluent API in the works).
>>>>>>>>
>>>>>>>
>>>>>>> What I plan is to draft it on top of beam (so the "useless" case I
>>>>>>> spoke about before) and then propose to impl it ~natively and move it as
>>>>>>> main API for another major.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>
>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic one?)
>>>>>>>>>> of what Kenn suggested.
>>>>>>>>>>
>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so might
>>>>>>>>>> not require waiting till Beam 3.0). We can recognize new parameters 
>>>>>>>>>> to
>>>>>>>>>> processElement and populate add needed.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This is right however in my head it was a single way movemenent to
>>>>>>>>> enforce the design to be reactive and not fake a reactive API with a 
>>>>>>>>> sync
>>>>>>>>> and not reactive impl which is what would be done today with both 
>>>>>>>>> support I
>>>>>>>>> fear.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and 
>>>>>>>>>>> output gets
>>>>>>>>>>> it as well.
>>>>>>>>>>>
>>>>>>>>>>> This way you register an execution chain. Mixed with streams you
>>>>>>>>>>> get a big data java 8/9/10 API which enabkes any connectivity in a 
>>>>>>>>>>> wel
>>>>>>>>>>> performing manner ;).
>>>>>>>>>>>
>>>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>>>>>
>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>> asynchronous activity with a callback (the callback must be 
>>>>>>>>>>>> registered with
>>>>>>>>>>>> Beam, because Beam needs to know not to mark the element as done 
>>>>>>>>>>>> until all
>>>>>>>>>>>> associated callbacks have completed). I think that's basically 
>>>>>>>>>>>> what Kenn
>>>>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>>>>>> bundles+combines are reactive friendly, beam will be synchronous 
>>>>>>>>>>>>> whatever
>>>>>>>>>>>>> other parts do. Becoming reactive will enable to manage the 
>>>>>>>>>>>>> threading
>>>>>>>>>>>>> issues properly and to have better scalability on the overall 
>>>>>>>>>>>>> execution
>>>>>>>>>>>>> when remote IO are involved.
>>>>>>>>>>>>>
>>>>>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>>>>>> completionstage - or equivalent - to chain the processing 
>>>>>>>>>>>>> properly and in
>>>>>>>>>>>>> an unified fashion.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>
>>>>>>>>>>>>> If you're talking about reactive programming, at a certain
>>>>>>>>>>>>> level beam is already reactive. Are you referring to a specific 
>>>>>>>>>>>>> way of
>>>>>>>>>>>>> writing the code?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would alow
>>>>>>>>>>>>>>> to scale way more without having to hardly synchronize 
>>>>>>>>>>>>>>> multithreading.
>>>>>>>>>>>>>>> Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <k...@google.com> a
>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer, which
>>>>>>>>>>>>>>>> is that we envision the new DoFn to be able to provide an 
>>>>>>>>>>>>>>>> automatic
>>>>>>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx,
>>>>>>>>>>>>>>>> ExecutorService executorService) {
>>>>>>>>>>>>>>>>           ... launch some futures, put them in instance
>>>>>>>>>>>>>>>> vars ...
>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>          ... block on futures, output results if
>>>>>>>>>>>>>>>> appropriate ...
>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for 
>>>>>>>>>>>>>>>> example, share a
>>>>>>>>>>>>>>>> thread pool between different bits. This was one reason to 
>>>>>>>>>>>>>>>> delete
>>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and 
>>>>>>>>>>>>>>>> user code to
>>>>>>>>>>>>>>>> properly manage how many things were going on concurrently. 
>>>>>>>>>>>>>>>> And mostly the
>>>>>>>>>>>>>>>> runner should own parallelizing to max out cores and what user 
>>>>>>>>>>>>>>>> code needs
>>>>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, this 
>>>>>>>>>>>>>>>> feature is
>>>>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself 
>>>>>>>>>>>>>>>> manages blocking
>>>>>>>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading acceptable 
>>>>>>>>>>>>>>>> for your
>>>>>>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>>>>> josh.fe...@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Our team has a pipeline that make external network calls.
>>>>>>>>>>>>>>>>> These pipelines are currently super slow, and the hypothesis 
>>>>>>>>>>>>>>>>> is that they
>>>>>>>>>>>>>>>>> are slow because we are not threading for our network calls. 
>>>>>>>>>>>>>>>>> The github
>>>>>>>>>>>>>>>>> issue below provides some discussion around this:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which
>>>>>>>>>>>>>>>>> helped with this. However, this was removed because it didn't 
>>>>>>>>>>>>>>>>> comply with a
>>>>>>>>>>>>>>>>> few BEAM paradigms.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network calls?
>>>>>>>>>>>>>>>>> It seems bundling the elements into groups of size X prior to 
>>>>>>>>>>>>>>>>> passing to
>>>>>>>>>>>>>>>>> the DoFn, and managing the threading within the function 
>>>>>>>>>>>>>>>>> might work.
>>>>>>>>>>>>>>>>> thoughts?
>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>>>> Are there any plans to develop features that help with
>>>>>>>>>>>>>>>>> this?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>

Reply via email to