All runners should support portable execution for Java, which should be
just as easy as supporting execution of non-Java pipelines over this API.

As for non-portable "specialized" execution of Java, I think it's a
tradeoff between the overhead of the portability framework vs. the
maintenance cost of providing a separate java-only runner. In time I see
the former dropping (though there's perhaps a lower bound for how far it
can go) and the latter increasing, and the cross-over point may be
different for different runners and users, but would echo the sentiments
that portable execution is the baseline.


On Thu, Mar 8, 2018 at 12:38 PM Kenneth Knowles <k...@google.com> wrote:

> +1 to Luke's answer of "yes" for everything to be "portable by default".
>
> However, I (always) favor decentralizing this decision as long as the
> "Beam model" is respected.
>
> Baseline:
>  - the input pipeline should always be in portable format
>  - the results of execution should match portable execution (which we have
> never defined clearly and maybe never will bother... the Fn API is geared
> toward performance and ad-hoc use according to a runner's physical plan,
> but if we decided to build a spec for the pipeline proto it would include
> at least the part where an SDK owns the semantics of a Fn)
>
> So in general each "DoFn" (and other Fn) is a struct with roughly { env =
> URL, urn = <"name" of fn, modulo parameterization>, bytes = <serialized
> form that the container understands> } where the runner has never seen the
> URL before, may not know the URN, and likely cannot interpret the bytes at
> all. There is no choice but to ask the SDK to apply it according to the
> required computational pattern (ParDo, etc). Any execution strategy that
> yields the same result is allowable.
>
> This format for user Fns is _intended_ to support direct execution by the
> runner without sending to an SDK and is already used for standard window
> fns that have an SDK-agnostic proto representation [1]. So the Go SDK can
> submit a Window.into(<fixed windows of 1 hour>) and the runner can just do
> that. The case where the URN is "java dofn" and the bytes are a serialized
> Java DoFn from the user's staged jars is more difficult.
>
> However, supporting portable execution alongside this specialization is a
> lot of maintenance overhead and as Luke points out causes other user pain
> having nothing to do with cross-language requirements. I would definitely
> reset our perspective to take portable black-box execution as the baseline.
>
> Kenn
>
> [1]
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto
>
> On Thu, Mar 8, 2018 at 11:18 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> I ran some very pessimistic pipelines that were shuffle heavy (Random KV
>> -> GBK -> IdentityDoFn) and found that the performance overhead was 15%
>> when executed with Dataflow. This is a while back and there was a lot of
>> inefficiencies due to coder encode/decode cycles and based upon profiling
>> information surmised the with some work to reduce the amount of times that
>> byte[] are copied that this could get reduced to about 8%. I can't say how
>> this will impact Flink as its a different execution engine but we should
>> gather data first.
>>
>> On Thu, Mar 8, 2018 at 11:10 AM, Thomas Weise <t...@apache.org> wrote:
>>
>>> Performance, due to the extra gRPC hop.
>>>
>>>
>>> On Thu, Mar 8, 2018 at 11:08 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> The goal is to use containers (and similar technologies) in the future.
>>>> It really hinders pipeline portability between runners if you also have to
>>>> deal with the dependency conflicts between Flink/Dataflow/Spark/...
>>>> execution runtimes.
>>>>
>>>> What kinds of penalty are you referring to (perf, user complexity, ...)?
>>>>
>>>>
>>>>
>>>> On Thu, Mar 8, 2018 at 11:02 AM, Thomas Weise <t...@apache.org> wrote:
>>>>
>>>>> I'm curious if pipelines that are exclusively Java will be executed
>>>>> (when running on Flink or other JVM based runnner) in separate harness
>>>>> containers also? This would impose a significant penalty compared to the
>>>>> current execution model. Will this be something the user can control?
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>>
>>>>> On Wed, Mar 7, 2018 at 2:09 PM, Aljoscha Krettek <aljos...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> @Axel I assigned https://issues.apache.org/jira/browse/BEAM-2588 to
>>>>>> you. It might make sense to also grab other issues that you're already
>>>>>> working on.
>>>>>>
>>>>>>
>>>>>> On 7. Mar 2018, at 21:18, Aljoscha Krettek <aljos...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>> Cool, so we had the same ideas. I think this indicates that we're not
>>>>>> completely on the wrong track with this! ;-)
>>>>>>
>>>>>> Aljoscha
>>>>>>
>>>>>> On 7. Mar 2018, at 21:14, Thomas Weise <t...@apache.org> wrote:
>>>>>>
>>>>>> Ben,
>>>>>>
>>>>>> Looks like we hit the send button at the same time. Is the plan the
>>>>>> to derive the Flink implementation of the various execution services from
>>>>>> those under org.apache.beam.runners.fnexecution ?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Wed, Mar 7, 2018 at 11:02 AM, Thomas Weise <t...@apache.org> wrote:
>>>>>>
>>>>>>> What's the plan for the endpoints that the Flink operator needs to
>>>>>>> provide (control/data plane, state, logging)? Is the intention to 
>>>>>>> provide
>>>>>>> base implementations that can be shared across runners and then 
>>>>>>> implement
>>>>>>> the Flink specific parts on top of it? Has work started on those?
>>>>>>>
>>>>>>> If there are subtasks ready to be taken up I would be interested.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Mar 7, 2018 at 9:35 AM, Ben Sidhom <sid...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes, Axel has started work on such a shim.
>>>>>>>>
>>>>>>>> Our plan in the short term is to keep the old FlinkRunner around
>>>>>>>> and to call into it to process jobs from the job service itself. That 
>>>>>>>> way
>>>>>>>> we can keep the non-portable runner fully-functional while working on
>>>>>>>> portability. Eventually, I think it makes sense for this to go away, 
>>>>>>>> but we
>>>>>>>> haven't given much thought to that. The translator layer will likely 
>>>>>>>> stay
>>>>>>>> the same, and the FlinkRunner bits are a relatively simple wrapper 
>>>>>>>> around
>>>>>>>> translation, so it should be simple enough to factor this out.
>>>>>>>>
>>>>>>>> Much of the service code from the Universal Local Runner (ULR)
>>>>>>>> should be composed and reused with other runner implementations. 
>>>>>>>> Thomas and
>>>>>>>> Axel have more context around that.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Mar 7, 2018 at 8:47 AM Aljoscha Krettek <
>>>>>>>> aljos...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Has anyone started on
>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-2588 (FlinkRunner shim
>>>>>>>>> for serving Job API). If not I would start on that.
>>>>>>>>>
>>>>>>>>> My plan is to implement a FlinkJobService that implements 
>>>>>>>>> JobServiceImplBase,
>>>>>>>>> similar to ReferenceRunnerJobService. This would have a lot of the
>>>>>>>>> functionality that FlinkRunner currently has. As a next step, I would 
>>>>>>>>> add a
>>>>>>>>> JobServiceRunner that can submit Pipelines to a JobService.
>>>>>>>>>
>>>>>>>>> For testing, I would probably add functionality that allows
>>>>>>>>> spinning up a JobService in-process with the JobServiceRunner. I can
>>>>>>>>> imagine for testing we could even eventually use something like:
>>>>>>>>> "--runner=JobServiceRunner", "--streaming=true",
>>>>>>>>> "--jobService=FlinkRunnerJobService".
>>>>>>>>>
>>>>>>>>> Once all of this is done, we only need the python component that
>>>>>>>>> talks to the JobService to submit a pipeline.
>>>>>>>>>
>>>>>>>>> What do you think about the plan?
>>>>>>>>>
>>>>>>>>> Btw, I feel that the thing currently called Runner, i.e.
>>>>>>>>> FlinkRunner will go way in the long run and we will have 
>>>>>>>>> FlinkJobService,
>>>>>>>>> SparkJobService and whatnot, what do you think?
>>>>>>>>>
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 9. Feb 2018, at 01:31, Ben Sidhom <sid...@google.com> wrote:
>>>>>>>>>
>>>>>>>>> Hey all,
>>>>>>>>>
>>>>>>>>> We're working on getting the portability framework plumbed through
>>>>>>>>> the Flink runner. The first iteration will likely only support batch 
>>>>>>>>> and
>>>>>>>>> will be limited in its deployment flexibility, but hopefully it 
>>>>>>>>> shouldn't
>>>>>>>>> be too painful to expand this.
>>>>>>>>>
>>>>>>>>> We have the start of a tracking doc here:
>>>>>>>>> https://s.apache.org/portable-beam-on-flink.
>>>>>>>>>
>>>>>>>>> We've documented the general deployment strategy here:
>>>>>>>>> https://s.apache.org/portable-flink-runner-overview.
>>>>>>>>>
>>>>>>>>> Feel free to provide comments on the docs or jump in on any of the
>>>>>>>>> referenced bugs.
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -Ben
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -Ben
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>

Reply via email to