[ 
https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94636&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94636
 ]

ASF GitHub Bot logged work on BEAM-3327:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Apr/18 15:37
            Start Date: 24/Apr/18 15:37
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #5189: [BEAM-3327] Basic 
Docker environment factory
URL: https://github.com/apache/beam/pull/5189
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request came from a fork, GitHub does not retain its
diff after the merge, so the full diff is reproduced below:

diff --git a/runners/java-fn-execution/build.gradle 
b/runners/java-fn-execution/build.gradle
index 462752f684a..258987089df 100644
--- a/runners/java-fn-execution/build.gradle
+++ b/runners/java-fn-execution/build.gradle
@@ -39,3 +39,18 @@ dependencies {
   testCompile library.java.mockito_core
   testCompile library.java.slf4j_simple
 }
+
+test {
+  useJUnit {
+    // Exclude tests that need Docker.
+    excludeCategories 
"org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker"
+  }
+}
+
+task testDocker(type: Test) {
+  group = "Verification"
+  description = "Runs Docker tests"
+  useJUnit {
+    includeCategories 
"org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker"
+  }
+}
diff --git a/runners/java-fn-execution/pom.xml 
b/runners/java-fn-execution/pom.xml
index 515801538f7..bdf69c46a07 100644
--- a/runners/java-fn-execution/pom.xml
+++ b/runners/java-fn-execution/pom.xml
@@ -32,6 +32,53 @@
 
   <packaging>jar</packaging>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <!-- Do not run Docker tests with unit tests. -->
+              <excludedGroups>
+                
org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker
+              </excludedGroups>
+            </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>docker-tests</id>
+      <activation><activeByDefault>false</activeByDefault></activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>docker-tests</id>
+                <phase>integration-test</phase>
+                <goals>
+                  <goal>test</goal>
+                </goals>
+                <configuration>
+                  <groups>
+                    
org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker
+                  </groups>
+                  <!-- TODO: Enable this when we figure out how to run the 
tests
+                  <failIfNoTests>true</failIfNoTests>
+                   -->
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProvider.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProvider.java
index 71089e33f5b..a8dde1514eb 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProvider.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProvider.java
@@ -63,7 +63,14 @@ public static HeaderAccessor getHeaderAccessor() {
     @Override
     /** This method should be called from the request method. */
     public String getSdkWorkerId() {
-      return SDK_WORKER_CONTEXT_KEY.get();
+      // TODO: https://issues.apache.org/jira/browse/BEAM-4149 Some harnesses 
may not set the worker
+      // id header. Remove the null check below once this is fixed.
+      String workerId = SDK_WORKER_CONTEXT_KEY.get();
+      if (workerId == null) {
+        return "";
+      } else {
+        return workerId;
+      }
     }
   }
 }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ControlClientPool.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ControlClientPool.java
index e348dc22cac..c343fda5114 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ControlClientPool.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ControlClientPool.java
@@ -17,16 +17,64 @@
  */
 package org.apache.beam.runners.fnexecution.control;
 
-import org.apache.beam.sdk.fn.function.ThrowingConsumer;
-import org.apache.beam.sdk.util.ThrowingSupplier;
+import java.time.Duration;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.concurrent.ThreadSafe;
 
-/** Control client pool that exposes a source and sink of control clients. */
-public interface ControlClientPool<T extends InstructionRequestHandler> {
+/**
+ * A pool of control clients that brokers incoming SDK harness connections (in 
the form of {@link
+ * InstructionRequestHandler InstructionRequestHandlers}).
+ *
+ * <p>Incoming instruction handlers usually come from the control plane gRPC 
service. Typical use:
+ *
+ * <pre>
+ *   // Within owner of the pool, who may or may not own the control plane 
server as well
+ *   ControlClientPool pool = ...
+ *   FnApiControlClientPoolService service =
+ *       FnApiControlClientPoolService.offeringClientsToPool(pool.getSink(), 
headerAccessor);
+ *   // Incoming gRPC control connections will now be added to the client pool.
+ *
+ *   // Within code that interacts with the instruction handler. The get call 
blocks until an
+ *   // incoming client is available:
+ *   ControlClientPool.Source clientSource = ...
+ *   InstructionRequestHandler instructionHandler = clientSource.take("worker-id", timeout);
+ * </pre>
+ *
+ * <p>All {@link ControlClientPool} implementations must be thread-safe.
+ */
+@ThreadSafe
+public interface ControlClientPool {
+
+  /** Sink for control clients. */
+  Sink getSink();
 
   /** Source of control clients. */
-  ThrowingSupplier<T> getSource();
+  Source getSource();
 
-  /** Sink for control clients. */
-  ThrowingConsumer<T> getSink();
+  /** A sink for {@link InstructionRequestHandler InstructionRequestHandlers} 
keyed by worker id. */
+  @FunctionalInterface
+  interface Sink {
+
+    /**
+     * Puts an {@link InstructionRequestHandler} into a client pool. Worker 
ids must be unique per
+     * pool.
+     */
+    void put(String workerId, InstructionRequestHandler instructionHandler) 
throws Exception;
+  }
+
+  /** A source of {@link InstructionRequestHandler 
InstructionRequestHandlers}. */
+  @FunctionalInterface
+  interface Source {
 
+    /**
+     * Retrieves the {@link InstructionRequestHandler} for the given worker 
id, blocking until
+     * available or the request times out. Worker ids must be unique per pool. 
A given worker id
+     * must not be requested multiple times. Note that if the given worker id 
is never entered into
+     * the pool, this call blocks until the timeout expires.
+     *
+     * @throws TimeoutException if the request times out
+     * @throws InterruptedException if interrupted while waiting
+     */
+    InstructionRequestHandler take(String workerId, Duration timeout) throws 
Exception;
+  }
 }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
index a3f59ed4f6c..5ea0d6521ab 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
@@ -17,32 +17,37 @@
  */
 package org.apache.beam.runners.fnexecution.control;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import io.grpc.stub.StreamObserver;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
 import org.apache.beam.runners.fnexecution.FnService;
 import org.apache.beam.runners.fnexecution.HeaderAccessor;
-import org.apache.beam.sdk.fn.function.ThrowingConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** A Fn API control service which adds incoming SDK harness connections to a 
pool. */
+/** A Fn API control service which adds incoming SDK harness connections to a 
sink. */
 public class FnApiControlClientPoolService extends 
BeamFnControlGrpc.BeamFnControlImplBase
     implements FnService {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(FnApiControlClientPoolService.class);
 
-  private final ThrowingConsumer<? super FnApiControlClient> clientPool;
-  private final Collection<FnApiControlClient> vendedClients = new 
CopyOnWriteArrayList<>();
+  private final Object lock = new Object();
+  private final ControlClientPool.Sink clientSink;
   private final HeaderAccessor headerAccessor;
-  private AtomicBoolean closed = new AtomicBoolean();
+
+  @GuardedBy("lock")
+  private final Collection<FnApiControlClient> vendedClients = new 
ArrayList<>();
+
+  @GuardedBy("lock")
+  private boolean closed = false;
 
   private FnApiControlClientPoolService(
-      ThrowingConsumer<? super FnApiControlClient> clientPool,
-      HeaderAccessor headerAccessor) {
-    this.clientPool = clientPool;
+      ControlClientPool.Sink clientSink, HeaderAccessor headerAccessor) {
+    this.clientSink = clientSink;
     this.headerAccessor = headerAccessor;
   }
 
@@ -50,12 +55,12 @@ private FnApiControlClientPoolService(
    * Creates a new {@link FnApiControlClientPoolService} which will enqueue 
and vend new SDK harness
    * connections.
    *
-   * <p>Clients placed into the {@code clientPool} are owned by whichever 
consumer owns the pool.
-   * That consumer is responsible for closing the clients when they are no 
longer needed.
+   * <p>Clients placed into the {@code clientSink} are owned by whoever 
consumes them from the other
+   * end of the pool. That consumer is responsible for closing the clients 
when they are no longer
+   * needed.
    */
   public static FnApiControlClientPoolService offeringClientsToPool(
-      ThrowingConsumer<? super FnApiControlClient> clientPool,
-      HeaderAccessor headerAccessor) {
+      ControlClientPool.Sink clientPool, HeaderAccessor headerAccessor) {
     return new FnApiControlClientPoolService(clientPool, headerAccessor);
   }
 
@@ -69,17 +74,25 @@ public static FnApiControlClientPoolService 
offeringClientsToPool(
   @Override
   public StreamObserver<BeamFnApi.InstructionResponse> control(
       StreamObserver<BeamFnApi.InstructionRequest> requestObserver) {
-    LOGGER.info("Beam Fn Control client connected with id {}", 
headerAccessor.getSdkWorkerId());
-    FnApiControlClient newClient =
-        FnApiControlClient.forRequestObserver(headerAccessor.getSdkWorkerId(), 
requestObserver);
+    String workerId = headerAccessor.getSdkWorkerId();
+    LOGGER.info("Beam Fn Control client connected with id {}", workerId);
+    FnApiControlClient newClient = 
FnApiControlClient.forRequestObserver(workerId, requestObserver);
     try {
       // Add the client to the pool of vended clients before making it 
available - we should close
       // the client when we close even if no one has picked it up yet. This 
can occur after the
       // service is closed, in which case the client will be discarded when 
the service is
       // discarded, which should be performed by a call to #shutdownNow. The 
remote caller must be
       // able to handle an unexpectedly terminated connection.
-      vendedClients.add(newClient);
-      clientPool.accept(newClient);
+      synchronized (lock) {
+        checkState(
+            !closed, "%s already closed", 
FnApiControlClientPoolService.class.getSimpleName());
+        // TODO: https://issues.apache.org/jira/browse/BEAM-4151: Prevent 
stale client references
+        // from leaking.
+        vendedClients.add(newClient);
+      }
+      // We do not attempt to transactionally add the client to our internal 
list and offer it to
+      // the sink.
+      clientSink.put(headerAccessor.getSdkWorkerId(), newClient);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(e);
@@ -91,9 +104,12 @@ public static FnApiControlClientPoolService 
offeringClientsToPool(
 
   @Override
   public void close() {
-    if (!closed.getAndSet(true)) {
-      for (FnApiControlClient vended : vendedClients) {
-        vended.close();
+    synchronized (lock) {
+      if (!closed) {
+        closed = true;
+        for (FnApiControlClient vended : vendedClients) {
+          vended.close();
+        }
       }
     }
   }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/InstructionRequestHandler.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/InstructionRequestHandler.java
index 46e2d7b11e9..b65573268b9 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/InstructionRequestHandler.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/InstructionRequestHandler.java
@@ -21,7 +21,6 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 
 /** Interface for any function that can handle a Fn API {@link 
BeamFnApi.InstructionRequest}. */
-@FunctionalInterface
-public interface InstructionRequestHandler {
+public interface InstructionRequestHandler extends AutoCloseable {
   CompletionStage<BeamFnApi.InstructionResponse> 
handle(BeamFnApi.InstructionRequest request);
 }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java
new file mode 100644
index 00000000000..2dc1ccdbcf1
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import com.google.common.collect.Maps;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A {@link ControlClientPool} backed by a client map. It is expected that a 
given client id will be
+ * requested at most once.
+ */
+public class MapControlClientPool implements ControlClientPool {
+
+  /** Creates a {@link MapControlClientPool}. */
+  public static MapControlClientPool create() {
+    return new MapControlClientPool();
+  }
+
+  private final Map<String, CompletableFuture<InstructionRequestHandler>> 
clients =
+      Maps.newConcurrentMap();
+
+  private MapControlClientPool() {}
+
+  @Override
+  public Source getSource() {
+    return this::getClient;
+  }
+
+  @Override
+  public Sink getSink() {
+    return this::putClient;
+  }
+
+  private void putClient(String workerId, InstructionRequestHandler client) {
+    CompletableFuture<InstructionRequestHandler> future =
+        clients.computeIfAbsent(workerId, 
MapControlClientPool::createClientFuture);
+    boolean success = future.complete(client);
+    if (!success) {
+      throw new IllegalStateException(
+          String.format("Control client for worker id %s failed to compete", 
workerId));
+    }
+  }
+
+  private InstructionRequestHandler getClient(String workerId, Duration 
timeout)
+      throws ExecutionException, InterruptedException, TimeoutException {
+    CompletableFuture<InstructionRequestHandler> future =
+        clients.computeIfAbsent(workerId, 
MapControlClientPool::createClientFuture);
+    // TODO: Wire in health checking of clients so requests don't hang.
+    future.get(timeout.getSeconds(), TimeUnit.SECONDS);
+    InstructionRequestHandler client = future.get();
+    clients.remove(workerId);
+    return client;
+  }
+
+  private static CompletableFuture<InstructionRequestHandler> 
createClientFuture(String unused) {
+    return new CompletableFuture<>();
+  }
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/QueueControlClientPool.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/QueueControlClientPool.java
deleted file mode 100644
index 9744a612b70..00000000000
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/QueueControlClientPool.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.fnexecution.control;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import org.apache.beam.sdk.fn.function.ThrowingConsumer;
-import org.apache.beam.sdk.util.ThrowingSupplier;
-
-/** Control client pool backed by a blocking queue. */
-public class QueueControlClientPool<T extends InstructionRequestHandler>
-    implements ControlClientPool {
-
-  private final BlockingQueue<T> queue;
-
-  /**
-   * Creates a client pool backed by a {@link SynchronousQueue}. Client 
submission blocks until
-   * there is a receiving thread waiting on the source.
-   */
-  public static QueueControlClientPool createSynchronous() {
-      return new QueueControlClientPool<>(new SynchronousQueue<>(true));
-  }
-
-  /**
-   * Creates a client pool backed by an unbounded {@link LinkedBlockingQueue}. 
Clients are buffered
-   * until consumed.
-   */
-  public static QueueControlClientPool createBuffering() {
-      return new QueueControlClientPool<>(new LinkedBlockingQueue<>());
-  }
-
-  private QueueControlClientPool(BlockingQueue<T> queue) {
-      this.queue = queue;
-  }
-
-  @Override
-  public ThrowingSupplier<T> getSource() {
-    return queue::take;
-  }
-
-  @Override
-  public ThrowingConsumer<T> getSink() {
-      return queue::put;
-  }
-
-}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java
new file mode 100644
index 00000000000..73959357593
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.ImmutableList;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/** A docker command wrapper. Simplifies communications with the Docker 
daemon. */
+class DockerCommand {
+  // TODO: Should we require 64-character container ids? Docker technically 
allows abbreviated ids,
+  // but we _should_ always capture full ids.
+  private static final Pattern CONTAINER_ID_PATTERN = 
Pattern.compile("\\p{XDigit}{64}");
+
+  static DockerCommand forExecutable(String dockerExecutable, Duration 
commandTimeout) {
+    return new DockerCommand(dockerExecutable, commandTimeout);
+  }
+
+  private final String dockerExecutable;
+  private final Duration commandTimeout;
+
+  private DockerCommand(String dockerExecutable, Duration commandTimeout) {
+    this.dockerExecutable = dockerExecutable;
+    this.commandTimeout = commandTimeout;
+  }
+
+  /**
+   * Runs the given container image with the given command line arguments. 
Returns the running
+   * container id.
+   */
+  public String runImage(String imageTag, List<String> args)
+      throws IOException, TimeoutException, InterruptedException {
+    checkArgument(!imageTag.isEmpty(), "Docker image tag required");
+    // TODO: Validate args?
+    return runShortCommand(
+        ImmutableList.<String>builder()
+            .add(dockerExecutable)
+            .add("run")
+            .add("-d")
+            .add(imageTag)
+            .addAll(args)
+            .build());
+  }
+
+  /**
+   * Kills a docker container by container id.
+   *
+   * @throws IOException if an IOException occurs or if the given container id 
does not exist
+   */
+  public void killContainer(String containerId)
+      throws IOException, TimeoutException, InterruptedException {
+    checkArgument(containerId != null);
+    checkArgument(
+        CONTAINER_ID_PATTERN.matcher(containerId).matches(),
+        "Container ID must be a 64-character hexadecimal string");
+    runShortCommand(Arrays.asList(dockerExecutable, "kill", containerId));
+  }
+
+  /** Run the given command invocation and return stdout as a String. */
+  private String runShortCommand(List<String> invocation)
+      throws IOException, TimeoutException, InterruptedException {
+    ProcessBuilder pb = new ProcessBuilder(invocation);
+    Process process = pb.start();
+    // TODO: Consider supplying executor service here.
+    CompletableFuture<String> resultString =
+        CompletableFuture.supplyAsync(
+            () -> {
+              // NOTE: We do not own the underlying stream and do not close it.
+              BufferedReader reader =
+                  new BufferedReader(
+                      new InputStreamReader(process.getInputStream(), 
StandardCharsets.UTF_8));
+              return reader.lines().collect(Collectors.joining());
+            });
+    // NOTE: We only consume the error string in the case of an error.
+    CompletableFuture<String> errorFuture =
+        CompletableFuture.supplyAsync(
+            () -> {
+              BufferedReader reader =
+                  new BufferedReader(
+                      new InputStreamReader(process.getErrorStream(), 
StandardCharsets.UTF_8));
+              return reader.lines().collect(Collectors.joining());
+            });
+    // TODO: Retry on interrupt?
+    boolean processDone = process.waitFor(commandTimeout.toMillis(), 
TimeUnit.MILLISECONDS);
+    if (!processDone) {
+      process.destroy();
+      throw new TimeoutException(
+          String.format(
+              "Timed out while waiting for command '%s'",
+              invocation.stream().collect(Collectors.joining(" "))));
+    }
+    int exitCode = process.exitValue();
+    if (exitCode != 0) {
+      String errorString;
+      try {
+        errorString = errorFuture.get(commandTimeout.toMillis(), 
TimeUnit.MILLISECONDS);
+      } catch (Exception stderrEx) {
+        errorString = String.format("Error capturing stderr: %s", 
stderrEx.getMessage());
+      }
+      throw new IOException(
+          String.format(
+              "Received exit code %d for command '%s'. stderr: %s",
+              exitCode, invocation.stream().collect(Collectors.joining(" ")), 
errorString));
+    }
+    try {
+      // TODO: Consider a stricter timeout.
+      return resultString.get(commandTimeout.toMillis(), 
TimeUnit.MILLISECONDS);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      // Recast any exceptions in reading output as IOExceptions.
+      throw new IOException(cause);
+    }
+  }
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerContainerEnvironment.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerContainerEnvironment.java
new file mode 100644
index 00000000000..a4e33a18956
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerContainerEnvironment.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+
+/**
+ * A {@link RemoteEnvironment} that wraps a running Docker container.
+ *
+ * <p>A {@link DockerContainerEnvironment} owns both the underlying docker 
container that it
+ * communicates with and the {@link InstructionRequestHandler} that it uses to 
do so.
+ */
+@ThreadSafe
+class DockerContainerEnvironment implements RemoteEnvironment {
+
+  static DockerContainerEnvironment create(
+      DockerCommand docker,
+      Environment environment,
+      String containerId,
+      InstructionRequestHandler instructionHandler) {
+    return new DockerContainerEnvironment(docker, environment, containerId, 
instructionHandler);
+  }
+
+  private final Object lock = new Object();
+  private final DockerCommand docker;
+  private final Environment environment;
+  private final String containerId;
+  private final InstructionRequestHandler instructionHandler;
+
+  private boolean isClosed = false;
+
+  private DockerContainerEnvironment(
+      DockerCommand docker,
+      Environment environment,
+      String containerId,
+      InstructionRequestHandler instructionHandler) {
+    this.docker = docker;
+    this.environment = environment;
+    this.containerId = containerId;
+    this.instructionHandler = instructionHandler;
+  }
+
+  @Override
+  public Environment getEnvironment() {
+    return environment;
+  }
+
+  @Override
+  public InstructionRequestHandler getInstructionRequestHandler() {
+    return instructionHandler;
+  }
+
+  /**
+   * Closes this remote docker environment. The associated {@link 
InstructionRequestHandler} should
+   * not be used after calling this.
+   */
+  @Override
+  public void close() throws Exception {
+    synchronized (lock) {
+      // The running docker container and instruction handler should each only 
be terminated once.
+      // Do nothing if we have already requested termination.
+      if (!isClosed) {
+        isClosed = true;
+        instructionHandler.close();
+        docker.killContainer(containerId);
+      }
+    }
+  }
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
new file mode 100644
index 00000000000..c80d9d91478
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import 
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import 
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link EnvironmentFactory} that creates docker containers by shelling out to docker. Returned
+ * {@link RemoteEnvironment RemoteEnvironments} own their respective docker containers. Not
+ * thread-safe.
+ */
+public class DockerEnvironmentFactory implements EnvironmentFactory {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DockerEnvironmentFactory.class);
+
+  public static DockerEnvironmentFactory forServices(
+      DockerCommand docker,
+      GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+      ControlClientPool.Source clientSource,
+      // TODO: Refine this to IdGenerator when we determine where that should live.
+      Supplier<String> idGenerator) {
+    return new DockerEnvironmentFactory(
+        docker,
+        controlServiceServer,
+        loggingServiceServer,
+        retrievalServiceServer,
+        provisioningServiceServer,
+        idGenerator,
+        clientSource);
+  }
+
+  private final DockerCommand docker;
+  private final GrpcFnServer<FnApiControlClientPoolService> 
controlServiceServer;
+  private final GrpcFnServer<GrpcLoggingService> loggingServiceServer;
+  private final GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer;
+  private final GrpcFnServer<StaticGrpcProvisionService> 
provisioningServiceServer;
+  private final Supplier<String> idGenerator;
+  private final ControlClientPool.Source clientSource;
+
+  private DockerEnvironmentFactory(
+      DockerCommand docker,
+      GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+      Supplier<String> idGenerator,
+      ControlClientPool.Source clientSource) {
+    this.docker = docker;
+    this.controlServiceServer = controlServiceServer;
+    this.loggingServiceServer = loggingServiceServer;
+    this.retrievalServiceServer = retrievalServiceServer;
+    this.provisioningServiceServer = provisioningServiceServer;
+    this.idGenerator = idGenerator;
+    this.clientSource = clientSource;
+  }
+
+  /** Creates a new, active {@link RemoteEnvironment} backed by a local Docker container. */
+  @Override
+  public RemoteEnvironment createEnvironment(Environment environment) throws 
Exception {
+    String workerId = idGenerator.get();
+
+    // Prepare docker invocation.
+    Path workerPersistentDirectory = 
Files.createTempDirectory("worker_persistent_directory");
+    Path semiPersistentDirectory = 
Files.createTempDirectory("semi_persistent_dir");
+    String containerImage = environment.getUrl();
+    // TODO: https://issues.apache.org/jira/browse/BEAM-4148 The default service address will not
+    // work for Docker for Mac.
+    String loggingEndpoint = 
loggingServiceServer.getApiServiceDescriptor().getUrl();
+    String artifactEndpoint = 
retrievalServiceServer.getApiServiceDescriptor().getUrl();
+    String provisionEndpoint = 
provisioningServiceServer.getApiServiceDescriptor().getUrl();
+    String controlEndpoint = 
controlServiceServer.getApiServiceDescriptor().getUrl();
+    List<String> args =
+        Arrays.asList(
+            "-v",
+            // TODO: Mac only allows temporary mounts under /tmp by default (as of 17.12).
+            String.format("%s:%s", workerPersistentDirectory, 
semiPersistentDirectory),
+            // NOTE: Host networking does not work on Mac, but the command line flag is accepted.
+            "--network=host",
+            containerImage,
+            String.format("--id=%s", workerId),
+            String.format("--logging_endpoint=%s", loggingEndpoint),
+            String.format("--artifact_endpoint=%s", artifactEndpoint),
+            String.format("--provision_endpoint=%s", provisionEndpoint),
+            String.format("--control_endpoint=%s", controlEndpoint),
+            String.format("--semi_persist_dir=%s", semiPersistentDirectory));
+
+    // Wrap the blocking call to clientSource.get in case an exception is thrown.
+    String containerId = null;
+    InstructionRequestHandler instructionHandler = null;
+    try {
+      containerId = docker.runImage(containerImage, args);
+      // Wait on a client from the gRPC server.
+      while (instructionHandler == null) {
+        try {
+          instructionHandler = clientSource.take(workerId, 
Duration.ofMinutes(2));
+        } catch (TimeoutException timeoutEx) {
+          LOG.info(
+              "Still waiting for startup of environment {} for worker id {}",
+              environment.getUrl(),
+              workerId);
+        } catch (InterruptedException interruptEx) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(interruptEx);
+        }
+      }
+    } catch (Exception e) {
+      if (containerId != null) {
+        // Kill the launched docker container if we can't retrieve a client for it.
+        try {
+          docker.killContainer(containerId);
+        } catch (Exception dockerException) {
+          e.addSuppressed(dockerException);
+        }
+      }
+      throw e;
+    }
+
+    return DockerContainerEnvironment.create(docker, environment, containerId, 
instructionHandler);
+  }
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EnvironmentFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EnvironmentFactory.java
index 8a47d54f324..eb62d2e9138 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EnvironmentFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EnvironmentFactory.java
@@ -22,16 +22,8 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
 import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
 
-/**
- * Manages access to {@link Environment environments} which communicate to an {@link
- * SdkHarnessClient}.
- */
+/** Creates {@link Environment environments} which communicate to an {@link SdkHarnessClient}. */
 public interface EnvironmentFactory {
-  /**
-   * Retrieve a handle to an active {@link Environment}. This may allocate resources if required.
-   *
-   * <p>TODO: Determine and document the owner of the returned environment. If the environment is
-   * owned by the manager, make the Manager {@link AutoCloseable}.
-   */
-  RemoteEnvironment getEnvironment(RunnerApi.Environment container) throws 
Exception;
+  /** Creates an active {@link Environment} and returns a handle to it. */
+  RemoteEnvironment createEnvironment(RunnerApi.Environment container) throws 
Exception;
 }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/testing/NeedsDocker.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/testing/NeedsDocker.java
new file mode 100644
index 00000000000..e009ecf934c
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/testing/NeedsDocker.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment.testing;
+
+/** Category for integration tests that require Docker. */
+public interface NeedsDocker {}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/testing/package-info.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/testing/package-info.java
new file mode 100644
index 00000000000..5777a29f39d
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/testing/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Test utilities for the environment management package. */
+package org.apache.beam.runners.fnexecution.environment.testing;
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java
index 5b507b9b16a..58f2614142e 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java
@@ -21,14 +21,15 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.grpc.ManagedChannel;
 import io.grpc.inprocess.InProcessChannelBuilder;
+import java.time.Duration;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
 import org.apache.beam.fn.harness.FnHarness;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.runners.fnexecution.control.ControlClientPool;
-import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
 import 
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
-import org.apache.beam.runners.fnexecution.control.QueueControlClientPool;
+import org.apache.beam.runners.fnexecution.control.MapControlClientPool;
 import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
 import org.apache.beam.runners.fnexecution.data.GrpcDataService;
 import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
@@ -47,9 +48,15 @@
 public class InProcessSdkHarness extends ExternalResource implements TestRule {
 
   public static InProcessSdkHarness create() {
-    return new InProcessSdkHarness();
+    return new InProcessSdkHarness(null);
   }
 
+  public static InProcessSdkHarness withClientTimeout(Duration clientTimeout) {
+    return new InProcessSdkHarness(clientTimeout);
+  }
+
+  @Nullable private final Duration clientTimeout;
+
   private ExecutorService executor;
   private GrpcFnServer<GrpcLoggingService> loggingServer;
   private GrpcFnServer<GrpcDataService> dataServer;
@@ -57,7 +64,9 @@ public static InProcessSdkHarness create() {
 
   private SdkHarnessClient client;
 
-  private InProcessSdkHarness() {}
+  private InProcessSdkHarness(Duration clientTimeout) {
+    this.clientTimeout = clientTimeout;
+  }
 
   public SdkHarnessClient client() {
     return client;
@@ -70,7 +79,8 @@ public ApiServiceDescriptor dataEndpoint() {
   protected void before() throws Exception {
     InProcessServerFactory serverFactory = InProcessServerFactory.create();
     executor = Executors.newCachedThreadPool(new 
ThreadFactoryBuilder().setDaemon(true).build());
-    ControlClientPool<FnApiControlClient> clientPool = 
QueueControlClientPool.createSynchronous();
+    ControlClientPool clientPool;
+    clientPool = MapControlClientPool.create();
     FnApiControlClientPoolService clientPoolService =
         FnApiControlClientPoolService.offeringClientsToPool(
             clientPool.getSink(), 
GrpcContextHeaderAccessorProvider.getHeaderAccessor());
@@ -98,8 +108,11 @@ public ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor) {
           return null;
         });
 
-    client = SdkHarnessClient.usingFnApiClient(clientPool.getSource().get(),
-        dataServer.getService());
+    // TODO: https://issues.apache.org/jira/browse/BEAM-4149 Worker ids cannot currently be set by
+    // the harness. All clients have the implicit empty id for now.
+    client =
+        SdkHarnessClient.usingFnApiClient(
+            clientPool.getSource().take("", clientTimeout), 
dataServer.getService());
   }
 
   protected void after() {
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
index d259a96e3d1..ccae17a3caa 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
@@ -26,6 +26,7 @@
 import io.grpc.inprocess.InProcessChannelBuilder;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -47,10 +48,7 @@
 @RunWith(JUnit4.class)
 public class FnApiControlClientPoolServiceTest {
 
-  // For ease of straight-line testing, we use a LinkedBlockingQueue; in practice a SynchronousQueue
-  // for matching incoming connections and server threads is likely.
-  private final ControlClientPool<FnApiControlClient> pool =
-      QueueControlClientPool.createBuffering();
+  private final ControlClientPool pool = MapControlClientPool.create();
   private final FnApiControlClientPoolService controlService =
       FnApiControlClientPoolService.offeringClientsToPool(
           pool.getSink(), 
GrpcContextHeaderAccessorProvider.getHeaderAccessor());
@@ -76,7 +74,8 @@ public void testIncomingConnection() throws Exception {
     StreamObserver<BeamFnApi.InstructionResponse> responseObserver =
         controlService.control(requestObserver);
 
-    FnApiControlClient client = pool.getSource().get();
+    // TODO: https://issues.apache.org/jira/browse/BEAM-4149 Use proper worker id.
+    InstructionRequestHandler client = pool.getSource().take("", 
Duration.ofSeconds(2));
 
     // Check that the client is wired up to the request channel
     String id = "fakeInstruction";
@@ -114,7 +113,8 @@ public void onCompleted() {
           }
         });
 
-    pool.getSource().get();
+    // TODO: https://issues.apache.org/jira/browse/BEAM-4149 Use proper worker id.
+    pool.getSource().take("", Duration.ofSeconds(2));
     server.close();
 
     latch.await();
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 80236f1a7be..7e32a6b7a1f 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -26,6 +26,7 @@
 import io.grpc.ManagedChannel;
 import io.grpc.inprocess.InProcessChannelBuilder;
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -106,7 +107,7 @@ public void setup() throws Exception {
         GrpcFnServer.allocatePortAndCreateFor(
             GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), 
serverFactory);
 
-    ControlClientPool<FnApiControlClient> clientPool = 
QueueControlClientPool.createSynchronous();
+    ControlClientPool clientPool = MapControlClientPool.create();
     controlServer =
         GrpcFnServer.allocatePortAndCreateFor(
             FnApiControlClientPoolService.offeringClientsToPool(
@@ -128,7 +129,9 @@ public ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor) {
                   }
                 },
                 StreamObserverFactory.direct()));
-    FnApiControlClient controlClient = clientPool.getSource().get();
+    // TODO: https://issues.apache.org/jira/browse/BEAM-4149 Use proper worker id.
+    InstructionRequestHandler controlClient =
+        clientPool.getSource().take("", Duration.ofSeconds(2));
     this.controlClient = SdkHarnessClient.usingFnApiClient(controlClient, 
dataServer.getService());
   }
 
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
index 96b3f5917dc..566f0484af4 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
@@ -34,6 +34,7 @@
 
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -92,7 +93,9 @@
   @Mock public FnApiControlClient fnApiControlClient;
   @Mock public FnDataService dataService;
 
-  @Rule public InProcessSdkHarness harness = InProcessSdkHarness.create();
+  @Rule
+  public InProcessSdkHarness harness = 
InProcessSdkHarness.withClientTimeout(Duration.ofSeconds(5));
+
   @Rule public ExpectedException thrown = ExpectedException.none();
 
   private SdkHarnessClient sdkHarnessClient;
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/DockerCommandTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/DockerCommandTest.java
new file mode 100644
index 00000000000..64ae4c72489
--- /dev/null
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/DockerCommandTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link DockerCommand}. */
+@Category(NeedsDocker.class)
+@RunWith(JUnit4.class)
+public class DockerCommandTest {
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void helloWorld() throws Exception {
+    DockerCommand docker = getWrapper();
+    String container = docker.runImage("hello-world", Collections.emptyList());
+    System.out.printf("Started container: %s%n", container);
+  }
+
+  @Test
+  public void killContainer() throws Exception {
+    DockerCommand docker = getWrapper();
+    String container = docker.runImage("debian", Arrays.asList("/bin/bash", 
"-c", "sleep 60"));
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    docker.killContainer(container);
+    long elapsedSec = stopwatch.elapsed(TimeUnit.SECONDS);
+    assertThat(
+        "Container termination should complete before image self-exits",
+        elapsedSec,
+        is(lessThan(60L)));
+  }
+
+  @Test
+  public void capturesErrorOutput() throws Exception {
+    DockerCommand docker = getWrapper();
+    thrown.expect(instanceOf(IOException.class));
+    thrown.expectMessage(containsString("Error response from daemon"));
+    String badImageName = "this-image-should-hopefully-never-exist";
+    String container = docker.runImage(badImageName, ImmutableList.of());
+    // We should never reach this line, but clean up in case we do.
+    docker.killContainer(container);
+    Assert.fail(String.format("Container creation for %s should have failed", 
badImageName));
+  }
+
+  private static DockerCommand getWrapper() {
+    return DockerCommand.forExecutable("docker", Duration.ofMillis(100_000));
+  }
+}
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactoryTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactoryTest.java
new file mode 100644
index 00000000000..5cfaeff7928
--- /dev/null
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactoryTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import 
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import 
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link DockerEnvironmentFactory}. */
+@RunWith(JUnit4.class)
+public class DockerEnvironmentFactoryTest {
+
+  private static final ApiServiceDescriptor SERVICE_DESCRIPTOR =
+      ApiServiceDescriptor.newBuilder().setUrl("service-url").build();
+  private static final String IMAGE_NAME = "my-image";
+  private static final Environment ENVIRONMENT =
+      Environment.newBuilder().setUrl(IMAGE_NAME).build();
+  private static final String CONTAINER_ID =
+      "e4485f0f2b813b63470feacba5fe9cb89699878c095df4124abd320fd5401385";
+
+  private static final AtomicLong nextId = new AtomicLong(0);
+  private static final Supplier<String> ID_GENERATOR =
+      () -> Long.toString(nextId.getAndIncrement());
+
+  @Mock private DockerCommand docker;
+
+  @Mock private GrpcFnServer<FnApiControlClientPoolService> 
controlServiceServer;
+  @Mock private GrpcFnServer<GrpcLoggingService> loggingServiceServer;
+  @Mock private GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer;
+  @Mock private GrpcFnServer<StaticGrpcProvisionService> 
provisioningServiceServer;
+
+  @Mock private InstructionRequestHandler client;
+
+  @Before
+  public void initMocks() {
+    MockitoAnnotations.initMocks(this);
+
+    
when(controlServiceServer.getApiServiceDescriptor()).thenReturn(SERVICE_DESCRIPTOR);
+    
when(loggingServiceServer.getApiServiceDescriptor()).thenReturn(SERVICE_DESCRIPTOR);
+    
when(retrievalServiceServer.getApiServiceDescriptor()).thenReturn(SERVICE_DESCRIPTOR);
+    
when(provisioningServiceServer.getApiServiceDescriptor()).thenReturn(SERVICE_DESCRIPTOR);
+  }
+
+  @Test
+  public void createsCorrectEnvironment() throws Exception {
+    when(docker.runImage(Mockito.eq(IMAGE_NAME), 
Mockito.any())).thenReturn(CONTAINER_ID);
+    DockerEnvironmentFactory factory = getFactory();
+
+    RemoteEnvironment handle = factory.createEnvironment(ENVIRONMENT);
+    assertThat(handle.getInstructionRequestHandler(), is(client));
+    assertThat(handle.getEnvironment(), equalTo(ENVIRONMENT));
+  }
+
+  @Test
+  public void destroysCorrectContainer() throws Exception {
+    when(docker.runImage(Mockito.eq(IMAGE_NAME), 
Mockito.any())).thenReturn(CONTAINER_ID);
+    DockerEnvironmentFactory factory = getFactory();
+
+    RemoteEnvironment handle = factory.createEnvironment(ENVIRONMENT);
+    handle.close();
+    verify(docker).killContainer(CONTAINER_ID);
+  }
+
+  @Test
+  public void createsMultipleEnvironments() throws Exception {
+    DockerEnvironmentFactory factory = getFactory();
+
+    Environment fooEnv = Environment.newBuilder().setUrl("foo").build();
+    RemoteEnvironment fooHandle = factory.createEnvironment(fooEnv);
+    assertThat(fooHandle.getEnvironment(), is(equalTo(fooEnv)));
+
+    Environment barEnv = Environment.newBuilder().setUrl("bar").build();
+    RemoteEnvironment barHandle = factory.createEnvironment(barEnv);
+    assertThat(barHandle.getEnvironment(), is(equalTo(barEnv)));
+  }
+
+  private DockerEnvironmentFactory getFactory() {
+    return DockerEnvironmentFactory.forServices(
+        docker,
+        controlServiceServer,
+        loggingServiceServer,
+        retrievalServiceServer,
+        provisioningServiceServer,
+        (workerId, timeout) -> client,
+        ID_GENERATOR);
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 94636)
    Time Spent: 21.5h  (was: 21h 20m)

> Add abstractions to manage Environment Instance lifecycles.
> -----------------------------------------------------------
>
>                 Key: BEAM-3327
>                 URL: https://issues.apache.org/jira/browse/BEAM-3327
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-core
>            Reporter: Thomas Groh
>            Assignee: Ben Sidhom
>            Priority: Major
>              Labels: portability
>          Time Spent: 21.5h
>  Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to