This is an automated email from the ASF dual-hosted git repository. robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 06054cc [BEAM-6865] DefaultJobBundleFactory: create using default environmentFactoryProviderMap new c961139 Merge pull request #8086 [BEAM-6865] DefaultJobBundleFactory and environmentFactoryProviderMap 06054cc is described below commit 06054ccae7995ed2a83d6d9653b911abb16636d7 Author: Kyle Weaver <kcwea...@google.com> AuthorDate: Mon Mar 18 16:07:30 2019 -0700 [BEAM-6865] DefaultJobBundleFactory: create using default environmentFactoryProviderMap --- .../FlinkDefaultExecutableStageContext.java | 23 +-------------------- .../control/DefaultJobBundleFactory.java | 24 ++++++++++++++++++++++ 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java index ccad674..1632a4a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java @@ -21,43 +21,22 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments; -import org.apache.beam.runners.core.construction.BeamUrns; -import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; import org.apache.beam.runners.core.construction.graph.ExecutableStage; import org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory; import org.apache.beam.runners.fnexecution.control.JobBundleFactory; import org.apache.beam.runners.fnexecution.control.StageBundleFactory; -import org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory; -import org.apache.beam.runners.fnexecution.environment.EmbeddedEnvironmentFactory; -import org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory; -import org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.sdk.options.PortablePipelineOptions; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions; -import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; /** Implementation of a {@link FlinkExecutableStageContext}. */ class FlinkDefaultExecutableStageContext implements FlinkExecutableStageContext, AutoCloseable { private final JobBundleFactory jobBundleFactory; private static FlinkDefaultExecutableStageContext create(JobInfo jobInfo) { - JobBundleFactory jobBundleFactory = - DefaultJobBundleFactory.create( - jobInfo, - ImmutableMap.of( - BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER), - new DockerEnvironmentFactory.Provider( - PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions())), - BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS), - new ProcessEnvironmentFactory.Provider(), - BeamUrns.getUrn(StandardEnvironments.Environments.EXTERNAL), - new ExternalEnvironmentFactory.Provider(), - Environments.ENVIRONMENT_EMBEDDED, // Non Public urn for testing. - new EmbeddedEnvironmentFactory.Provider( - PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions())))); + JobBundleFactory jobBundleFactory = DefaultJobBundleFactory.create(jobInfo); return new FlinkDefaultExecutableStageContext(jobBundleFactory); } diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java index 7a6e647..5e9ae9d 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java @@ -26,6 +26,10 @@ import java.util.concurrent.Executors; import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target; import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments; +import org.apache.beam.runners.core.construction.BeamUrns; +import org.apache.beam.runners.core.construction.Environments; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; import org.apache.beam.runners.core.construction.graph.ExecutableStage; import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider; import org.apache.beam.runners.fnexecution.GrpcFnServer; @@ -35,7 +39,11 @@ import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrie import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor; import org.apache.beam.runners.fnexecution.control.SdkHarnessClient.BundleProcessor; import org.apache.beam.runners.fnexecution.data.GrpcDataService; +import org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.EmbeddedEnvironmentFactory; import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory; import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment; import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter; @@ -78,6 +86,22 @@ public class DefaultJobBundleFactory implements JobBundleFactory { private final MapControlClientPool clientPool; private final IdGenerator stageIdGenerator; + public static DefaultJobBundleFactory create(JobInfo jobInfo) { + Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap = + ImmutableMap.of( + BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER), + new DockerEnvironmentFactory.Provider( + PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions())), + BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS), + new ProcessEnvironmentFactory.Provider(), + BeamUrns.getUrn(StandardEnvironments.Environments.EXTERNAL), + new ExternalEnvironmentFactory.Provider(), + Environments.ENVIRONMENT_EMBEDDED, // Non Public urn for testing. + new EmbeddedEnvironmentFactory.Provider( + PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions()))); + return new DefaultJobBundleFactory(jobInfo, environmentFactoryProviderMap); + } + public static DefaultJobBundleFactory create( JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap) { return new DefaultJobBundleFactory(jobInfo, environmentFactoryProviderMap);