All runners should support portable execution for Java, which should be just as easy as supporting execution of non-Java pipelines over this API.
As for non-portable "specialized" execution of Java, I think it's a tradeoff between the overhead of the portability framework and the maintenance cost of providing a separate Java-only runner. In time I see the former dropping (though there is perhaps a lower bound on how far it can go) and the latter increasing, and the crossover point may be different for different runners and users, but I would echo the sentiment that portable execution is the baseline.

On Thu, Mar 8, 2018 at 12:38 PM Kenneth Knowles <k...@google.com> wrote:

> +1 to Luke's answer of "yes" for everything to be "portable by default".
>
> However, I (always) favor decentralizing this decision as long as the
> "Beam model" is respected.
>
> Baseline:
> - the input pipeline should always be in portable format
> - the results of execution should match portable execution (which we have
> never defined clearly and maybe never will bother to... the Fn API is geared
> toward performance and ad-hoc use according to a runner's physical plan,
> but if we decided to build a spec for the pipeline proto it would include
> at least the part where an SDK owns the semantics of a Fn)
>
> So in general each "DoFn" (and other Fn) is a struct with roughly { env =
> URL, urn = <"name" of fn, modulo parameterization>, bytes = <serialized
> form that the container understands> }, where the runner has never seen the
> URL before, may not know the URN, and likely cannot interpret the bytes at
> all. There is no choice but to ask the SDK to apply it according to the
> required computational pattern (ParDo, etc.). Any execution strategy that
> yields the same result is allowable.
>
> This format for user Fns is _intended_ to support direct execution by the
> runner without sending to an SDK, and is already used for standard window
> fns that have an SDK-agnostic proto representation [1]. So the Go SDK can
> submit a Window.into(<fixed windows of 1 hour>) and the runner can just do
> that.
> The case where the URN is "java dofn" and the bytes are a serialized
> Java DoFn from the user's staged jars is more difficult.
>
> However, supporting portable execution alongside this specialization is a
> lot of maintenance overhead and, as Luke points out, causes other user pain
> having nothing to do with cross-language requirements. I would definitely
> reset our perspective to take portable black-box execution as the baseline.
>
> Kenn
>
> [1]
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto
>
> On Thu, Mar 8, 2018 at 11:18 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> I ran some very pessimistic pipelines that were shuffle heavy (Random KV
>> -> GBK -> IdentityDoFn) and found that the performance overhead was 15%
>> when executed with Dataflow. This was a while back, and there were a lot of
>> inefficiencies due to coder encode/decode cycles; based upon profiling
>> information, I surmised that with some work to reduce the number of times
>> that byte[]s are copied, this could be brought down to about 8%. I can't
>> say how this will impact Flink as it's a different execution engine, but
>> we should gather data first.
>>
>> On Thu, Mar 8, 2018 at 11:10 AM, Thomas Weise <t...@apache.org> wrote:
>>
>>> Performance, due to the extra gRPC hop.
>>>
>>> On Thu, Mar 8, 2018 at 11:08 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> The goal is to use containers (and similar technologies) in the future.
>>>> It really hinders pipeline portability between runners if you also have to
>>>> deal with the dependency conflicts between Flink/Dataflow/Spark/...
>>>> execution runtimes.
>>>>
>>>> What kinds of penalty are you referring to (perf, user complexity, ...)?
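The Fn struct Kenn describes, and the runner's choice between executing a known URN directly and delegating opaque bytes to an SDK harness, can be sketched in plain Java. This is a toy model only: the URN strings, class names, and the string-returning "harness" call are hypothetical stand-ins for the real pipeline protos and gRPC machinery.

```java
import java.util.Map;

public class FnDispatchSketch {
    // Roughly the { env, urn, bytes } triple from the pipeline proto.
    record FnSpec(String env, String urn, byte[] payload) {}

    // URNs this runner understands natively, e.g. the standard window fns
    // that have an SDK-agnostic proto representation. (Illustrative names.)
    static final Map<String, String> KNOWN_URNS = Map.of(
        "beam:window_fn:fixed_windows:v1", "runner executes directly");

    static String apply(FnSpec fn) {
        if (KNOWN_URNS.containsKey(fn.urn())) {
            // SDK-agnostic representation: no round trip to the SDK needed.
            return KNOWN_URNS.get(fn.urn());
        }
        // Opaque bytes (e.g. a serialized Java DoFn): the runner cannot
        // interpret them, so it must ask the environment's SDK harness to
        // apply them according to the required computational pattern.
        return "delegated to SDK harness at " + fn.env();
    }

    public static void main(String[] args) {
        FnSpec window = new FnSpec("env:java", "beam:window_fn:fixed_windows:v1", new byte[0]);
        FnSpec userDoFn = new FnSpec("env:java", "urn:example:java_dofn", new byte[] {1, 2});
        System.out.println(apply(window));   // runner executes directly
        System.out.println(apply(userDoFn)); // delegated to SDK harness at env:java
    }
}
```

The first case corresponds to Kenn's Window.into(<fixed windows of 1 hour>) example: the runner recognizes the URN and never sends the element to the SDK; the second is the opaque "java dofn" case.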
>>>> On Thu, Mar 8, 2018 at 11:02 AM, Thomas Weise <t...@apache.org> wrote:
>>>>
>>>>> I'm curious whether pipelines that are exclusively Java will also be
>>>>> executed in separate harness containers (when running on Flink or another
>>>>> JVM-based runner)? This would impose a significant penalty compared to the
>>>>> current execution model. Will this be something the user can control?
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>> On Wed, Mar 7, 2018 at 2:09 PM, Aljoscha Krettek <aljos...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> @Axel I assigned https://issues.apache.org/jira/browse/BEAM-2588 to
>>>>>> you. It might make sense to also grab other issues that you're already
>>>>>> working on.
>>>>>>
>>>>>> On 7. Mar 2018, at 21:18, Aljoscha Krettek <aljos...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>> Cool, so we had the same ideas. I think this indicates that we're not
>>>>>> completely on the wrong track with this! ;-)
>>>>>>
>>>>>> Aljoscha
>>>>>>
>>>>>> On 7. Mar 2018, at 21:14, Thomas Weise <t...@apache.org> wrote:
>>>>>>
>>>>>> Ben,
>>>>>>
>>>>>> Looks like we hit the send button at the same time. Is the plan then
>>>>>> to derive the Flink implementation of the various execution services from
>>>>>> those under org.apache.beam.runners.fnexecution?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Wed, Mar 7, 2018 at 11:02 AM, Thomas Weise <t...@apache.org> wrote:
>>>>>>
>>>>>>> What's the plan for the endpoints that the Flink operator needs to
>>>>>>> provide (control/data plane, state, logging)? Is the intention to provide
>>>>>>> base implementations that can be shared across runners and then implement
>>>>>>> the Flink-specific parts on top of it? Has work started on those?
>>>>>>>
>>>>>>> If there are subtasks ready to be taken up, I would be interested.
>>>>>>> Thanks,
>>>>>>> Thomas
>>>>>>>
>>>>>>> On Wed, Mar 7, 2018 at 9:35 AM, Ben Sidhom <sid...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes, Axel has started work on such a shim.
>>>>>>>>
>>>>>>>> Our plan in the short term is to keep the old FlinkRunner around
>>>>>>>> and to call into it to process jobs from the job service itself. That way
>>>>>>>> we can keep the non-portable runner fully functional while working on
>>>>>>>> portability. Eventually, I think it makes sense for this to go away, but we
>>>>>>>> haven't given much thought to that. The translator layer will likely stay
>>>>>>>> the same, and the FlinkRunner bits are a relatively simple wrapper around
>>>>>>>> translation, so it should be simple enough to factor this out.
>>>>>>>>
>>>>>>>> Much of the service code from the Universal Local Runner (ULR)
>>>>>>>> should be composed and reused with other runner implementations. Thomas and
>>>>>>>> Axel have more context around that.
>>>>>>>>
>>>>>>>> On Wed, Mar 7, 2018 at 8:47 AM Aljoscha Krettek <aljos...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Has anyone started on
>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-2588 (FlinkRunner shim
>>>>>>>>> for serving Job API)? If not, I would start on that.
>>>>>>>>>
>>>>>>>>> My plan is to implement a FlinkJobService that implements JobServiceImplBase,
>>>>>>>>> similar to ReferenceRunnerJobService. This would have a lot of the
>>>>>>>>> functionality that FlinkRunner currently has. As a next step, I would add a
>>>>>>>>> JobServiceRunner that can submit Pipelines to a JobService.
>>>>>>>>>
>>>>>>>>> For testing, I would probably add functionality that allows
>>>>>>>>> spinning up a JobService in-process with the JobServiceRunner.
>>>>>>>>> I can imagine for testing we could even eventually use something like:
>>>>>>>>> "--runner=JobServiceRunner", "--streaming=true",
>>>>>>>>> "--jobService=FlinkRunnerJobService".
>>>>>>>>>
>>>>>>>>> Once all of this is done, we only need the Python component that
>>>>>>>>> talks to the JobService to submit a pipeline.
>>>>>>>>>
>>>>>>>>> What do you think about the plan?
>>>>>>>>>
>>>>>>>>> Btw, I feel that the thing currently called Runner, i.e.
>>>>>>>>> FlinkRunner, will go away in the long run and we will have FlinkJobService,
>>>>>>>>> SparkJobService and whatnot. What do you think?
>>>>>>>>>
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>> On 9. Feb 2018, at 01:31, Ben Sidhom <sid...@google.com> wrote:
>>>>>>>>>
>>>>>>>>> Hey all,
>>>>>>>>>
>>>>>>>>> We're working on getting the portability framework plumbed through
>>>>>>>>> the Flink runner. The first iteration will likely only support batch and
>>>>>>>>> will be limited in its deployment flexibility, but hopefully it shouldn't
>>>>>>>>> be too painful to expand this.
>>>>>>>>>
>>>>>>>>> We have the start of a tracking doc here:
>>>>>>>>> https://s.apache.org/portable-beam-on-flink
>>>>>>>>>
>>>>>>>>> We've documented the general deployment strategy here:
>>>>>>>>> https://s.apache.org/portable-flink-runner-overview
>>>>>>>>>
>>>>>>>>> Feel free to provide comments on the docs or jump in on any of the
>>>>>>>>> referenced bugs.
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -Ben
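As a rough sketch of how the pieces Aljoscha and Ben discuss might fit together: each runner exposes a JobService that submitting SDKs talk to, and the Flink version reuses the existing translator layer much as FlinkRunner does today. Everything here (JobService, FlinkJobService, submit, Pipeline) is a hypothetical stand-in for the real gRPC JobServiceImplBase and pipeline protos, not the actual Beam API.

```java
public class JobServiceSketch {
    // Stand-in for the portable pipeline proto.
    record Pipeline(String name) {}

    // Stand-in for the gRPC Job API: each runner provides an implementation.
    interface JobService {
        String run(Pipeline pipeline);
    }

    // A FlinkJobService would wrap the existing translation layer,
    // much as the current FlinkRunner wraps it today.
    static class FlinkJobService implements JobService {
        @Override
        public String run(Pipeline pipeline) {
            // Real code would translate the pipeline proto to a Flink job
            // graph and submit it to a cluster; here we just tag the name.
            return "flink-job:" + pipeline.name();
        }
    }

    // The SDK-side piece ("JobServiceRunner" in the plan above) only needs
    // to forward the pipeline to whichever JobService was configured
    // (e.g. via a flag like --jobService=FlinkRunnerJobService).
    static String submit(JobService service, Pipeline p) {
        return service.run(p);
    }

    public static void main(String[] args) {
        System.out.println(submit(new FlinkJobService(), new Pipeline("wordcount")));
    }
}
```

Under this shape, swapping FlinkJobService for a SparkJobService (or an in-process test service) changes nothing on the submission side, which is the point of Aljoscha's "the thing currently called Runner will go away" remark.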