This is an automated email from the ASF dual-hosted git repository. robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 7325762 [BEAM-6864] Pass JobInfo to PortablePipelineRunner.run new 3beb96d Merge pull request #8087 [BEAM-6864] Pass JobInfo to PortablePipelineRunner.run 7325762 is described below commit 7325762b5077e05329cc6038701691e6304d64fd Author: Kyle Weaver <kcwea...@google.com> AuthorDate: Mon Mar 18 16:52:00 2019 -0700 [BEAM-6864] Pass JobInfo to PortablePipelineRunner.run --- .../apache/beam/runners/flink/FlinkJobInvoker.java | 11 ++++++++-- .../beam/runners/flink/FlinkPipelineRunner.java | 24 +++++----------------- .../fnexecution/jobsubmission/JobInvocation.java | 11 +++++----- .../jobsubmission/PortablePipelineRunner.java | 3 ++- .../beam/runners/samza/SamzaJobServerDriver.java | 9 +++++++- .../beam/runners/samza/SamzaPipelineRunner.java | 3 ++- 6 files changed, 32 insertions(+), 29 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java index 38dcdc8..3dc22c2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java @@ -27,6 +27,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.sdk.options.PortablePipelineOptions; import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct; import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService; @@ -93,8 +94,14 @@ public class FlinkJobInvoker extends JobInvoker { FlinkPipelineOptions flinkOptions, @Nullable String confDir, List<String> filesToStage) { + JobInfo jobInfo = + JobInfo.create( + invocationId, + flinkOptions.getJobName(), + retrievalToken, + PipelineOptionsTranslation.toProto(flinkOptions)); FlinkPipelineRunner pipelineRunner = - new FlinkPipelineRunner(invocationId, retrievalToken, flinkOptions, confDir, filesToStage); - return new JobInvocation(invocationId, executorService, pipeline, pipelineRunner); + new FlinkPipelineRunner(flinkOptions, confDir, filesToStage); + return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner); } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java index c340e23..7bd2709 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java @@ -24,7 +24,6 @@ import java.util.List; import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; -import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; import org.apache.beam.runners.core.construction.graph.ExecutableStage; import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser; import org.apache.beam.runners.core.construction.graph.PipelineTrimmer; @@ -40,27 +39,19 @@ import org.slf4j.LoggerFactory; public class FlinkPipelineRunner implements PortablePipelineRunner { private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class); - private final String id; - private final String retrievalToken; private final FlinkPipelineOptions pipelineOptions; private final String confDir; private final List<String> filesToStage; public FlinkPipelineRunner( - String id, - String retrievalToken, - FlinkPipelineOptions pipelineOptions, - @Nullable String confDir, - List<String> filesToStage) { - this.id = id; - this.retrievalToken = retrievalToken; + FlinkPipelineOptions pipelineOptions, @Nullable String confDir, List<String> filesToStage) { this.pipelineOptions = pipelineOptions; this.confDir = confDir; this.filesToStage = filesToStage; } @Override - public PipelineResult run(final Pipeline pipeline) throws Exception { + public PipelineResult run(final Pipeline pipeline, JobInfo jobInfo) throws Exception { MetricsEnvironment.setMetricsSupported(false); FlinkPortablePipelineTranslator<?> translator; @@ -70,12 +61,13 @@ public class FlinkPipelineRunner implements PortablePipelineRunner { } else { translator = new FlinkStreamingPortablePipelineTranslator(); } - return runPipelineWithTranslator(pipeline, translator); + return runPipelineWithTranslator(pipeline, jobInfo, translator); } private <T extends FlinkPortablePipelineTranslator.TranslationContext> PipelineResult runPipelineWithTranslator( - final Pipeline pipeline, FlinkPortablePipelineTranslator<T> translator) throws Exception { + final Pipeline pipeline, JobInfo jobInfo, FlinkPortablePipelineTranslator<T> translator) + throws Exception { LOG.info("Translating pipeline to Flink program."); // Don't let the fuser fuse any subcomponents of native transforms. @@ -88,12 +80,6 @@ public class FlinkPipelineRunner implements PortablePipelineRunner { .anyMatch(proto -> ExecutableStage.URN.equals(proto.getSpec().getUrn())) ? trimmedPipeline : GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline(); - JobInfo jobInfo = - JobInfo.create( - id, - pipelineOptions.getJobName(), - retrievalToken, - PipelineOptionsTranslation.toProto(pipelineOptions)); FlinkPortablePipelineTranslator.Executor executor = translator.translate( diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java index 292e6e8..d32aef4 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java @@ -31,6 +31,7 @@ import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback; import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Futures; @@ -46,7 +47,7 @@ public class JobInvocation { private final RunnerApi.Pipeline pipeline; private final PortablePipelineRunner pipelineRunner; - private final String id; + private final JobInfo jobInfo; private final ListeningExecutorService executorService; private List<Consumer<Enum>> stateObservers; private List<Consumer<JobMessage>> messageObservers; @@ -54,11 +55,11 @@ public class JobInvocation { @Nullable private ListenableFuture<PipelineResult> invocationFuture; public JobInvocation( - String id, + JobInfo jobInfo, ListeningExecutorService executorService, Pipeline pipeline, PortablePipelineRunner pipelineRunner) { - this.id = id; + this.jobInfo = jobInfo; this.executorService = executorService; this.pipeline = pipeline; this.pipelineRunner = pipelineRunner; @@ -69,7 +70,7 @@ public class JobInvocation { } private PipelineResult runPipeline() throws Exception { - return pipelineRunner.run(pipeline); + return pipelineRunner.run(pipeline, jobInfo); } /** Start the job. */ @@ -119,7 +120,7 @@ public class JobInvocation { /** @return Unique identifier for the job invocation. */ public String getId() { - return id; + return jobInfo.jobId(); } /** Cancel the job. */ diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java index d6cf083..08adee6 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java @@ -18,9 +18,10 @@ package org.apache.beam.runners.fnexecution.jobsubmission; import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.sdk.PipelineResult; /** Runs a portable Beam pipeline on some execution engine. */ public interface PortablePipelineRunner { - PipelineResult run(RunnerApi.Pipeline pipeline) throws Exception; + PipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) throws Exception; } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java index c582d17..9effd57 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java @@ -31,6 +31,7 @@ import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagin import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService; import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct; import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService; import org.kohsuke.args4j.CmdLineException; @@ -102,7 +103,13 @@ public class SamzaJobServerDriver { String.format( "%s_%s", samzaPipelineOptions.getJobName(), UUID.randomUUID().toString()); SamzaPipelineRunner pipelineRunner = new SamzaPipelineRunner(samzaPipelineOptions); - return new JobInvocation(invocationId, executorService, pipeline, pipelineRunner); + JobInfo jobInfo = + JobInfo.create( + invocationId, + samzaPipelineOptions.getJobName(), + retrievalToken, + PipelineOptionsTranslation.toProto(samzaPipelineOptions)); + return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner); } }; return InMemoryJobService.create( diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java index 1a94e1d..4759d88 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java @@ -21,6 +21,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser; import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer; import org.apache.beam.sdk.PipelineResult; import org.slf4j.Logger; @@ -34,7 +35,7 @@ public class SamzaPipelineRunner implements PortablePipelineRunner { private final SamzaPipelineOptions options; @Override - public PipelineResult run(final Pipeline pipeline) { + public PipelineResult run(final Pipeline pipeline, JobInfo jobInfo) { // Fused pipeline proto. final RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline(); LOG.info("Portable pipeline to run:");