[ 
https://issues.apache.org/jira/browse/BEAM-5288?focusedWorklogId=148426&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-148426
 ]

ASF GitHub Bot logged work on BEAM-5288:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Sep/18 22:04
            Start Date: 26/Sep/18 22:04
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #6441: [BEAM-5288] Support 
environment pipeline option in Java and Python.
URL: https://github.com/apache/beam/pull/6441
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not keep its diff
after the merge, so the full diff is reproduced below:

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto 
b/model/pipeline/src/main/proto/beam_runner_api.proto
index 72e608e00d2..194797f166c 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -992,7 +992,7 @@ message SideInput {
 // An environment for executing UDFs. By default, an SDK container URL, but
 // can also be a process forked by a command, or an externally managed process.
 message Environment {
-  // Deprecated
+  // Deprecated. Tracked in BEAM-5433
   string url = 1;
 
   // (Required) The URN of the payload
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
index c75be4e9ee7..2997e7222f3 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
@@ -18,6 +18,8 @@
 
 package org.apache.beam.runners.core.construction;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
@@ -34,6 +36,8 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.ReadPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
 import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.ByteString;
 import 
org.apache.beam.vendor.protobuf.v3.com.google.protobuf.InvalidProtocolBufferException;
 
 /** Utilities for interacting with portability {@link Environment 
environments}. */
@@ -49,6 +53,13 @@
 
   private static final EnvironmentIdExtractor DEFAULT_SPEC_EXTRACTOR = 
(transform) -> null;
 
+  private static final ObjectMapper MAPPER =
+      new ObjectMapper()
+          
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+  public static final String ENVIRONMENT_DOCKER = "DOCKER";
+  public static final String ENVIRONMENT_PROCESS = "PROCESS";
+  public static final String ENVIRONMENT_EMBEDDED = "EMBEDDED"; // Non Public 
urn for testing
+
   /* For development, use the container build by the current user to ensure 
that the SDK harness and
    * the SDK agree on how they should interact. This should be changed to a 
version-specific
    * container during a release.
@@ -63,11 +74,20 @@
 
   private Environments() {}
 
-  public static Environment createOrGetDefaultDockerEnvironment(String url) {
-    if (Strings.isNullOrEmpty(url)) {
+  public static Environment createOrGetDefaultEnvironment(String type, String 
config) {
+    if (Strings.isNullOrEmpty(type)) {
       return JAVA_SDK_HARNESS_ENVIRONMENT;
     }
-    return createDockerEnvironment(url);
+
+    switch (type) {
+      case ENVIRONMENT_EMBEDDED:
+        return createEmbeddedEnvironment(config);
+      case ENVIRONMENT_PROCESS:
+        return createProcessEnvironment(config);
+      case ENVIRONMENT_DOCKER:
+      default:
+        return createDockerEnvironment(config);
+    }
   }
 
   public static Environment createDockerEnvironment(String dockerImageUrl) {
@@ -79,18 +99,46 @@ public static Environment createDockerEnvironment(String 
dockerImageUrl) {
         .build();
   }
 
+  private static Environment createProcessEnvironment(String config) {
+    try {
+      ProcessPayloadReferenceJSON payloadReferenceJSON =
+          MAPPER.readValue(config, ProcessPayloadReferenceJSON.class);
+      return createProcessEnvironment(
+          payloadReferenceJSON.getOs(),
+          payloadReferenceJSON.getArch(),
+          payloadReferenceJSON.getCommand(),
+          payloadReferenceJSON.getEnv());
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Unable to parse process environment config: %s", 
config), e);
+    }
+  }
+
+  private static Environment createEmbeddedEnvironment(String config) {
+    return Environment.newBuilder()
+        .setUrn(ENVIRONMENT_EMBEDDED)
+        .setPayload(ByteString.copyFromUtf8(MoreObjects.firstNonNull(config, 
"")))
+        .build();
+  }
+
   public static Environment createProcessEnvironment(
       String os, String arch, String command, Map<String, String> env) {
+    ProcessPayload.Builder builder = ProcessPayload.newBuilder();
+    if (!Strings.isNullOrEmpty(os)) {
+      builder.setOs(os);
+    }
+    if (!Strings.isNullOrEmpty(arch)) {
+      builder.setArch(arch);
+    }
+    if (!Strings.isNullOrEmpty(command)) {
+      builder.setCommand(command);
+    }
+    if (env != null) {
+      builder.putAllEnv(env);
+    }
     return Environment.newBuilder()
         .setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS))
-        .setPayload(
-            ProcessPayload.newBuilder()
-                .setOs(os)
-                .setArch(arch)
-                .setCommand(command)
-                .putAllEnv(env)
-                .build()
-                .toByteString())
+        .setPayload(builder.build().toByteString())
         .build();
   }
 
@@ -161,4 +209,31 @@ private static String windowExtractor(PTransform transform)
         .getWindowFn()
         .getEnvironmentId();
   }
+
+  private static class ProcessPayloadReferenceJSON {
+    @Nullable private String os;
+    @Nullable private String arch;
+    @Nullable private String command;
+    @Nullable private Map<String, String> env;
+
+    @Nullable
+    public String getOs() {
+      return os;
+    }
+
+    @Nullable
+    public String getArch() {
+      return arch;
+    }
+
+    @Nullable
+    public String getCommand() {
+      return command;
+    }
+
+    @Nullable
+    public Map<String, String> getEnv() {
+      return env;
+    }
+  }
 }
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index e733396c71b..4f73a7b243a 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -61,9 +61,11 @@ public static SdkComponents create() {
 
   public static SdkComponents create(PipelineOptions options) {
     SdkComponents sdkComponents = new SdkComponents();
+    PortablePipelineOptions portablePipelineOptions = 
options.as(PortablePipelineOptions.class);
     sdkComponents.registerEnvironment(
-        Environments.createOrGetDefaultDockerEnvironment(
-            
options.as(PortablePipelineOptions.class).getDefaultJavaEnvironmentUrl()));
+        Environments.createOrGetDefaultEnvironment(
+            portablePipelineOptions.getDefaultEnvironmentType(),
+            portablePipelineOptions.getDefaultEnvironmentConfig()));
     return sdkComponents;
   }
 
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java
index 93ea0251cf1..b9aae9df531 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java
@@ -26,11 +26,14 @@
 import java.io.Serializable;
 import java.util.Optional;
 import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.DockerPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ProcessPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ReadPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
 import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
@@ -54,6 +57,44 @@
 /** Tests for {@link Environments}. */
 @RunWith(JUnit4.class)
 public class EnvironmentsTest implements Serializable {
+  @Test
+  public void createEnvironments() throws IOException {
+    assertThat(
+        
Environments.createOrGetDefaultEnvironment(Environments.ENVIRONMENT_DOCKER, 
"java"),
+        is(
+            Environment.newBuilder()
+                .setUrl("java")
+                
.setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER))
+                .setPayload(
+                    
DockerPayload.newBuilder().setContainerImage("java").build().toByteString())
+                .build()));
+    assertThat(
+        Environments.createOrGetDefaultEnvironment(
+            Environments.ENVIRONMENT_PROCESS,
+            "{\"os\": \"linux\", \"arch\": \"amd64\", \"command\": \"run.sh\", 
\"env\":{\"k1\": \"v1\", \"k2\": \"v2\"} }"),
+        is(
+            Environment.newBuilder()
+                
.setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS))
+                .setPayload(
+                    ProcessPayload.newBuilder()
+                        .setOs("linux")
+                        .setArch("amd64")
+                        .setCommand("run.sh")
+                        .putEnv("k1", "v1")
+                        .putEnv("k2", "v2")
+                        .build()
+                        .toByteString())
+                .build()));
+    assertThat(
+        Environments.createOrGetDefaultEnvironment(
+            Environments.ENVIRONMENT_PROCESS, "{\"command\": \"run.sh\"}"),
+        is(
+            Environment.newBuilder()
+                
.setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS))
+                
.setPayload(ProcessPayload.newBuilder().setCommand("run.sh").build().toByteString())
+                .build()));
+  }
+
   @Test
   public void getEnvironmentUnknownFnType() throws IOException {
     SdkComponents components = SdkComponents.create();
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
index ae4eb57fb8a..056a16ba817 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
@@ -195,8 +195,7 @@ public void execute() throws Exception {
             GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), 
serverFactory)) {
 
       EnvironmentFactory environmentFactory =
-          createEnvironmentFactory(
-              control, logging, artifact, provisioning, 
controlClientPool.getSource());
+          createEnvironmentFactory(control, logging, artifact, provisioning, 
controlClientPool);
       JobBundleFactory jobBundleFactory =
           SingleEnvironmentInstanceJobBundleFactory.create(environmentFactory, 
data, state);
 
@@ -234,19 +233,20 @@ private EnvironmentFactory createEnvironmentFactory(
       GrpcFnServer<GrpcLoggingService> logging,
       GrpcFnServer<ArtifactRetrievalService> artifact,
       GrpcFnServer<StaticGrpcProvisionService> provisioning,
-      ControlClientPool.Source controlClientSource) {
+      ControlClientPool controlClient) {
     switch (environmentType) {
       case DOCKER:
-        return DockerEnvironmentFactory.forServices(
-            control,
-            logging,
-            artifact,
-            provisioning,
-            controlClientSource,
-            IdGenerators.incrementingLongs());
+        return new DockerEnvironmentFactory.Provider()
+            .createEnvironmentFactory(
+                control,
+                logging,
+                artifact,
+                provisioning,
+                controlClient,
+                IdGenerators.incrementingLongs());
       case IN_PROCESS:
         return InProcessEnvironmentFactory.create(
-            PipelineOptionsFactory.create(), logging, control, 
controlClientSource);
+            PipelineOptionsFactory.create(), logging, control, 
controlClient.getSource());
       default:
         throw new IllegalArgumentException(
             String.format("Unknown %s %s", 
EnvironmentType.class.getSimpleName(), environmentType));
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
index e7aeb475aa4..015af26c2e1 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
@@ -17,10 +17,17 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
+import com.google.common.collect.ImmutableMap;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
+import org.apache.beam.runners.core.construction.BeamUrns;
+import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.fnexecution.control.DockerJobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory;
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import 
org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
+import 
org.apache.beam.runners.fnexecution.environment.InProcessEnvironmentFactory;
+import 
org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 
 /** Implementation of a {@link FlinkExecutableStageContext}. */
@@ -28,7 +35,16 @@
   private final JobBundleFactory jobBundleFactory;
 
   private static FlinkDefaultExecutableStageContext create(JobInfo jobInfo) 
throws Exception {
-    JobBundleFactory jobBundleFactory = 
DockerJobBundleFactory.FACTORY.get().create(jobInfo);
+    JobBundleFactory jobBundleFactory =
+        DefaultJobBundleFactory.create(
+            jobInfo,
+            ImmutableMap.of(
+                BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER),
+                new DockerEnvironmentFactory.Provider(),
+                BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS),
+                new ProcessEnvironmentFactory.Provider(),
+                Environments.ENVIRONMENT_EMBEDDED, // Non Public urn for 
testing.
+                new InProcessEnvironmentFactory.Provider()));
     return new FlinkDefaultExecutableStageContext(jobBundleFactory);
   }
 
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
index 3c8bcd8f13b..0f89bc4b94c 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
@@ -30,25 +30,16 @@
 import java.util.concurrent.Executors;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
-import org.apache.beam.runners.fnexecution.GrpcFnServer;
-import org.apache.beam.runners.fnexecution.InProcessServerFactory;
-import org.apache.beam.runners.fnexecution.ServerFactory;
-import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
-import org.apache.beam.runners.fnexecution.control.ControlClientPool.Source;
-import org.apache.beam.runners.fnexecution.control.DockerJobBundleFactory;
-import 
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
-import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
-import 
org.apache.beam.runners.fnexecution.environment.InProcessEnvironmentFactory;
-import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
-import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
-import 
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.testing.CrashingRunner;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.Impulse;
@@ -80,31 +71,8 @@
 
   private transient ListeningExecutorService flinkJobExecutor;
 
-  private DockerJobBundleFactory createJobBundleFactory(JobInfo jobInfo) 
throws Exception {
-    return new DockerJobBundleFactory(jobInfo) {
-
-      @Override
-      protected ServerFactory getServerFactory() {
-        return InProcessServerFactory.create();
-      }
-
-      @Override
-      protected EnvironmentFactory getEnvironmentFactory(
-          GrpcFnServer<FnApiControlClientPoolService> controlServer,
-          GrpcFnServer<GrpcLoggingService> loggingServer,
-          GrpcFnServer<ArtifactRetrievalService> retrievalServer,
-          GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
-          Source clientSource,
-          IdGenerator idGenerator) {
-        return InProcessEnvironmentFactory.create(
-            PipelineOptionsFactory.create(), loggingServer, controlServer, 
clientSource);
-      }
-    };
-  }
-
   @Before
   public void setup() {
-    DockerJobBundleFactory.FACTORY.set(this::createJobBundleFactory);
     flinkJobExecutor = 
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
   }
 
@@ -117,7 +85,14 @@ public void tearDown() {
 
   @Test
   public void testExecution() throws Exception {
-    Pipeline p = Pipeline.create();
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(CrashingRunner.class);
+    options.as(FlinkPipelineOptions.class).setFlinkMaster("[local]");
+    options.as(FlinkPipelineOptions.class).setStreaming(isStreaming);
+    options
+        .as(PortablePipelineOptions.class)
+        .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
+    Pipeline p = Pipeline.create(options);
     p.apply("impulse", Impulse.create())
         .apply(
             "create",
@@ -158,16 +133,13 @@ public void process(ProcessContext ctx) {
 
     outputValues.clear();
     // execute the pipeline
-    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
-    options.setFlinkMaster("[local]");
-    options.setStreaming(isStreaming);
     FlinkJobInvocation jobInvocation =
         FlinkJobInvocation.create(
             "fakeId",
             "fakeRetrievalToken",
             flinkJobExecutor,
             pipelineProto,
-            options,
+            options.as(FlinkPipelineOptions.class),
             Collections.EMPTY_LIST);
     jobInvocation.start();
     long timeout = System.currentTimeMillis() + 60 * 1000;
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactoryBase.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
similarity index 68%
rename from 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactoryBase.java
rename to 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
index 551dbe82c3b..e24a3319fab 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactoryBase.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.fnexecution.control;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -29,6 +30,7 @@
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
@@ -53,92 +55,81 @@
 import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
 import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A base for a {@link JobBundleFactory} for which the implementation can 
specify a custom {@link
+ * A {@link JobBundleFactory} for which the implementation can specify a 
custom {@link
  * EnvironmentFactory} for environment management. Note that returned {@link 
StageBundleFactory
  * stage bundle factories} are not thread-safe. Instead, a new stage factory 
should be created for
- * each client.
+ * each client. {@link DefaultJobBundleFactory} initializes the Environment 
lazily when forStage
+ * is called for a stage. This factory is not capable of handling mixed 
types of environments.
  */
 @ThreadSafe
-public abstract class JobBundleFactoryBase implements JobBundleFactory {
-  private static final Logger LOG = 
LoggerFactory.getLogger(JobBundleFactoryBase.class);
+public class DefaultJobBundleFactory implements JobBundleFactory {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DefaultJobBundleFactory.class);
 
   private final IdGenerator stageIdGenerator;
-  private final GrpcFnServer<FnApiControlClientPoolService> controlServer;
-  private final GrpcFnServer<GrpcLoggingService> loggingServer;
-  private final GrpcFnServer<ArtifactRetrievalService> retrievalServer;
-  private final GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
-
   private final LoadingCache<Environment, WrappedSdkHarnessClient> 
environmentCache;
+  // Using environment as the initialization marker.
+  private Environment environment;
+  private ExecutorService executor;
+  private GrpcFnServer<FnApiControlClientPoolService> controlServer;
+  private GrpcFnServer<GrpcLoggingService> loggingServer;
+  private GrpcFnServer<ArtifactRetrievalService> retrievalServer;
+  private GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
+  private GrpcFnServer<GrpcDataService> dataServer;
+  private GrpcFnServer<GrpcStateService> stateServer;
+  private MapControlClientPool clientPool;
+  private EnvironmentFactory environmentFactory;
 
-  JobBundleFactoryBase(JobInfo jobInfo) throws Exception {
-    ServerFactory serverFactory = getServerFactory();
-    IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
-    ControlClientPool clientPool = MapControlClientPool.create();
+  public static DefaultJobBundleFactory create(
+      JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> 
environmentFactoryProviderMap) {
+    return new DefaultJobBundleFactory(jobInfo, environmentFactoryProviderMap);
+  }
 
-    GrpcFnServer<FnApiControlClientPoolService> controlServer =
-        GrpcFnServer.allocatePortAndCreateFor(
-            FnApiControlClientPoolService.offeringClientsToPool(
-                clientPool.getSink(), 
GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
-            serverFactory);
-    GrpcFnServer<GrpcLoggingService> loggingServer =
-        GrpcFnServer.allocatePortAndCreateFor(
-            GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), 
serverFactory);
-    GrpcFnServer<ArtifactRetrievalService> retrievalServer =
-        GrpcFnServer.allocatePortAndCreateFor(
-            BeamFileSystemArtifactRetrievalService.create(), serverFactory);
-    GrpcFnServer<StaticGrpcProvisionService> provisioningServer =
-        GrpcFnServer.allocatePortAndCreateFor(
-            StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()), 
serverFactory);
-    EnvironmentFactory environmentFactory =
-        getEnvironmentFactory(
-            controlServer,
-            loggingServer,
-            retrievalServer,
-            provisioningServer,
-            clientPool.getSource(),
-            IdGenerators.incrementingLongs());
+  DefaultJobBundleFactory(
+      JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> 
environmentFactoryMap) {
+    IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
     this.stageIdGenerator = stageIdGenerator;
-    this.controlServer = controlServer;
-    this.loggingServer = loggingServer;
-    this.retrievalServer = retrievalServer;
-    this.provisioningServer = provisioningServer;
-    this.environmentCache = createEnvironmentCache(environmentFactory, 
serverFactory);
+    this.environmentCache =
+        createEnvironmentCache(
+            (environment) -> {
+              synchronized (this) {
+                checkAndInitialize(jobInfo, environmentFactoryMap, 
environment);
+              }
+              return environmentFactory.createEnvironment(environment);
+            });
   }
 
   @VisibleForTesting
-  JobBundleFactoryBase(
+  DefaultJobBundleFactory(
       EnvironmentFactory environmentFactory,
-      ServerFactory serverFactory,
       IdGenerator stageIdGenerator,
       GrpcFnServer<FnApiControlClientPoolService> controlServer,
       GrpcFnServer<GrpcLoggingService> loggingServer,
       GrpcFnServer<ArtifactRetrievalService> retrievalServer,
-      GrpcFnServer<StaticGrpcProvisionService> provisioningServer) {
+      GrpcFnServer<StaticGrpcProvisionService> provisioningServer,
+      GrpcFnServer<GrpcDataService> dataServer,
+      GrpcFnServer<GrpcStateService> stateServer)
+      throws Exception {
+    this.executor = Executors.newCachedThreadPool();
     this.stageIdGenerator = stageIdGenerator;
     this.controlServer = controlServer;
     this.loggingServer = loggingServer;
     this.retrievalServer = retrievalServer;
     this.provisioningServer = provisioningServer;
-    this.environmentCache = createEnvironmentCache(environmentFactory, 
serverFactory);
+    this.dataServer = dataServer;
+    this.stateServer = stateServer;
+    this.environmentCache =
+        createEnvironmentCache((env) -> 
environmentFactory.createEnvironment(env));
   }
 
-  /** Create {@link EnvironmentFactory} for the given services. */
-  abstract EnvironmentFactory getEnvironmentFactory(
-      GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
-      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
-      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
-      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
-      ControlClientPool.Source clientSource,
-      IdGenerator idGenerator);
-
   private LoadingCache<Environment, WrappedSdkHarnessClient> 
createEnvironmentCache(
-      EnvironmentFactory environmentFactory, ServerFactory serverFactory) {
+      ThrowingFunction<Environment, RemoteEnvironment> environmentCreator) {
     return CacheBuilder.newBuilder()
         .removalListener(
             ((RemovalNotification<Environment, WrappedSdkHarnessClient> 
notification) -> {
@@ -154,9 +145,8 @@ abstract EnvironmentFactory getEnvironmentFactory(
             new CacheLoader<Environment, WrappedSdkHarnessClient>() {
               @Override
               public WrappedSdkHarnessClient load(Environment environment) 
throws Exception {
-                RemoteEnvironment remoteEnvironment =
-                    environmentFactory.createEnvironment(environment);
-                return WrappedSdkHarnessClient.wrapping(remoteEnvironment, 
serverFactory);
+                return WrappedSdkHarnessClient.wrapping(
+                    environmentCreator.apply(environment), dataServer);
               }
             });
   }
@@ -171,12 +161,12 @@ public StageBundleFactory forStage(ExecutableStage 
executableStage) {
           ProcessBundleDescriptors.fromExecutableStage(
               stageIdGenerator.getId(),
               executableStage,
-              wrappedClient.getDataServer().getApiServiceDescriptor(),
-              wrappedClient.getStateServer().getApiServiceDescriptor());
+              dataServer.getApiServiceDescriptor(),
+              stateServer.getApiServiceDescriptor());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    return SimpleStageBundleFactory.create(wrappedClient, 
processBundleDescriptor);
+    return SimpleStageBundleFactory.create(wrappedClient, 
processBundleDescriptor, stateServer);
   }
 
   @Override
@@ -186,14 +176,13 @@ public void close() throws Exception {
     environmentCache.cleanUp();
 
     // Tear down common servers.
+    stateServer.close();
+    dataServer.close();
     controlServer.close();
     loggingServer.close();
     retrievalServer.close();
     provisioningServer.close();
-  }
-
-  protected ServerFactory getServerFactory() {
-    return ServerFactory.createDefault();
+    executor.shutdown();
   }
 
   /** A simple stage bundle factory for remotely processing bundles. */
@@ -207,7 +196,8 @@ protected ServerFactory getServerFactory() {
 
     static SimpleStageBundleFactory create(
         WrappedSdkHarnessClient wrappedClient,
-        ExecutableProcessBundleDescriptor processBundleDescriptor) {
+        ExecutableProcessBundleDescriptor processBundleDescriptor,
+        GrpcFnServer<GrpcStateService> stateServer) {
       @SuppressWarnings("unchecked")
       BundleProcessor processor =
           wrappedClient
@@ -215,7 +205,7 @@ static SimpleStageBundleFactory create(
               .getProcessor(
                   processBundleDescriptor.getProcessBundleDescriptor(),
                   processBundleDescriptor.getRemoteInputDestinations(),
-                  wrappedClient.getStateServer().getService());
+                  stateServer.getService());
       return new SimpleStageBundleFactory(processBundleDescriptor, processor, wrappedClient);
     }
 
@@ -269,39 +259,20 @@ public void close() throws Exception {
    * packaged here to tie server lifetimes to harness client lifetimes.
    */
   protected static class WrappedSdkHarnessClient implements AutoCloseable {
+
     private final RemoteEnvironment environment;
-    private final ExecutorService executor;
-    // TODO: How should data server lifetime be scoped? It is necessary here for now because
-    // SdkHarnessClient requires one at construction.
-    private final GrpcFnServer<GrpcDataService> dataServer;
-    private final GrpcFnServer<GrpcStateService> stateServer;
     private final SdkHarnessClient client;
 
     static WrappedSdkHarnessClient wrapping(
-        RemoteEnvironment environment, ServerFactory serverFactory) throws Exception {
-      ExecutorService executor = Executors.newCachedThreadPool();
-      GrpcFnServer<GrpcDataService> dataServer =
-          GrpcFnServer.allocatePortAndCreateFor(
-              GrpcDataService.create(executor, OutboundObserverFactory.serverDirect()),
-              serverFactory);
-      GrpcFnServer<GrpcStateService> stateServer =
-          GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory);
+        RemoteEnvironment environment, GrpcFnServer<GrpcDataService> dataServer) {
       SdkHarnessClient client =
           SdkHarnessClient.usingFnApiClient(
               environment.getInstructionRequestHandler(), dataServer.getService());
-      return new WrappedSdkHarnessClient(environment, executor, dataServer, stateServer, client);
+      return new WrappedSdkHarnessClient(environment, client);
     }
 
-    private WrappedSdkHarnessClient(
-        RemoteEnvironment environment,
-        ExecutorService executor,
-        GrpcFnServer<GrpcDataService> dataServer,
-        GrpcFnServer<GrpcStateService> stateServer,
-        SdkHarnessClient client) {
-      this.executor = executor;
+    private WrappedSdkHarnessClient(RemoteEnvironment environment, SdkHarnessClient client) {
       this.environment = environment;
-      this.dataServer = dataServer;
-      this.stateServer = stateServer;
       this.client = client;
     }
 
@@ -309,23 +280,69 @@ SdkHarnessClient getClient() {
       return client;
     }
 
-    GrpcFnServer<GrpcStateService> getStateServer() {
-      return stateServer;
-    }
-
-    GrpcFnServer<GrpcDataService> getDataServer() {
-      return dataServer;
-    }
-
     @Override
     public void close() throws Exception {
-      try (AutoCloseable stateServerCloser = stateServer;
-          AutoCloseable dataServerCloser = dataServer;
-          AutoCloseable envCloser = environment;
-          AutoCloseable executorCloser = executor::shutdown) {
+      try (AutoCloseable envCloser = environment) {
         // Wrap resources in try-with-resources to ensure all are cleaned up.
       }
       // TODO: Wait for executor shutdown?
     }
   }
+
+  @GuardedBy("this")
+  private void checkAndInitialize(
+      JobInfo jobInfo,
+      Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap,
+      Environment environment)
+      throws IOException {
+    Preconditions.checkNotNull(environment, "Environment can not be null");
+    if (this.environment != null) {
+      Preconditions.checkArgument(
+          this.environment.getUrn().equals(environment.getUrn()),
+          "Unsupported: Mixing environment types (%s, %s) is not supported for a job.",
+          this.environment.getUrn(),
+          environment.getUrn());
+      // Nothing to do. Already initialized.
+      return;
+    }
+
+    EnvironmentFactory.Provider environmentProviderFactory =
+        environmentFactoryProviderMap.get(environment.getUrn());
+    ServerFactory serverFactory = environmentProviderFactory.getServerFactory();
+
+    this.clientPool = MapControlClientPool.create();
+    this.executor = Executors.newCachedThreadPool();
+    this.controlServer =
+        GrpcFnServer.allocatePortAndCreateFor(
+            FnApiControlClientPoolService.offeringClientsToPool(
+                clientPool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
+            serverFactory);
+    this.loggingServer =
+        GrpcFnServer.allocatePortAndCreateFor(
+            GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
+    this.retrievalServer =
+        GrpcFnServer.allocatePortAndCreateFor(
+            BeamFileSystemArtifactRetrievalService.create(), serverFactory);
+    this.provisioningServer =
+        GrpcFnServer.allocatePortAndCreateFor(
+            StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()), serverFactory);
+    this.dataServer =
+        GrpcFnServer.allocatePortAndCreateFor(
+            GrpcDataService.create(executor, OutboundObserverFactory.serverDirect()),
+            serverFactory);
+    this.stateServer =
+        GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory);
+
+    this.environmentFactory =
+        environmentFactoryProviderMap
+            .get(environment.getUrn())
+            .createEnvironmentFactory(
+                controlServer,
+                loggingServer,
+                retrievalServer,
+                provisioningServer,
+                clientPool,
+                stageIdGenerator);
+    this.environment = environment;
+  }
 }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
deleted file mode 100644
index ee8ef520c04..00000000000
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.fnexecution.control;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.net.HostAndPort;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.beam.runners.fnexecution.GrpcFnServer;
-import org.apache.beam.runners.fnexecution.ServerFactory;
-import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
-import 
org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
-import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
-import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
-import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
-import 
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
-import org.apache.beam.sdk.fn.IdGenerator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link JobBundleFactory} that uses a {@link DockerEnvironmentFactory} for 
environment
- * management. Note that returned {@link StageBundleFactory stage bundle 
factories} are not
- * thread-safe. Instead, a new stage factory should be created for each client.
- */
-@ThreadSafe
-public class DockerJobBundleFactory extends JobBundleFactoryBase {
-  private static final Logger LOG = 
LoggerFactory.getLogger(DockerJobBundleFactory.class);
-
-  /** Factory that creates {@link JobBundleFactory} for the given {@link 
JobInfo}. */
-  public interface JobBundleFactoryFactory {
-    JobBundleFactory create(JobInfo jobInfo) throws Exception;
-  }
-  // TODO (BEAM-4819): a hacky way to override the factory for testing.
-  // Should be replaced with mechanism that let's users configure their own 
factory
-  public static final AtomicReference<JobBundleFactoryFactory> FACTORY =
-      new AtomicReference(
-          new JobBundleFactoryFactory() {
-            @Override
-            public JobBundleFactory create(JobInfo jobInfo) throws Exception {
-              return new DockerJobBundleFactory(jobInfo);
-            }
-          });
-
-  protected DockerJobBundleFactory(JobInfo jobInfo) throws Exception {
-    super(jobInfo);
-  }
-
-  @VisibleForTesting
-  DockerJobBundleFactory(
-      EnvironmentFactory environmentFactory,
-      ServerFactory serverFactory,
-      IdGenerator stageIdGenerator,
-      GrpcFnServer<FnApiControlClientPoolService> controlServer,
-      GrpcFnServer<GrpcLoggingService> loggingServer,
-      GrpcFnServer<ArtifactRetrievalService> retrievalServer,
-      GrpcFnServer<StaticGrpcProvisionService> provisioningServer) {
-    super(
-        environmentFactory,
-        serverFactory,
-        stageIdGenerator,
-        controlServer,
-        loggingServer,
-        retrievalServer,
-        provisioningServer);
-  }
-
-  @Override
-  protected ServerFactory getServerFactory() {
-    switch (getPlatform()) {
-      case LINUX:
-        return ServerFactory.createDefault();
-      case MAC:
-        return DockerOnMac.getServerFactory();
-      default:
-        LOG.warn("Unknown Docker platform. Falling back to default server 
factory");
-        return ServerFactory.createDefault();
-    }
-  }
-
-  private static Platform getPlatform() {
-    String osName = System.getProperty("os.name").toLowerCase();
-    // TODO: Make this more robust?
-    // The DOCKER_MAC_CONTAINER environment variable is necessary to detect 
whether we run on
-    // a container on MacOs. MacOs internally uses a Linux VM which makes it 
indistinguishable from Linux.
-    // We still need to apply port mapping due to missing host networking.
-    if (osName.startsWith("mac") || DockerOnMac.RUNNING_INSIDE_DOCKER_ON_MAC) {
-      return Platform.MAC;
-    } else if (osName.startsWith("linux")) {
-      return Platform.LINUX;
-    }
-    return Platform.OTHER;
-  }
-
-  private enum Platform {
-    MAC,
-    LINUX,
-    OTHER,
-  }
-
-  /**
-   * NOTE: Deployment on Macs is intended for local development. As of 18.03, 
Docker-for-Mac does
-   * not implement host networking (--networking=host is effectively a no-op). 
Instead, we use a
-   * special DNS entry that points to the host:
-   * 
https://docs.docker.com/docker-for-mac/networking/#use-cases-and-workarounds 
The special
-   * hostname has historically changed between versions, so this is subject to 
breakages and will
-   * likely only support the latest version at any time.
-   */
-  private static class DockerOnMac {
-    // TODO: This host name seems to change with every other Docker release. 
Do we attempt to keep up
-    // or attempt to document the supported Docker version(s)?
-    private static final String DOCKER_FOR_MAC_HOST = "host.docker.internal";
-
-    // True if we're inside a container (i.e. job-server container) with MacOS 
as the host system
-    private static final boolean RUNNING_INSIDE_DOCKER_ON_MAC =
-        "1".equals(System.getenv("DOCKER_MAC_CONTAINER"));
-    // Port offset for MacOS since we don't have host networking and need to 
use published ports
-    private static final int MAC_PORT_START = 8100;
-    private static final int MAC_PORT_END = 8200;
-    private static final AtomicInteger MAC_PORT = new 
AtomicInteger(MAC_PORT_START);
-
-    private static ServerFactory getServerFactory() {
-      ServerFactory.UrlFactory dockerUrlFactory =
-          (host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, 
port).toString();
-      if (RUNNING_INSIDE_DOCKER_ON_MAC) {
-        // If we're already running in a container, we need to use a fixed 
port range due to
-        // non-existing host networking in Docker-for-Mac. The port range 
needs to be published
-        // when bringing up the Docker container, see DockerEnvironmentFactory.
-        return ServerFactory.createWithUrlFactoryAndPortSupplier(
-            dockerUrlFactory,
-            // We only use the published Docker ports 8100-8200 in a 
round-robin fashion
-            () -> MAC_PORT.getAndUpdate(val -> val == MAC_PORT_END ? 
MAC_PORT_START : val + 1));
-      } else {
-        return ServerFactory.createWithUrlFactory(dockerUrlFactory);
-      }
-    }
-  }
-
-  /** Create {@link EnvironmentFactory} for the given services. */
-  @Override
-  protected EnvironmentFactory getEnvironmentFactory(
-      GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
-      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
-      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
-      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
-      ControlClientPool.Source clientSource,
-      IdGenerator idGenerator) {
-    return DockerEnvironmentFactory.forServices(
-        controlServiceServer,
-        loggingServiceServer,
-        retrievalServiceServer,
-        provisioningServiceServer,
-        clientSource,
-        idGenerator);
-  }
-}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactory.java
deleted file mode 100644
index d16573389c8..00000000000
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactory.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.fnexecution.control;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.runners.fnexecution.GrpcFnServer;
-import org.apache.beam.runners.fnexecution.ServerFactory;
-import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
-import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
-import 
org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory;
-import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
-import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
-import 
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
-import org.apache.beam.sdk.fn.IdGenerator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link JobBundleFactoryBase} which uses a {@link 
ProcessEnvironmentFactory} to run the SDK
- * harness in an external process.
- */
-public class ProcessJobBundleFactory extends JobBundleFactoryBase {
-  private static final Logger LOG = 
LoggerFactory.getLogger(ProcessJobBundleFactory.class);
-
-  public static ProcessJobBundleFactory create(JobInfo jobInfo) throws 
Exception {
-    return new ProcessJobBundleFactory(jobInfo);
-  }
-
-  protected ProcessJobBundleFactory(JobInfo jobInfo) throws Exception {
-    super(jobInfo);
-  }
-
-  @VisibleForTesting
-  ProcessJobBundleFactory(
-      ProcessEnvironmentFactory envFactory,
-      ServerFactory serverFactory,
-      IdGenerator stageIdGenerator,
-      GrpcFnServer<FnApiControlClientPoolService> controlServer,
-      GrpcFnServer<GrpcLoggingService> loggingServer,
-      GrpcFnServer<ArtifactRetrievalService> retrievalServer,
-      GrpcFnServer<StaticGrpcProvisionService> provisioningServer) {
-    super(
-        envFactory,
-        serverFactory,
-        stageIdGenerator,
-        controlServer,
-        loggingServer,
-        retrievalServer,
-        provisioningServer);
-  }
-
-  @Override
-  protected EnvironmentFactory getEnvironmentFactory(
-      GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
-      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
-      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
-      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
-      ControlClientPool.Source clientSource,
-      IdGenerator idGenerator) {
-    return ProcessEnvironmentFactory.create(
-        controlServiceServer,
-        loggingServiceServer,
-        retrievalServiceServer,
-        provisioningServiceServer,
-        clientSource,
-        idGenerator);
-  }
-}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
index c50c9ad37ec..a6b4fbe10d0 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
@@ -43,7 +43,7 @@
 /**
  * A {@link JobBundleFactory} which can manage a single instance of an {@link Environment}.
  *
- * @deprecated replace with a {@link DockerJobBundleFactory} when appropriate if the {@link
+ * @deprecated replace with a {@link DefaultJobBundleFactory} when appropriate if the {@link
  *     EnvironmentFactory} is a {@link
  *     org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory}, or create an
  *     {@code InProcessJobBundleFactory} and inline the creation of the environment if appropriate.
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
index 6ea72383573..c403ff94b65 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
@@ -21,15 +21,18 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.net.HostAndPort;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
 import org.apache.beam.runners.core.construction.BeamUrns;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
 import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
 import org.apache.beam.runners.fnexecution.control.ControlClientPool;
 import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
@@ -49,27 +52,6 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(DockerEnvironmentFactory.class);
 
-  /**
-   * Returns a {@link DockerEnvironmentFactory} for the provided {@link GrpcFnServer servers} using
-   * the default {@link DockerCommand}.
-   */
-  public static DockerEnvironmentFactory forServices(
-      GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
-      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
-      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
-      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
-      ControlClientPool.Source clientSource,
-      IdGenerator idGenerator) {
-    return forServicesWithDocker(
-        DockerCommand.getDefault(),
-        controlServiceServer,
-        loggingServiceServer,
-        retrievalServiceServer,
-        provisioningServiceServer,
-        clientSource,
-        idGenerator);
-  }
-
   static DockerEnvironmentFactory forServicesWithDocker(
       DockerCommand docker,
       GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
@@ -202,4 +184,97 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep
       return ImmutableList.of();
     }
   }
+
+  /**
+   * NOTE: Deployment on Macs is intended for local development. As of 18.03, Docker-for-Mac does
+   * not implement host networking (--networking=host is effectively a no-op). Instead, we use a
+   * special DNS entry that points to the host:
+   * https://docs.docker.com/docker-for-mac/networking/#use-cases-and-workarounds The special
+   * hostname has historically changed between versions, so this is subject to breakages and will
+   * likely only support the latest version at any time.
+   */
+  private static class DockerOnMac {
+    // TODO: This host name seems to change with every other Docker release. Do we attempt to keep up
+    // or attempt to document the supported Docker version(s)?
+    private static final String DOCKER_FOR_MAC_HOST = "host.docker.internal";
+
+    // True if we're inside a container (i.e. job-server container) with MacOS as the host system
+    private static final boolean RUNNING_INSIDE_DOCKER_ON_MAC =
+        "1".equals(System.getenv("DOCKER_MAC_CONTAINER"));
+    // Port offset for MacOS since we don't have host networking and need to use published ports
+    private static final int MAC_PORT_START = 8100;
+    private static final int MAC_PORT_END = 8200;
+    private static final AtomicInteger MAC_PORT = new AtomicInteger(MAC_PORT_START);
+
+    private static ServerFactory getServerFactory() {
+      ServerFactory.UrlFactory dockerUrlFactory =
+          (host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, port).toString();
+      if (RUNNING_INSIDE_DOCKER_ON_MAC) {
+        // If we're already running in a container, we need to use a fixed port range due to
+        // non-existing host networking in Docker-for-Mac. The port range needs to be published
+        // when bringing up the Docker container, see DockerEnvironmentFactory.
+        return ServerFactory.createWithUrlFactoryAndPortSupplier(
+            dockerUrlFactory,
+            // We only use the published Docker ports 8100-8200 in a round-robin fashion
+            () -> MAC_PORT.getAndUpdate(val -> val == MAC_PORT_END ? MAC_PORT_START : val + 1));
+      } else {
+        return ServerFactory.createWithUrlFactory(dockerUrlFactory);
+      }
+    }
+  }
+
+  /** Provider for DockerEnvironmentFactory. */
+  public static class Provider implements EnvironmentFactory.Provider {
+
+    @Override
+    public EnvironmentFactory createEnvironmentFactory(
+        GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+        GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+        GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+        GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+        ControlClientPool clientPool,
+        IdGenerator idGenerator) {
+      return DockerEnvironmentFactory.forServicesWithDocker(
+          DockerCommand.getDefault(),
+          controlServiceServer,
+          loggingServiceServer,
+          retrievalServiceServer,
+          provisioningServiceServer,
+          clientPool.getSource(),
+          idGenerator);
+    }
+
+    @Override
+    public ServerFactory getServerFactory() {
+      switch (getPlatform()) {
+        case LINUX:
+          return ServerFactory.createDefault();
+        case MAC:
+          return DockerOnMac.getServerFactory();
+        default:
+          LOG.warn("Unknown Docker platform. Falling back to default server factory");
+          return ServerFactory.createDefault();
+      }
+    }
+
+    private static Platform getPlatform() {
+      String osName = System.getProperty("os.name").toLowerCase();
+      // TODO: Make this more robust?
+      // The DOCKER_MAC_CONTAINER environment variable is necessary to detect whether we run on
+      // a container on MacOs. MacOs internally uses a Linux VM which makes it indistinguishable from Linux.
+      // We still need to apply port mapping due to missing host networking.
+      if (osName.startsWith("mac") || DockerOnMac.RUNNING_INSIDE_DOCKER_ON_MAC) {
+        return Platform.MAC;
+      } else if (osName.startsWith("linux")) {
+        return Platform.LINUX;
+      }
+      return Platform.OTHER;
+    }
+
+    private enum Platform {
+      MAC,
+      LINUX,
+      OTHER,
+    }
+  }
 }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EnvironmentFactory.java
index cb423f434f1..f6559613f08 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EnvironmentFactory.java
@@ -20,10 +20,36 @@
 
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
 import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.apache.beam.sdk.fn.IdGenerator;
 
 /** Creates {@link Environment environments} which communicate to an {@link SdkHarnessClient}. */
 public interface EnvironmentFactory {
   /** Creates an active {@link Environment} and returns a handle to it. */
   RemoteEnvironment createEnvironment(RunnerApi.Environment environment) throws Exception;
+
+  /** Provider for a {@link EnvironmentFactory} and {@link ServerFactory} for the environment. */
+  interface Provider {
+
+    /** Creates {@link EnvironmentFactory} for the provided GrpcServices. */
+    EnvironmentFactory createEnvironmentFactory(
+        GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+        GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+        GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+        GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+        ControlClientPool clientPool,
+        IdGenerator idGenerator);
+
+    /** Create the {@link ServerFactory} applicable to this environment. */
+    default ServerFactory getServerFactory() {
+      return ServerFactory.createDefault();
+    }
+  }
 }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java
index 2c018f771c2..827dac34d00 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java
@@ -28,14 +28,20 @@
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
 import org.apache.beam.runners.fnexecution.control.ControlClientPool;
 import org.apache.beam.runners.fnexecution.control.ControlClientPool.Source;
 import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
 import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,4 +129,25 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep
     InstructionRequestHandler handler = clientSource.take("", Duration.ofMinutes(1L));
     return RemoteEnvironment.forHandler(environment, handler);
   }
+
+  /** Provider of InProcessEnvironmentFactory. */
+  public static class Provider implements EnvironmentFactory.Provider {
+
+    @Override
+    public EnvironmentFactory createEnvironmentFactory(
+        GrpcFnServer<FnApiControlClientPoolService> controlServer,
+        GrpcFnServer<GrpcLoggingService> loggingServer,
+        GrpcFnServer<ArtifactRetrievalService> retrievalServer,
+        GrpcFnServer<StaticGrpcProvisionService> provisioningServer,
+        ControlClientPool clientPool,
+        IdGenerator idGenerator) {
+      return InProcessEnvironmentFactory.create(
+          PipelineOptionsFactory.create(), loggingServer, controlServer, clientPool.getSource());
+    }
+
+    @Override
+    public ServerFactory getServerFactory() {
+      return InProcessServerFactory.create();
+    }
+  }
 }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java
index 3c109383a49..41f577c8806 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java
@@ -43,23 +43,6 @@
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ProcessEnvironmentFactory.class);
 
-  public static ProcessEnvironmentFactory create(
-      GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
-      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
-      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
-      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
-      ControlClientPool.Source clientSource,
-      IdGenerator idGenerator) {
-    return create(
-        ProcessManager.create(),
-        controlServiceServer,
-        loggingServiceServer,
-        retrievalServiceServer,
-        provisioningServiceServer,
-        clientSource,
-        idGenerator);
-  }
-
   public static ProcessEnvironmentFactory create(
       ProcessManager processManager,
       GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
@@ -162,4 +145,25 @@ public RemoteEnvironment createEnvironment(Environment 
environment) throws Excep
 
     return ProcessEnvironment.create(processManager, environment, workerId, 
instructionHandler);
   }
+
+  /** Provider of ProcessEnvironmentFactory. */
+  public static class Provider implements EnvironmentFactory.Provider {
+    @Override
+    public EnvironmentFactory createEnvironmentFactory(
+        GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+        GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+        GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+        GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+        ControlClientPool clientPool,
+        IdGenerator idGenerator) {
+      return create(
+          ProcessManager.create(),
+          controlServiceServer,
+          loggingServiceServer,
+          retrievalServiceServer,
+          provisioningServiceServer,
+          clientPool.getSource(),
+          idGenerator);
+    }
+  }
 }
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactoryTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
similarity index 50%
rename from 
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactoryTest.java
rename to 
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
index 0c7354a4d0b..39984efe3ea 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactoryTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
@@ -23,8 +23,11 @@
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
@@ -33,39 +36,49 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy;
-import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.ModelCoders;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.ServerFactory;
 import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
-import 
org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import 
org.apache.beam.runners.fnexecution.environment.EnvironmentFactory.Provider;
 import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
 import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import 
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct;
+import org.hamcrest.Matchers;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
-/** Tests for {@link ProcessEnvironmentFactory}. */
+/** Tests for {@link DefaultJobBundleFactory}. */
 @RunWith(JUnit4.class)
-public class ProcessJobBundleFactoryTest {
-
-  @Mock private ProcessEnvironmentFactory envFactory;
+public class DefaultJobBundleFactoryTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  @Mock private EnvironmentFactory envFactory;
   @Mock private RemoteEnvironment remoteEnvironment;
   @Mock private InstructionRequestHandler instructionHandler;
-  @Mock private ServerFactory serverFactory;
   @Mock GrpcFnServer<FnApiControlClientPoolService> controlServer;
   @Mock GrpcFnServer<GrpcLoggingService> loggingServer;
   @Mock GrpcFnServer<ArtifactRetrievalService> retrievalServer;
   @Mock GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
+  @Mock private GrpcFnServer<GrpcDataService> dataServer;
+  @Mock private GrpcFnServer<GrpcStateService> stateServer;
 
-  private final Environment environment = 
Environments.createDockerEnvironment("env-url");
+  private final Environment environment = 
Environment.newBuilder().setUrn("dummy:urn").build();
   private final IdGenerator stageIdGenerator = 
IdGenerators.incrementingLongs();
   private final InstructionResponse instructionResponse =
       
InstructionResponse.newBuilder().setInstructionId("instruction-id").build();
@@ -77,35 +90,139 @@ public void setUpMocks() throws Exception {
     
when(remoteEnvironment.getInstructionRequestHandler()).thenReturn(instructionHandler);
     when(instructionHandler.handle(any()))
         .thenReturn(CompletableFuture.completedFuture(instructionResponse));
+    when(dataServer.getApiServiceDescriptor())
+        .thenReturn(ApiServiceDescriptor.getDefaultInstance());
+    when(stateServer.getApiServiceDescriptor())
+        .thenReturn(ApiServiceDescriptor.getDefaultInstance());
   }
 
   @Test
   public void createsCorrectEnvironment() throws Exception {
-    try (ProcessJobBundleFactory bundleFactory =
-        new ProcessJobBundleFactory(
+    try (DefaultJobBundleFactory bundleFactory =
+        new DefaultJobBundleFactory(
             envFactory,
-            serverFactory,
             stageIdGenerator,
             controlServer,
             loggingServer,
             retrievalServer,
-            provisioningServer)) {
+            provisioningServer,
+            dataServer,
+            stateServer)) {
       bundleFactory.forStage(getExecutableStage(environment));
       verify(envFactory).createEnvironment(environment);
     }
   }
 
+  @Test
+  public void createsMultipleEnvironmentOfSingleType() throws Exception {
+    ServerFactory serverFactory = ServerFactory.createDefault();
+
+    Environment environmentA =
+        Environment.newBuilder()
+            .setUrn("env:urn:a")
+            .setPayload(ByteString.copyFrom(new byte[1]))
+            .build();
+    Environment environmentAA =
+        Environment.newBuilder()
+            .setUrn("env:urn:a")
+            .setPayload(ByteString.copyFrom(new byte[2]))
+            .build();
+    EnvironmentFactory envFactoryA = mock(EnvironmentFactory.class);
+    
when(envFactoryA.createEnvironment(environmentA)).thenReturn(remoteEnvironment);
+    
when(envFactoryA.createEnvironment(environmentAA)).thenReturn(remoteEnvironment);
+    EnvironmentFactory.Provider environmentProviderFactoryA =
+        mock(EnvironmentFactory.Provider.class);
+    when(environmentProviderFactoryA.createEnvironmentFactory(
+            any(), any(), any(), any(), any(), any()))
+        .thenReturn(envFactoryA);
+    
when(environmentProviderFactoryA.getServerFactory()).thenReturn(serverFactory);
+
+    Environment environmentB = 
Environment.newBuilder().setUrn("env:urn:b").build();
+    EnvironmentFactory envFactoryB = mock(EnvironmentFactory.class);
+    
when(envFactoryB.createEnvironment(environmentB)).thenReturn(remoteEnvironment);
+    EnvironmentFactory.Provider environmentProviderFactoryB =
+        mock(EnvironmentFactory.Provider.class);
+    when(environmentProviderFactoryB.createEnvironmentFactory(
+            any(), any(), any(), any(), any(), any()))
+        .thenReturn(envFactoryB);
+    
when(environmentProviderFactoryB.getServerFactory()).thenReturn(serverFactory);
+
+    Map<String, Provider> environmentFactoryProviderMap =
+        ImmutableMap.of(
+            environmentA.getUrn(), environmentProviderFactoryA,
+            environmentB.getUrn(), environmentProviderFactoryB);
+    try (DefaultJobBundleFactory bundleFactory =
+        DefaultJobBundleFactory.create(
+            JobInfo.create("testJob", "testJob", "token", 
Struct.getDefaultInstance()),
+            environmentFactoryProviderMap)) {
+      bundleFactory.forStage(getExecutableStage(environmentA));
+      verify(environmentProviderFactoryA, Mockito.times(1))
+          .createEnvironmentFactory(any(), any(), any(), any(), any(), any());
+      verify(environmentProviderFactoryB, Mockito.times(0))
+          .createEnvironmentFactory(any(), any(), any(), any(), any(), any());
+      verify(envFactoryA, Mockito.times(1)).createEnvironment(environmentA);
+      verify(envFactoryA, Mockito.times(0)).createEnvironment(environmentAA);
+
+      bundleFactory.forStage(getExecutableStage(environmentAA));
+      verify(environmentProviderFactoryA, Mockito.times(1))
+          .createEnvironmentFactory(any(), any(), any(), any(), any(), any());
+      verify(environmentProviderFactoryB, Mockito.times(0))
+          .createEnvironmentFactory(any(), any(), any(), any(), any(), any());
+      verify(envFactoryA, Mockito.times(1)).createEnvironment(environmentA);
+      verify(envFactoryA, Mockito.times(1)).createEnvironment(environmentAA);
+    }
+  }
+
+  @Test
+  public void failedCreatingMultipleEnvironmentFromMultipleTypes() throws 
Exception {
+    ServerFactory serverFactory = ServerFactory.createDefault();
+
+    Environment environmentA = 
Environment.newBuilder().setUrn("env:urn:a").build();
+    EnvironmentFactory envFactoryA = mock(EnvironmentFactory.class);
+    
when(envFactoryA.createEnvironment(environmentA)).thenReturn(remoteEnvironment);
+    EnvironmentFactory.Provider environmentProviderFactoryA =
+        mock(EnvironmentFactory.Provider.class);
+    when(environmentProviderFactoryA.createEnvironmentFactory(
+            any(), any(), any(), any(), any(), any()))
+        .thenReturn(envFactoryA);
+    
when(environmentProviderFactoryA.getServerFactory()).thenReturn(serverFactory);
+
+    Environment environmentB = 
Environment.newBuilder().setUrn("env:urn:b").build();
+    EnvironmentFactory envFactoryB = mock(EnvironmentFactory.class);
+    
when(envFactoryB.createEnvironment(environmentB)).thenReturn(remoteEnvironment);
+    EnvironmentFactory.Provider environmentProviderFactoryB =
+        mock(EnvironmentFactory.Provider.class);
+    when(environmentProviderFactoryB.createEnvironmentFactory(
+            any(), any(), any(), any(), any(), any()))
+        .thenReturn(envFactoryB);
+    
when(environmentProviderFactoryB.getServerFactory()).thenReturn(serverFactory);
+
+    Map<String, Provider> environmentFactoryProviderMap =
+        ImmutableMap.of(
+            environmentA.getUrn(), environmentProviderFactoryA,
+            environmentB.getUrn(), environmentProviderFactoryB);
+    try (DefaultJobBundleFactory bundleFactory =
+        DefaultJobBundleFactory.create(
+            JobInfo.create("testJob", "testJob", "token", 
Struct.getDefaultInstance()),
+            environmentFactoryProviderMap)) {
+      bundleFactory.forStage(getExecutableStage(environmentB));
+      thrown.expectCause(Matchers.any(IllegalArgumentException.class));
+      bundleFactory.forStage(getExecutableStage(environmentA));
+    }
+  }
+
   @Test
   public void closesEnvironmentOnCleanup() throws Exception {
-    ProcessJobBundleFactory bundleFactory =
-        new ProcessJobBundleFactory(
+    DefaultJobBundleFactory bundleFactory =
+        new DefaultJobBundleFactory(
             envFactory,
-            serverFactory,
             stageIdGenerator,
             controlServer,
             loggingServer,
             retrievalServer,
-            provisioningServer);
+            provisioningServer,
+            dataServer,
+            stateServer);
     try (AutoCloseable unused = bundleFactory) {
       bundleFactory.forStage(getExecutableStage(environment));
     }
@@ -114,15 +231,16 @@ public void closesEnvironmentOnCleanup() throws Exception 
{
 
   @Test
   public void cachesEnvironment() throws Exception {
-    try (ProcessJobBundleFactory bundleFactory =
-        new ProcessJobBundleFactory(
+    try (DefaultJobBundleFactory bundleFactory =
+        new DefaultJobBundleFactory(
             envFactory,
-            serverFactory,
             stageIdGenerator,
             controlServer,
             loggingServer,
             retrievalServer,
-            provisioningServer)) {
+            provisioningServer,
+            dataServer,
+            stateServer)) {
       StageBundleFactory bf1 = 
bundleFactory.forStage(getExecutableStage(environment));
       StageBundleFactory bf2 = 
bundleFactory.forStage(getExecutableStage(environment));
       // NOTE: We hang on to stage bundle references to ensure their 
underlying environments are not
@@ -137,7 +255,7 @@ public void cachesEnvironment() throws Exception {
 
   @Test
   public void doesNotCacheDifferentEnvironments() throws Exception {
-    Environment envFoo = Environments.createDockerEnvironment("foo-env-url");
+    Environment envFoo = 
Environment.newBuilder().setUrn("dummy:urn:another").build();
     RemoteEnvironment remoteEnvFoo = mock(RemoteEnvironment.class);
     InstructionRequestHandler fooInstructionHandler = 
mock(InstructionRequestHandler.class);
     when(envFactory.createEnvironment(envFoo)).thenReturn(remoteEnvFoo);
@@ -146,15 +264,16 @@ public void doesNotCacheDifferentEnvironments() throws 
Exception {
     when(fooInstructionHandler.handle(any()))
         .thenReturn(CompletableFuture.completedFuture(instructionResponse));
 
-    try (ProcessJobBundleFactory bundleFactory =
-        new ProcessJobBundleFactory(
+    try (DefaultJobBundleFactory bundleFactory =
+        new DefaultJobBundleFactory(
             envFactory,
-            serverFactory,
             stageIdGenerator,
             controlServer,
             loggingServer,
             retrievalServer,
-            provisioningServer)) {
+            provisioningServer,
+            dataServer,
+            stateServer)) {
       bundleFactory.forStage(getExecutableStage(environment));
       bundleFactory.forStage(getExecutableStage(envFoo));
       verify(envFactory).createEnvironment(environment);
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactoryTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactoryTest.java
deleted file mode 100644
index 6f32d58969a..00000000000
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactoryTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.fnexecution.control;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import java.util.concurrent.CompletableFuture;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
-import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
-import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
-import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy;
-import org.apache.beam.runners.core.construction.Environments;
-import org.apache.beam.runners.core.construction.ModelCoders;
-import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.fnexecution.GrpcFnServer;
-import org.apache.beam.runners.fnexecution.ServerFactory;
-import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
-import 
org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
-import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
-import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
-import 
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
-import org.apache.beam.sdk.fn.IdGenerator;
-import org.apache.beam.sdk.fn.IdGenerators;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Tests for {@link DockerJobBundleFactory}. */
-@RunWith(JUnit4.class)
-public class DockerJobBundleFactoryTest {
-
-  @Mock private DockerEnvironmentFactory envFactory;
-  @Mock private RemoteEnvironment remoteEnvironment;
-  @Mock private InstructionRequestHandler instructionHandler;
-  @Mock private ServerFactory serverFactory;
-  @Mock GrpcFnServer<FnApiControlClientPoolService> controlServer;
-  @Mock GrpcFnServer<GrpcLoggingService> loggingServer;
-  @Mock GrpcFnServer<ArtifactRetrievalService> retrievalServer;
-  @Mock GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
-
-  private final Environment environment = 
Environments.createDockerEnvironment("env-url");
-  private final IdGenerator stageIdGenerator = 
IdGenerators.incrementingLongs();
-  private final InstructionResponse instructionResponse =
-      
InstructionResponse.newBuilder().setInstructionId("instruction-id").build();
-
-  @Before
-  public void setUpMocks() throws Exception {
-    MockitoAnnotations.initMocks(this);
-    
when(envFactory.createEnvironment(environment)).thenReturn(remoteEnvironment);
-    
when(remoteEnvironment.getInstructionRequestHandler()).thenReturn(instructionHandler);
-    when(instructionHandler.handle(any()))
-        .thenReturn(CompletableFuture.completedFuture(instructionResponse));
-  }
-
-  @Test
-  public void createsCorrectEnvironment() throws Exception {
-    try (DockerJobBundleFactory bundleFactory =
-        new DockerJobBundleFactory(
-            envFactory,
-            serverFactory,
-            stageIdGenerator,
-            controlServer,
-            loggingServer,
-            retrievalServer,
-            provisioningServer)) {
-      bundleFactory.forStage(getExecutableStage(environment));
-      verify(envFactory).createEnvironment(environment);
-    }
-  }
-
-  @Test
-  public void closesEnvironmentOnCleanup() throws Exception {
-    DockerJobBundleFactory bundleFactory =
-        new DockerJobBundleFactory(
-            envFactory,
-            serverFactory,
-            stageIdGenerator,
-            controlServer,
-            loggingServer,
-            retrievalServer,
-            provisioningServer);
-    try (AutoCloseable unused = bundleFactory) {
-      bundleFactory.forStage(getExecutableStage(environment));
-    }
-    verify(remoteEnvironment).close();
-  }
-
-  @Test
-  public void cachesEnvironment() throws Exception {
-    try (DockerJobBundleFactory bundleFactory =
-        new DockerJobBundleFactory(
-            envFactory,
-            serverFactory,
-            stageIdGenerator,
-            controlServer,
-            loggingServer,
-            retrievalServer,
-            provisioningServer)) {
-      StageBundleFactory bf1 = 
bundleFactory.forStage(getExecutableStage(environment));
-      StageBundleFactory bf2 = 
bundleFactory.forStage(getExecutableStage(environment));
-      // NOTE: We hang on to stage bundle references to ensure their 
underlying environments are not
-      // garbage collected. For additional safety, we print the factories to 
ensure the referernces
-      // are not optimized away.
-      System.out.println("bundle factory 1:" + bf1);
-      System.out.println("bundle factory 1:" + bf2);
-      verify(envFactory).createEnvironment(environment);
-      verifyNoMoreInteractions(envFactory);
-    }
-  }
-
-  @Test
-  public void doesNotCacheDifferentEnvironments() throws Exception {
-    Environment envFoo = Environments.createDockerEnvironment("foo-env-url");
-    RemoteEnvironment remoteEnvFoo = mock(RemoteEnvironment.class);
-    InstructionRequestHandler fooInstructionHandler = 
mock(InstructionRequestHandler.class);
-    when(envFactory.createEnvironment(envFoo)).thenReturn(remoteEnvFoo);
-    
when(remoteEnvFoo.getInstructionRequestHandler()).thenReturn(fooInstructionHandler);
-    // Don't bother creating a distinct instruction response because we don't 
use it here.
-    when(fooInstructionHandler.handle(any()))
-        .thenReturn(CompletableFuture.completedFuture(instructionResponse));
-
-    try (DockerJobBundleFactory bundleFactory =
-        new DockerJobBundleFactory(
-            envFactory,
-            serverFactory,
-            stageIdGenerator,
-            controlServer,
-            loggingServer,
-            retrievalServer,
-            provisioningServer)) {
-      bundleFactory.forStage(getExecutableStage(environment));
-      bundleFactory.forStage(getExecutableStage(envFoo));
-      verify(envFactory).createEnvironment(environment);
-      verify(envFactory).createEnvironment(envFoo);
-      verifyNoMoreInteractions(envFactory);
-    }
-  }
-
-  private static ExecutableStage getExecutableStage(Environment environment) {
-    return ExecutableStage.fromPayload(
-        ExecutableStagePayload.newBuilder()
-            .setInput("input-pc")
-            .setEnvironment(environment)
-            .setComponents(
-                Components.newBuilder()
-                    .putPcollections(
-                        "input-pc",
-                        PCollection.newBuilder()
-                            .setWindowingStrategyId("windowing-strategy")
-                            .setCoderId("coder-id")
-                            .build())
-                    .putWindowingStrategies(
-                        "windowing-strategy",
-                        
WindowingStrategy.newBuilder().setWindowCoderId("coder-id").build())
-                    .putCoders(
-                        "coder-id",
-                        Coder.newBuilder()
-                            .setSpec(
-                                SdkFunctionSpec.newBuilder()
-                                    .setSpec(
-                                        FunctionSpec.newBuilder()
-                                            
.setUrn(ModelCoders.INTERVAL_WINDOW_CODER_URN)
-                                            .build())
-                                    .build())
-                            .build())
-                    .build())
-            .build());
-  }
-}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 33e77d0ad32..9d6bb34acd6 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -62,8 +62,8 @@ const (
 
 // Options for marshalling a graph into a model pipeline.
 type Options struct {
-       // ContainerImageURL is the default environment container image.
-       ContainerImageURL string
+       // Environment used to run the user code.
+       Environment pb.Environment
 }
 
 // Marshal converts a graph to a model pipeline.
@@ -488,16 +488,7 @@ func boolToBounded(bounded bool) pb.IsBounded_Enum {
 func (m *marshaller) addDefaultEnv() string {
        const id = "go"
        if _, exists := m.environments[id]; !exists {
-               payload := &pb.DockerPayload{ContainerImage: 
m.opt.ContainerImageURL}
-               serializedPayload, err := proto.Marshal(payload)
-               if err != nil {
-                       panic(fmt.Sprintf("Failed to serialize Environment 
payload %v: %v", payload, err))
-               }
-               m.environments[id] = &pb.Environment{
-                 Url: m.opt.ContainerImageURL,
-                 Urn: "beam:env:docker:v1",
-                 Payload: serializedPayload,
-               }
+               m.environments[id] = &m.opt.Environment
        }
        return id
 }
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
index dff3eea92f8..e12c251337a 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
@@ -25,6 +25,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "github.com/golang/protobuf/proto"
 )
 
@@ -79,7 +80,12 @@ func TestParDo(t *testing.T) {
                t.Fatal("expected a single edge")
        }
 
-       p, err := graphx.Marshal(edges, &graphx.Options{ContainerImageURL: 
"foo"})
+       payload, err := proto.Marshal(&pb.DockerPayload{ContainerImage: "foo"})
+       if err != nil {
+               t.Fatal(err)
+       }
+       p, err := graphx.Marshal(edges,
+               &graphx.Options{Environment: pb.Environment{Url: "foo", Urn: 
"beam:env:docker:v1", Payload: payload}})
        if err != nil {
                t.Fatal(err)
        }
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/util.go 
b/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
index 2457377e059..f859d95998f 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
@@ -33,7 +33,7 @@ func Bounded(p *pb.Pipeline) bool {
 func ContainerImages(p *pb.Pipeline) []string {
        var ret []string
        for _, t := range p.GetComponents().GetEnvironments() {
-// TODO(angoenka) 09/14/2018 Check t.Urn before parsing the payload.
+               // TODO(angoenka) 09/14/2018 Check t.Urn before parsing the 
payload.
                var payload pb.DockerPayload
                proto.Unmarshal(t.GetPayload(), &payload)
                ret = append(ret, payload.ContainerImage)
diff --git a/sdks/go/pkg/beam/options/jobopts/options.go 
b/sdks/go/pkg/beam/options/jobopts/options.go
index 1e60f60fbf2..1f8828b26cd 100644
--- a/sdks/go/pkg/beam/options/jobopts/options.go
+++ b/sdks/go/pkg/beam/options/jobopts/options.go
@@ -37,8 +37,19 @@ var (
        // JobName is the name of the job.
        JobName = flag.String("job_name", "", "Job name (optional).")
 
-       // ContainerImage is the location of the SDK harness container image.
-       ContainerImage = flag.String("container_image", "", "Container image")
+       // EnvironmentType is the environment type to run the user code.
+       EnvironmentType = flag.String("environment_type", "DOCKER",
+               "Environment Type. Possible options are DOCKER and PROCESS.")
+
+       // EnvironmentConfig is the environment configuration to run the user code.
+       EnvironmentConfig = flag.String("environment_config",
+               "",
+               "Set environment configuration for running the user code.\n"+
+                       "For DOCKER: Url for the docker image.\n"+
+                       "For PROCESS: json of the form {\"os\": \"<OS>\", "+
+                       "\"arch\": \"<ARCHITECTURE>\", \"command\": \"<process 
to execute>\", "+
+                       "\"env\":{\"<Environment variables 1>\": \"<ENV_VAL>\"} 
}. "+
+                       "All fields in the json are optional except command.")
 
        // WorkerBinary is the location of the compiled worker binary. If not
        // specified, the binary is produced via go build.
@@ -72,15 +83,30 @@ func GetJobName() string {
        return *JobName
 }
 
-// GetContainerImage returns the specified SDK harness container image or,
+// GetEnvironmentUrn returns the environment urn for the configured 
EnvironmentType;
+// unrecognized types fall back to the docker environment urn "beam:env:docker:v1".
+// Convenience function.
+func GetEnvironmentUrn(ctx context.Context) string {
+       switch env := strings.ToLower(*EnvironmentType); env {
+       case "process":
+               return "beam:env:process:v1"
+       case "docker":
+               return "beam:env:docker:v1"
+       default:
+               log.Infof(ctx, "No environment type specified. Using default 
environment: '%v'", *EnvironmentType)
+               return "beam:env:docker:v1"
+       }
+}
+
+// GetEnvironmentConfig returns the specified configuration for the SDK 
Harness or,
 // if not present, the default development container for the current user.
 // Convenience function.
-func GetContainerImage(ctx context.Context) string {
-       if *ContainerImage == "" {
-               *ContainerImage = 
os.ExpandEnv("$USER-docker-apache.bintray.io/beam/go:latest")
-               log.Infof(ctx, "No container image specified. Using dev image: 
'%v'", *ContainerImage)
+func GetEnvironmentConfig(ctx context.Context) string {
+       if *EnvironmentConfig == "" {
+               *EnvironmentConfig = 
os.ExpandEnv("$USER-docker-apache.bintray.io/beam/go:latest")
+               log.Infof(ctx, "No environment config specified. Using default 
config: '%v'", *EnvironmentConfig)
        }
-       return *ContainerImage
+       return *EnvironmentConfig
 }
 
 // GetExperiments returns the experiments.
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index af3cce04415..293bab2b7a8 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -32,6 +32,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
+       pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
        "github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
        "github.com/apache/beam/sdks/go/pkg/beam/runners/dataflow/dataflowlib"
@@ -87,7 +88,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                return errors.New("no GCS staging location specified. Use 
--staging_location=gs://<bucket>/<path>")
        }
        if *image == "" {
-               *image = jobopts.GetContainerImage(ctx)
+               *image = getContainerImage(ctx)
        }
        var jobLabels map[string]string
        if *labels != "" {
@@ -143,7 +144,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
        if err != nil {
                return err
        }
-       model, err := graphx.Marshal(edges, &graphx.Options{ContainerImageURL: 
*image})
+       model, err := graphx.Marshal(edges, &graphx.Options{Environment: 
createEnvironment(ctx)})
        if err != nil {
                return fmt.Errorf("failed to generate model pipeline: %v", err)
        }
@@ -168,7 +169,6 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
        _, err = dataflowlib.Execute(ctx, model, opts, workerURL, jarURL, 
modelURL, *endpoint, false)
        return err
 }
-
 func gcsRecorderHook(opts []string) perf.CaptureHook {
        bucket, prefix, err := gcsx.ParseObject(opts[0])
        if err != nil {
@@ -183,3 +183,36 @@ func gcsRecorderHook(opts []string) perf.CaptureHook {
                return gcsx.WriteObject(client, bucket, path.Join(prefix, 
spec), r)
        }
 }
+
+func getContainerImage(ctx context.Context) string {
+       urn := jobopts.GetEnvironmentUrn(ctx)
+       if urn == "" || urn == "beam:env:docker:v1" {
+               return jobopts.GetEnvironmentConfig(ctx)
+       }
+       panic(fmt.Sprintf("Unsupported environment %v", urn))
+}
+
+func createEnvironment(ctx context.Context) pb.Environment {
+       var environment pb.Environment
+       switch urn := jobopts.GetEnvironmentUrn(ctx); urn {
+       case "beam:env:process:v1":
+               // TODO Support process based SDK Harness.
+               panic(fmt.Sprintf("Unsupported environment %v", urn))
+       case "beam:env:docker:v1":
+               fallthrough
+       default:
+               config := jobopts.GetEnvironmentConfig(ctx)
+               payload := &pb.DockerPayload{ContainerImage: config}
+               serializedPayload, err := proto.Marshal(payload)
+               if err != nil {
+                       panic(fmt.Sprintf(
+                               "Failed to serialize Environment payload %v for 
config %v: %v", payload, config, err))
+               }
+               environment = pb.Environment{
+                       Url:     config,
+                       Urn:     urn,
+                       Payload: serializedPayload,
+               }
+       }
+       return environment
+}
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go 
b/sdks/go/pkg/beam/runners/universal/universal.go
index fe6866ae599..539c1e77057 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -26,6 +26,7 @@ import (
        // Importing to get the side effect of the remote execution hook. See 
init().
        _ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
+       pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
        "github.com/apache/beam/sdks/go/pkg/beam/runners/universal/runnerlib"
        "github.com/golang/protobuf/proto"
@@ -47,7 +48,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
        if err != nil {
                return err
        }
-       pipeline, err := graphx.Marshal(edges, 
&graphx.Options{ContainerImageURL: jobopts.GetContainerImage(ctx)})
+       pipeline, err := graphx.Marshal(edges, &graphx.Options{Environment: 
createEnvironment(ctx)})
        if err != nil {
                return fmt.Errorf("failed to generate model pipeline: %v", err)
        }
@@ -62,3 +63,28 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
        _, err = runnerlib.Execute(ctx, pipeline, endpoint, opt, *jobopts.Async)
        return err
 }
+
+func createEnvironment(ctx context.Context) pb.Environment {
+       var environment pb.Environment
+       switch urn := jobopts.GetEnvironmentUrn(ctx); urn {
+       case "beam:env:process:v1":
+               // TODO Support process based SDK Harness.
+               panic(fmt.Sprintf("Unsupported environment %v", urn))
+       case "beam:env:docker:v1":
+               fallthrough
+       default:
+               config := jobopts.GetEnvironmentConfig(ctx)
+               payload := &pb.DockerPayload{ContainerImage: config}
+               serializedPayload, err := proto.Marshal(payload)
+               if err != nil {
+                       panic(fmt.Sprintf(
+                               "Failed to serialize Environment payload %v for 
config %v: %v", payload, config, err))
+               }
+               environment = pb.Environment{
+                       Url:     config,
+                       Urn:     urn,
+                       Payload: serializedPayload,
+               }
+       }
+       return environment
+}
diff --git a/sdks/go/test/run_integration_tests.sh 
b/sdks/go/test/run_integration_tests.sh
index 5f9317f4b70..43896466ea5 100755
--- a/sdks/go/test/run_integration_tests.sh
+++ b/sdks/go/test/run_integration_tests.sh
@@ -75,7 +75,8 @@ echo ">>> RUNNING DATAFLOW INTEGRATION TESTS"
 ./sdks/go/build/bin/integration \
     --runner=dataflow \
     --project=$DATAFLOW_PROJECT \
-    --worker_harness_container_image=$CONTAINER:$TAG \
+    --environment_type=DOCKER \
+    --environment_config=$CONTAINER:$TAG \
     --staging_location=$GCS_LOCATION/staging-validatesrunner-test \
     --temp_location=$GCS_LOCATION/temp-validatesrunner-test \
     --worker_binary=./sdks/go/test/build/bin/linux-amd64/worker
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
index a57d8b49427..bdb1caf3b4d 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.options;
 
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.Validation.Required;
 
 /** Pipeline options common to all portable runners. */
@@ -39,6 +40,13 @@
 
   void setFilesToStage(List<String> value);
 
+  @Description(
+      "Set the default environment for running user code. "
+          + "Currently only docker image URL are supported.")
+  String getDefaultJavaEnvironmentUrl();
+
+  void setDefaultJavaEnvironmentUrl(String url);
+
   @Description(
       "Job service endpoint to use. Should be in the form of address and port, 
e.g. localhost:3000")
   @Required
@@ -47,9 +55,21 @@
   void setJobEndpoint(String endpoint);
 
   @Description(
-      "Set the default environment for running user code. "
-          + "Currently only docker image URL are supported.")
-  String getDefaultJavaEnvironmentUrl();
+      "Set the default environment type for running user code. "
+          + "Possible options are DOCKER and PROCESS.")
+  String getDefaultEnvironmentType();
 
-  void setDefaultJavaEnvironmentUrl(String url);
+  void setDefaultEnvironmentType(String envitonmentType);
+
+  @Description(
+      "Set environment configuration for running the user code.\n"
+          + " For DOCKER: Url for the docker image.\n"
+          + " For PROCESS: json of the form "
+          + "{\"os\": \"<OS>\", \"arch\": \"<ARCHITECTURE>\", \"command\": 
\"<process to execute>\", "
+          + "\"env\":{\"<Environment variables 1>\": \"<ENV_VAL>\"} }. "
+          + "All fields in the json are optional except command.")
+  @Nullable
+  String getDefaultEnvironmentConfig();
+
+  void setDefaultEnvironmentConfig(@Nullable String config);
 }
diff --git a/sdks/python/apache_beam/options/pipeline_options.py 
b/sdks/python/apache_beam/options/pipeline_options.py
index 7da91a668da..bb43e386616 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -655,11 +655,18 @@ def _add_argparse_args(cls, parser):
                         help=
                         ('Job service endpoint to use. Should be in the form '
                          'of address and port, e.g. localhost:3000'))
-    parser.add_argument('--harness_docker_image',
-                        default=None,
-                        help=
-                        ('Docker image to use for executing Python code '
-                         'in the pipeline when running using the Fn API.'))
+    parser.add_argument(
+        '--environment_type', default=None,
+        help=('Set the default environment type for running '
+              'user code. Possible options are DOCKER and PROCESS.'))
+    parser.add_argument(
+        '--environment_config', default=None,
+        help=('Set environment configuration for running the user code.\n For '
+              'DOCKER: Url for the docker image.\n For PROCESS: json of the '
+              'form {"os": "<OS>", "arch": "<ARCHITECTURE>", "command": '
+              '"<process to execute>", "env":{"<Environment variables 1>": '
+              '"<ENV_VAL>"} }. All fields in the json are optional except '
+              'command.'))
 
 
 class FlinkOptions(PipelineOptions):
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py 
b/sdks/python/apache_beam/runners/pipeline_context.py
index 78048bd4d90..dbe2953ae8d 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -27,7 +27,6 @@
 from apache_beam import coders
 from apache_beam import pipeline
 from apache_beam import pvalue
-from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.transforms import core
@@ -110,7 +109,7 @@ class PipelineContext(object):
       'environments': Environment,
   }
 
-  def __init__(self, proto=None, default_environment_url=None):
+  def __init__(self, proto=None, default_environment=None):
     if isinstance(proto, beam_fn_api_pb2.ProcessBundleDescriptor):
       proto = beam_runner_api_pb2.Components(
           coders=dict(proto.coders.items()),
@@ -120,15 +119,9 @@ def __init__(self, proto=None, 
default_environment_url=None):
       setattr(
           self, name, _PipelineContextMap(
               self, cls, getattr(proto, name, None)))
-    if default_environment_url:
+    if default_environment:
       self._default_environment_id = self.environments.get_id(
-          Environment(
-              beam_runner_api_pb2.Environment(
-                  url=default_environment_url,
-                  urn=common_urns.environments.DOCKER.urn,
-                  payload=beam_runner_api_pb2.DockerPayload(
-                      container_image=default_environment_url
-                  ).SerializeToString())))
+          Environment(default_environment))
     else:
       self._default_environment_id = None
 
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py 
b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 18162bb6444..ec537266324 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -17,6 +17,7 @@
 
 from __future__ import absolute_import
 
+import json
 import logging
 import os
 import threading
@@ -30,6 +31,7 @@
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_job_api_pb2
 from apache_beam.portability.api import beam_job_api_pb2_grpc
+from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners import pipeline_context
 from apache_beam.runners import runner
 from apache_beam.runners.job import utils as job_utils
@@ -69,17 +71,46 @@ def default_docker_image():
       logging.warning('Could not find a Python SDK docker image.')
       return 'unknown'
 
+  @staticmethod
+  def _create_environment(options):
+    portable_options = options.view_as(PortableOptions)
+    environment_urn = common_urns.environments.DOCKER.urn
+    if portable_options.environment_type == 'DOCKER':
+      environment_urn = common_urns.environments.DOCKER.urn
+    elif portable_options.environment_type == 'PROCESS':
+      environment_urn = common_urns.environments.PROCESS.urn
+
+    if environment_urn == common_urns.environments.DOCKER.urn:
+      docker_image = (
+          portable_options.environment_config
+          or PortableRunner.default_docker_image())
+      return beam_runner_api_pb2.Environment(
+          url=docker_image,
+          urn=common_urns.environments.DOCKER.urn,
+          payload=beam_runner_api_pb2.DockerPayload(
+              container_image=docker_image
+          ).SerializeToString())
+    elif environment_urn == common_urns.environments.PROCESS.urn:
+      config = json.loads(portable_options.environment_config)
+      return beam_runner_api_pb2.Environment(
+          urn=common_urns.environments.PROCESS.urn,
+          payload=beam_runner_api_pb2.ProcessPayload(
+              os=(config.get('os') or ''),
+              arch=(config.get('arch') or ''),
+              command=config.get('command'),
+              env=(config.get('env') or '')
+          ).SerializeToString())
+
   def run_pipeline(self, pipeline):
-    docker_image = (
-        pipeline.options.view_as(PortableOptions).harness_docker_image
-        or self.default_docker_image())
-    job_endpoint = pipeline.options.view_as(PortableOptions).job_endpoint
+    portable_options = pipeline.options.view_as(PortableOptions)
+    job_endpoint = portable_options.job_endpoint
     if not job_endpoint:
       docker = DockerizedJobServer()
       job_endpoint = docker.start()
 
     proto_context = pipeline_context.PipelineContext(
-        default_environment_url=docker_image)
+        default_environment=PortableRunner._create_environment(
+            portable_options))
     proto_pipeline = pipeline.to_runner_api(context=proto_context)
 
     if not self.is_embedded_fnapi_runner:
diff --git 
a/sdks/python/apache_beam/runners/portability/portable_runner_test.py 
b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
index 0a406bce3c7..950c6bd4948 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
@@ -34,11 +34,14 @@
 import apache_beam as beam
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import PortableOptions
+from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_job_api_pb2
 from apache_beam.portability.api import beam_job_api_pb2_grpc
+from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.portability import fn_api_runner_test
 from apache_beam.runners.portability import portable_runner
 from apache_beam.runners.portability.local_job_service import LocalJobServicer
+from apache_beam.runners.portability.portable_runner import PortableRunner
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 
@@ -216,6 +219,57 @@ def _subprocess_command(cls, port):
     ]
 
 
+class PortableRunnerInternalTest(unittest.TestCase):
+  def test__create_default_environment(self):
+    docker_image = PortableRunner.default_docker_image()
+    self.assertEqual(
+        
PortableRunner._create_environment(PipelineOptions.from_dictionary({})),
+        beam_runner_api_pb2.Environment(
+            url=docker_image,
+            urn=common_urns.environments.DOCKER.urn,
+            payload=beam_runner_api_pb2.DockerPayload(
+                container_image=docker_image
+            ).SerializeToString()))
+
+  def test__create_docker_environment(self):
+    docker_image = 'py-docker'
+    self.assertEqual(
+        PortableRunner._create_environment(PipelineOptions.from_dictionary({
+            'environment_type': 'DOCKER',
+            'environment_config': docker_image,
+        })), beam_runner_api_pb2.Environment(
+            url=docker_image,
+            urn=common_urns.environments.DOCKER.urn,
+            payload=beam_runner_api_pb2.DockerPayload(
+                container_image=docker_image
+            ).SerializeToString()))
+
+  def test__create_process_environment(self):
+    self.assertEqual(
+        PortableRunner._create_environment(PipelineOptions.from_dictionary({
+            'environment_type': "PROCESS",
+            'environment_config': '{"os": "linux", "arch": "amd64", '
+                                  '"command": "run.sh", '
+                                  '"env":{"k1": "v1"} }',
+        })), beam_runner_api_pb2.Environment(
+            urn=common_urns.environments.PROCESS.urn,
+            payload=beam_runner_api_pb2.ProcessPayload(
+                os='linux',
+                arch='amd64',
+                command='run.sh',
+                env={'k1': 'v1'},
+            ).SerializeToString()))
+    self.assertEqual(
+        PortableRunner._create_environment(PipelineOptions.from_dictionary({
+            'environment_type': 'PROCESS',
+            'environment_config': '{"command": "run.sh"}',
+        })), beam_runner_api_pb2.Environment(
+            urn=common_urns.environments.PROCESS.urn,
+            payload=beam_runner_api_pb2.ProcessPayload(
+                command='run.sh',
+            ).SerializeToString()))
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 148426)
    Time Spent: 15h 50m  (was: 15h 40m)

> Modify Environment to support non-dockerized SDK harness deployments 
> ---------------------------------------------------------------------
>
>                 Key: BEAM-5288
>                 URL: https://issues.apache.org/jira/browse/BEAM-5288
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Maximilian Michels
>            Assignee: Ankur Goenka
>            Priority: Major
>          Time Spent: 15h 50m
>  Remaining Estimate: 0h
>
> Following the mailing-list discussions and BEAM-5187, it has become clear 
> that we need to extend the Environment information. In addition to the 
> Docker environment, the extended environment holds deployment options for 
> 1) a process-based environment, and 2) an externally managed environment.
> The proto definition, as of now, looks as follows:
> {noformat}
>  message Environment {
>    // (Required) The URN of the payload
>    string urn = 1;
>    // (Optional) The data specifying any parameters to the URN. If
>    // the URN does not require any arguments, this may be omitted.
>    bytes payload = 2;
>  }
>  message StandardEnvironments {
>    enum Environments {
>      DOCKER = 0 [(beam_urn) = "beam:env:docker:v1"];
>      PROCESS = 1 [(beam_urn) = "beam:env:process:v1"];
>      EXTERNAL = 2 [(beam_urn) = "beam:env:external:v1"];
>    }
>  }
>  // The payload of a Docker image
>  message DockerPayload {
>    string container_image = 1;  // implicitly linux_amd64.
>  }
>  message ProcessPayload {
>    string os = 1;  // "linux", "darwin", ..
>    string arch = 2;  // "amd64", ..
>    string command = 3; // process to execute
>    map<string, string> env = 4; // environment variables
>  }
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to