[ 
https://issues.apache.org/jira/browse/BEAM-6160?focusedWorklogId=171738&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-171738
 ]

ASF GitHub Bot logged work on BEAM-6160:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Dec/18 22:38
            Start Date: 03/Dec/18 22:38
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #7168: [BEAM-6160] Use 
service server rather than service
URL: https://github.com/apache/beam/pull/7168
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
index b103dcef4b2c..2666dea61115 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
@@ -336,9 +336,9 @@ boolean doWork(WorkItem workItem, WorkItemStatusClient 
workItemStatusClient) thr
         worker =
             mapTaskExecutorFactory.create(
                 sdkWorkerHarness.getControlClientHandler(),
-                sdkWorkerHarness.getDataService(),
+                sdkWorkerHarness.getGrpcDataFnServer(),
                 sdkHarnessRegistry.beamFnDataApiServiceDescriptor(),
-                sdkWorkerHarness.getStateService(),
+                sdkWorkerHarness.getGrpcStateFnServer(),
                 network,
                 options,
                 stageName,
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index 067b221fabc7..8e191e2654a4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -55,6 +55,7 @@
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import 
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor;
 import 
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation;
+import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
 import 
org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortReadOperation;
 import 
org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
@@ -82,8 +83,10 @@
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.WriteOperation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.data.FnDataService;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.runners.fnexecution.state.StateDelegator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -115,9 +118,9 @@ private BeamFnMapTaskExecutorFactory() {}
   @Override
   public DataflowMapTaskExecutor create(
       InstructionRequestHandler instructionRequestHandler,
-      FnDataService beamFnDataService,
+      GrpcFnServer<BeamFnDataGrpcService.DataService> grpcDataFnServer,
       Endpoints.ApiServiceDescriptor dataApiServiceDescriptor,
-      StateDelegator beamFnStateDelegator,
+      GrpcFnServer<GrpcStateService> grpcStateFnServer,
       MutableNetwork<Node, Edge> network,
       PipelineOptions options,
       String stageName,
@@ -143,7 +146,7 @@ public DataflowMapTaskExecutor create(
         createOperationTransformForRegisterFnNodes(
             idGenerator,
             instructionRequestHandler,
-            beamFnStateDelegator,
+            grpcStateFnServer.getService(),
             stageName,
             executionContext));
 
@@ -153,7 +156,7 @@ public DataflowMapTaskExecutor create(
         network,
         createOperationTransformForGrpcPortNodes(
             network,
-            beamFnDataService,
+            grpcDataFnServer.getService(),
             // TODO: Set NameContext properly for these operations.
             executionContext.createOperationContext(
                 NameContext.create(stageName, stageName, stageName, 
stageName))));
@@ -165,7 +168,7 @@ public DataflowMapTaskExecutor create(
             network,
             idGenerator,
             instructionRequestHandler,
-            beamFnDataService,
+            grpcDataFnServer.getService(),
             dataApiServiceDescriptor,
             executionContext,
             stageName));
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
index 56b2cbc23c96..b6bcb5a5f51a 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
@@ -22,11 +22,12 @@
 import java.util.function.Supplier;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
+import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
-import org.apache.beam.runners.fnexecution.data.FnDataService;
-import org.apache.beam.runners.fnexecution.state.StateDelegator;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /** Creates a {@link DataflowMapTaskExecutor} from a {@link MapTask} 
definition. */
@@ -38,9 +39,9 @@
    */
   DataflowMapTaskExecutor create(
       InstructionRequestHandler instructionRequestHandler,
-      FnDataService beamFnDataService,
+      GrpcFnServer<BeamFnDataGrpcService.DataService> grpcDataFnServer,
       Endpoints.ApiServiceDescriptor dataApiServiceDescriptor,
-      StateDelegator beamFnStateDelegator,
+      GrpcFnServer<GrpcStateService> grpcStateFnServer,
       MutableNetwork<Node, Edge> network,
       PipelineOptions options,
       String stageName,
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java
index c4f373648f2b..8fb610b52e10 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java
@@ -26,14 +26,16 @@
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.worker.fn.BeamFnControlService;
-import org.apache.beam.runners.dataflow.worker.fn.ServerFactory;
 import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
 import org.apache.beam.runners.dataflow.worker.fn.logging.BeamFnLoggingService;
 import 
org.apache.beam.runners.dataflow.worker.fn.stream.ServerStreamObserverFactory;
 import 
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
 import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
+import org.apache.beam.runners.fnexecution.ServerFactory;
 import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.sdk.io.FileSystems;
@@ -76,7 +78,16 @@ public static void main(String[] unusedArgs) throws 
Exception {
     // Initialized registered file systems.˜
     FileSystems.setDefaultPipelineOptions(pipelineOptions);
 
-    ServerFactory serverFactory = ServerFactory.fromOptions(pipelineOptions);
+    DataflowPipelineDebugOptions dataflowOptions =
+        pipelineOptions.as(DataflowPipelineDebugOptions.class);
+    ServerFactory serverFactory;
+    if (DataflowRunner.hasExperiment(dataflowOptions, 
"beam_fn_api_epoll_domain_socket")) {
+      serverFactory = ServerFactory.createEpollDomainSocket();
+    } else if (DataflowRunner.hasExperiment(dataflowOptions, 
"beam_fn_api_epoll")) {
+      serverFactory = ServerFactory.createEpollSocket();
+    } else {
+      serverFactory = ServerFactory.createDefault();
+    }
     ServerStreamObserverFactory streamObserverFactory =
         ServerStreamObserverFactory.fromOptions(pipelineOptions);
 
@@ -103,11 +114,11 @@ public static void main(String[] unusedArgs) throws 
Exception {
 
       servicesServer =
           serverFactory.create(
-              controlApiService,
-              ImmutableList.of(beamFnControlService, beamFnDataService, 
beamFnStateService));
+              ImmutableList.of(beamFnControlService, beamFnDataService, 
beamFnStateService),
+              controlApiService);
 
       loggingServer =
-          serverFactory.create(loggingApiService, 
ImmutableList.of(beamFnLoggingService));
+          serverFactory.create(ImmutableList.of(beamFnLoggingService), 
loggingApiService);
 
       start(
           pipeline,
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
index 2bd0c1e712c0..e632719bc7a6 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
@@ -47,6 +47,7 @@
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import 
org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService.DataService;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.MultiOutputInfoEdge;
 import org.apache.beam.runners.dataflow.worker.graph.Networks;
@@ -69,9 +70,9 @@
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.WriteOperation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
-import org.apache.beam.runners.fnexecution.data.FnDataService;
-import org.apache.beam.runners.fnexecution.state.StateDelegator;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -98,9 +99,9 @@ private IntrinsicMapTaskExecutorFactory() {}
   @Override
   public DataflowMapTaskExecutor create(
       InstructionRequestHandler instructionRequestHandler,
-      FnDataService beamFnDataService,
+      GrpcFnServer<DataService> grpcDataFnServer,
       Endpoints.ApiServiceDescriptor dataApiServiceDescriptor,
-      StateDelegator beamFnStateDelegator,
+      GrpcFnServer<GrpcStateService> grpcStateFnServer,
       MutableNetwork<Node, Edge> network,
       PipelineOptions options,
       String stageName,
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistries.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistries.java
index 3e3284ec969a..a0d54eada51b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistries.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistries.java
@@ -26,8 +26,8 @@
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
-import org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -187,14 +187,15 @@ public String getWorkerId() {
 
       @Override
       @Nullable
-      public FnDataService getDataService() {
-        return beamFnDataGrpcService.getDataService(getWorkerId());
+      public GrpcFnServer<BeamFnDataGrpcService.DataService> 
getGrpcDataFnServer() {
+        return GrpcFnServer.create(
+            beamFnDataGrpcService.getDataService(getWorkerId()), 
beamFnDataApiServiceDescriptor());
       }
 
       @Override
       @Nullable
-      public GrpcStateService getStateService() {
-        return beamFnStateService;
+      public GrpcFnServer<GrpcStateService> getGrpcStateFnServer() {
+        return GrpcFnServer.create(beamFnStateService, 
beamFnDataApiServiceDescriptor());
       }
     }
   }
@@ -228,13 +229,13 @@ public String getWorkerId() {
 
           @Nullable
           @Override
-          public FnDataService getDataService() {
+          public GrpcFnServer<BeamFnDataGrpcService.DataService> 
getGrpcDataFnServer() {
             return null;
           }
 
           @Nullable
           @Override
-          public GrpcStateService getStateService() {
+          public GrpcFnServer<GrpcStateService> getGrpcStateFnServer() {
             return null;
           }
         };
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistry.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistry.java
index 8c11d148c6ef..383d292df7c9 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistry.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistry.java
@@ -19,9 +19,10 @@
 
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
-import org.apache.beam.runners.fnexecution.data.FnDataService;
-import org.apache.beam.runners.fnexecution.state.StateDelegator;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 
 /** Registry used to manage all the connections (Control, Data, State) from 
SdkHarness */
 public interface SdkHarnessRegistry {
@@ -60,9 +61,9 @@
     public String getWorkerId();
 
     @Nullable
-    public FnDataService getDataService();
+    public GrpcFnServer<BeamFnDataGrpcService.DataService> 
getGrpcDataFnServer();
 
     @Nullable
-    public StateDelegator getStateService();
+    public GrpcFnServer<GrpcStateService> getGrpcStateFnServer();
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 070973bb2b44..80556a8194d8 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1143,9 +1143,9 @@ private void process(
         DataflowMapTaskExecutor mapTaskExecutor =
             mapTaskExecutorFactory.create(
                 worker.getControlClientHandler(),
-                worker.getDataService(),
+                worker.getGrpcDataFnServer(),
                 sdkHarnessRegistry.beamFnDataApiServiceDescriptor(),
-                worker.getStateService(),
+                worker.getGrpcStateFnServer(),
                 mapTaskNetwork,
                 options,
                 mapTask.getStageName(),
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactory.java
deleted file mode 100644
index 616ce0ff563d..000000000000
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactory.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.fn;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.net.HostAndPort;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import org.apache.beam.model.pipeline.v1.Endpoints;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
-import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.BindableService;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.Server;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ServerBuilder;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ServerInterceptors;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.netty.NettyServerBuilder;
-import 
org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollEventLoopGroup;
-import 
org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollServerDomainSocketChannel;
-import 
org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollServerSocketChannel;
-import 
org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.unix.DomainSocketAddress;
-import 
org.apache.beam.vendor.grpc.v1_13_1.io.netty.util.internal.ThreadLocalRandom;
-
-/**
- * A {@link Server gRPC Server} factory that returns a server based upon 
{@link PipelineOptions}
- * experiments. <br>
- * TODO: Kill {@link ServerFactory} instead use {@link
- * org.apache.beam.runners.fnexecution.ServerFactory}.
- */
-@Deprecated
-public abstract class ServerFactory {
-  public static ServerFactory fromOptions(PipelineOptions options) {
-    DataflowPipelineDebugOptions dataflowOptions = 
options.as(DataflowPipelineDebugOptions.class);
-    if (DataflowRunner.hasExperiment(dataflowOptions, 
"beam_fn_api_epoll_domain_socket")) {
-      return new EpollDomainSocket();
-    } else if (DataflowRunner.hasExperiment(dataflowOptions, 
"beam_fn_api_epoll")) {
-      return new EpollSocket();
-    }
-    return new Default();
-  }
-
-  /**
-   * Allocates a port for a server using an ephemeral port chosen 
automatically. The chosen port is
-   * accessible to the caller from the URL set in the input {@link
-   * Endpoints.ApiServiceDescriptor.Builder}. Server applies {@link
-   * GrpcContextHeaderAccessorProvider#interceptor()} to all incoming requests.
-   */
-  public abstract Server allocatePortAndCreate(
-      Endpoints.ApiServiceDescriptor.Builder builder, List<BindableService> 
services)
-      throws IOException;
-
-  /**
-   * Creates an instance of this server at the address specified by the given 
service descriptor.
-   * Server applies {@link GrpcContextHeaderAccessorProvider#interceptor()} to 
all incoming
-   * requests.
-   */
-  public abstract Server create(
-      Endpoints.ApiServiceDescriptor serviceDescriptor, List<BindableService> 
services)
-      throws IOException;
-
-  /**
-   * Creates a {@link Server gRPC Server} using a Unix domain socket. Note 
that this requires <a
-   * href="http://netty.io/wiki/forked-tomcat-native.html">Netty TcNative</a> 
available to be able
-   * to provide a {@link EpollServerDomainSocketChannel}.
-   *
-   * <p>The unix domain socket is located at 
${java.io.tmpdir}/fnapi${random[0-10000)}.sock
-   */
-  private static class EpollDomainSocket extends ServerFactory {
-    private static File getFileForPort(int port) {
-      return new File(System.getProperty("java.io.tmpdir"), 
String.format("fnapi%d.sock", port));
-    }
-
-    @Override
-    public Server allocatePortAndCreate(
-        Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor, 
List<BindableService> services)
-        throws IOException {
-      File tmp;
-      do {
-        tmp = getFileForPort(ThreadLocalRandom.current().nextInt(10000));
-      } while (tmp.exists());
-      apiServiceDescriptor.setUrl("unix://" + tmp.getAbsolutePath());
-      return create(apiServiceDescriptor.build(), services);
-    }
-
-    @Override
-    public Server create(
-        Endpoints.ApiServiceDescriptor serviceDescriptor, 
List<BindableService> services)
-        throws IOException {
-      SocketAddress socketAddress = 
SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
-      checkArgument(
-          socketAddress instanceof DomainSocketAddress,
-          "%s requires a Unix domain socket address, got %s",
-          EpollDomainSocket.class.getSimpleName(),
-          serviceDescriptor.getUrl());
-      return createServer((DomainSocketAddress) socketAddress, services);
-    }
-
-    private static Server createServer(
-        DomainSocketAddress domainSocket, List<BindableService> services) 
throws IOException {
-      NettyServerBuilder builder =
-          NettyServerBuilder.forAddress(domainSocket)
-              .channelType(EpollServerDomainSocketChannel.class)
-              .workerEventLoopGroup(new EpollEventLoopGroup())
-              .bossEventLoopGroup(new EpollEventLoopGroup())
-              .maxMessageSize(Integer.MAX_VALUE);
-      for (BindableService service : services) {
-        // Wrap the service to extract headers
-        builder.addService(
-            ServerInterceptors.intercept(service, 
GrpcContextHeaderAccessorProvider.interceptor()));
-      }
-      return builder.build().start();
-    }
-  }
-
-  /**
-   * Creates a {@link Server gRPC Server} using an Epoll socket. Note that 
this requires <a
-   * href="http://netty.io/wiki/forked-tomcat-native.html">Netty TcNative</a> 
available to be able
-   * to provide a {@link EpollServerSocketChannel}.
-   *
-   * <p>The server is created listening any open port on "localhost".
-   */
-  private static class EpollSocket extends ServerFactory {
-    @Override
-    public Server allocatePortAndCreate(
-        Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor, 
List<BindableService> services)
-        throws IOException {
-      InetSocketAddress address = new 
InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
-      Server server = createServer(address, services);
-      apiServiceDescriptor.setUrl(
-          HostAndPort.fromParts(address.getHostName(), 
server.getPort()).toString());
-      return server;
-    }
-
-    @Override
-    public Server create(
-        Endpoints.ApiServiceDescriptor serviceDescriptor, 
List<BindableService> services)
-        throws IOException {
-      SocketAddress socketAddress = 
SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
-      checkArgument(
-          socketAddress instanceof InetSocketAddress,
-          "%s requires a host:port socket address, got %s",
-          EpollSocket.class.getSimpleName(),
-          serviceDescriptor.getUrl());
-      return createServer((InetSocketAddress) socketAddress, services);
-    }
-
-    private static Server createServer(InetSocketAddress socket, 
List<BindableService> services)
-        throws IOException {
-      ServerBuilder builder =
-          NettyServerBuilder.forAddress(socket)
-              .channelType(EpollServerSocketChannel.class)
-              .workerEventLoopGroup(new EpollEventLoopGroup())
-              .bossEventLoopGroup(new EpollEventLoopGroup())
-              .maxMessageSize(Integer.MAX_VALUE);
-      for (BindableService service : services) {
-        // Wrap the service to extract headers
-        builder.addService(
-            ServerInterceptors.intercept(service, 
GrpcContextHeaderAccessorProvider.interceptor()));
-      }
-      return builder.build().start();
-    }
-  }
-
-  /**
-   * Creates a {@link Server gRPC Server} using the default server factory.
-   *
-   * <p>The server is created listening any open port on "localhost".
-   */
-  private static class Default extends ServerFactory {
-    @Override
-    public Server allocatePortAndCreate(
-        Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor, 
List<BindableService> services)
-        throws IOException {
-      InetSocketAddress address = new 
InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
-      Server server = createServer(address, services);
-      apiServiceDescriptor.setUrl(
-          HostAndPort.fromParts(address.getHostName(), 
server.getPort()).toString());
-      return server;
-    }
-
-    @Override
-    public Server create(
-        Endpoints.ApiServiceDescriptor serviceDescriptor, 
List<BindableService> services)
-        throws IOException {
-      SocketAddress socketAddress = 
SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
-      checkArgument(
-          socketAddress instanceof InetSocketAddress,
-          "Default ServerFactory requires a host:port socket address, got %s",
-          serviceDescriptor.getUrl());
-      return createServer((InetSocketAddress) socketAddress, services);
-    }
-
-    private static Server createServer(InetSocketAddress socket, 
List<BindableService> services)
-        throws IOException {
-      NettyServerBuilder builder =
-          NettyServerBuilder.forPort(socket.getPort())
-              // Set the message size to max value here. The actual size is 
governed by the
-              // buffer size in the layers above.
-              .maxMessageSize(Integer.MAX_VALUE);
-      for (BindableService service : services) {
-        // Wrap the service to extract headers
-        builder.addService(
-            ServerInterceptors.intercept(service, 
GrpcContextHeaderAccessorProvider.interceptor()));
-      }
-      return builder.build().start();
-    }
-  }
-}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/BeamFnDataGrpcService.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/BeamFnDataGrpcService.java
index be295f9f0a4b..d81b902be88c 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/BeamFnDataGrpcService.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/BeamFnDataGrpcService.java
@@ -32,6 +32,7 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.dataflow.worker.fn.grpc.BeamFnService;
+import org.apache.beam.runners.fnexecution.FnService;
 import org.apache.beam.runners.fnexecution.HeaderAccessor;
 import org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.sdk.coders.Coder;
@@ -204,42 +205,55 @@ public void fail(Throwable t) {
     }
   }
 
-  /** Get the DataService for the clientId */
-  public FnDataService getDataService(final String clientId) {
-    return new FnDataService() {
-      @Override
-      public <T> InboundDataClient receive(
-          LogicalEndpoint inputLocation,
-          Coder<WindowedValue<T>> coder,
-          FnDataReceiver<WindowedValue<T>> consumer) {
-        LOG.debug("Registering consumer for {}", inputLocation);
+  // A wrapper class
+  public class DataService extends BeamFnDataGrpc.BeamFnDataImplBase
+      implements FnDataService, FnService {
+    private final String clientId;
 
-        return new DeferredInboundDataClient(clientId, inputLocation, coder, 
consumer);
-      }
+    public DataService(String clientId) {
+      this.clientId = clientId;
+    }
 
-      @Override
-      public <T> CloseableFnDataReceiver<WindowedValue<T>> send(
-          LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder) {
-        LOG.debug("Creating output consumer for {}", outputLocation);
-        try {
-          if (outboundBufferLimit.isPresent()) {
-            return 
BeamFnDataBufferingOutboundObserver.forLocationWithBufferLimit(
-                outboundBufferLimit.get(),
-                outputLocation,
-                coder,
-                getClientFuture(clientId).get().getOutboundObserver());
-          } else {
-            return BeamFnDataBufferingOutboundObserver.forLocation(
-                outputLocation, coder, 
getClientFuture(clientId).get().getOutboundObserver());
-          }
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-          throw new RuntimeException(e);
+    @Override
+    public <T> InboundDataClient receive(
+        LogicalEndpoint inputLocation,
+        Coder<WindowedValue<T>> coder,
+        FnDataReceiver<WindowedValue<T>> consumer) {
+      LOG.debug("Registering consumer for {}", inputLocation);
+
+      return new DeferredInboundDataClient(this.clientId, inputLocation, 
coder, consumer);
+    }
+
+    @Override
+    public <T> CloseableFnDataReceiver<WindowedValue<T>> send(
+        LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder) {
+      LOG.debug("Creating output consumer for {}", outputLocation);
+      try {
+        if (outboundBufferLimit.isPresent()) {
+          return 
BeamFnDataBufferingOutboundObserver.forLocationWithBufferLimit(
+              outboundBufferLimit.get(),
+              outputLocation,
+              coder,
+              getClientFuture(this.clientId).get().getOutboundObserver());
+        } else {
+          return BeamFnDataBufferingOutboundObserver.forLocation(
+              outputLocation, coder, 
getClientFuture(this.clientId).get().getOutboundObserver());
         }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        throw new RuntimeException(e);
       }
-    };
+    }
+
+    @Override
+    public void close() throws Exception {}
+  }
+
+  /** Get the DataService for the clientId */
+  public DataService getDataService(final String clientId) {
+    return new DataService(clientId);
   }
 
   @Override
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
index 4620ecb1c896..b6607325f8f6 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
@@ -31,6 +31,7 @@
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import 
org.apache.beam.runners.dataflow.worker.fn.stream.ServerStreamObserverFactory;
 import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
+import org.apache.beam.runners.fnexecution.ServerFactory;
 import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -73,8 +74,7 @@ public void testClientConnecting() throws Exception {
             descriptor,
             ServerStreamObserverFactory.fromOptions(options)::from,
             GrpcContextHeaderAccessorProvider.getHeaderAccessor());
-    Server server =
-        ServerFactory.fromOptions(options).create(descriptor, 
ImmutableList.of(service));
+    Server server = 
ServerFactory.createDefault().create(ImmutableList.of(service), descriptor);
     String url = service.getApiServiceDescriptor().getUrl();
     BeamFnControlGrpc.BeamFnControlStub clientStub =
         
BeamFnControlGrpc.newStub(ManagedChannelBuilder.forTarget(url).usePlaintext(true).build());
@@ -102,8 +102,7 @@ public void testMultipleClientsConnecting() throws 
Exception {
             descriptor,
             ServerStreamObserverFactory.fromOptions(options)::from,
             GrpcContextHeaderAccessorProvider.getHeaderAccessor());
-    Server server =
-        ServerFactory.fromOptions(options).create(descriptor, 
ImmutableList.of(service));
+    Server server = 
ServerFactory.createDefault().create(ImmutableList.of(service), descriptor);
 
     String url = service.getApiServiceDescriptor().getUrl();
     BeamFnControlGrpc.BeamFnControlStub clientStub =
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactoryTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactoryTest.java
deleted file mode 100644
index 70b9980b755b..000000000000
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactoryTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.fn;
-
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.lessThan;
-import static org.hamcrest.Matchers.startsWith;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assume.assumeTrue;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.net.HostAndPort;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
-import org.apache.beam.model.pipeline.v1.Endpoints;
-import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
-import org.apache.beam.runners.dataflow.harness.test.TestStreams;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ManagedChannel;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ManagedChannelBuilder;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.Server;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.netty.NettyChannelBuilder;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.stub.CallStreamObserver;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.stub.StreamObserver;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.Epoll;
-import 
org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollDomainSocketChannel;
-import 
org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollEventLoopGroup;
-import 
org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollSocketChannel;
-import 
org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.unix.DomainSocketAddress;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link ServerFactory}. */
-@RunWith(JUnit4.class)
-public class ServerFactoryTest {
-  private static final BeamFnApi.Elements CLIENT_DATA =
-      BeamFnApi.Elements.newBuilder()
-          
.addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("1"))
-          .build();
-  private static final BeamFnApi.Elements SERVER_DATA =
-      BeamFnApi.Elements.newBuilder()
-          
.addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("1"))
-          .build();
-
-  @Test
-  public void testCreatingDefaultServer() throws Exception {
-    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
-        runTestUsing(PipelineOptionsFactory.create());
-    HostAndPort hostAndPort = 
HostAndPort.fromString(apiServiceDescriptor.getUrl());
-    assertThat(
-        hostAndPort.getHost(),
-        anyOf(
-            equalTo(InetAddress.getLoopbackAddress().getHostName()),
-            equalTo(InetAddress.getLoopbackAddress().getHostAddress())));
-    assertThat(hostAndPort.getPort(), allOf(greaterThan(0), lessThan(65536)));
-  }
-
-  @Test
-  public void testCreatingEpollServer() throws Exception {
-    assumeTrue(Epoll.isAvailable());
-    // tcnative only supports the ipv4 address family
-    assumeTrue(InetAddress.getLoopbackAddress() instanceof Inet4Address);
-    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
-        runTestUsing(
-            PipelineOptionsFactory.fromArgs(new String[] 
{"--experiments=beam_fn_api_epoll"})
-                .create());
-    HostAndPort hostAndPort = 
HostAndPort.fromString(apiServiceDescriptor.getUrl());
-    assertThat(
-        hostAndPort.getHost(),
-        anyOf(
-            equalTo(InetAddress.getLoopbackAddress().getHostName()),
-            equalTo(InetAddress.getLoopbackAddress().getHostAddress())));
-    assertThat(hostAndPort.getPort(), allOf(greaterThan(0), lessThan(65536)));
-  }
-
-  @Test
-  public void testCreatingUnixDomainSocketServer() throws Exception {
-    assumeTrue(Epoll.isAvailable());
-    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
-        runTestUsing(
-            PipelineOptionsFactory.fromArgs(
-                    new String[] {
-                      
"--experiments=beam_fn_api_epoll,beam_fn_api_epoll_domain_socket"
-                    })
-                .create());
-    assertThat(
-        apiServiceDescriptor.getUrl(),
-        startsWith("unix://" + System.getProperty("java.io.tmpdir")));
-  }
-
-  private Endpoints.ApiServiceDescriptor runTestUsing(PipelineOptions options) 
throws Exception {
-    ManagedChannelFactory channelFactory = ManagedChannelFactory.from(options);
-    Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder =
-        Endpoints.ApiServiceDescriptor.newBuilder();
-
-    Collection<BeamFnApi.Elements> serverElements = new ArrayList<>();
-    CountDownLatch clientHangedUp = new CountDownLatch(1);
-    CallStreamObserver<BeamFnApi.Elements> serverInboundObserver =
-        TestStreams.withOnNext(serverElements::add)
-            .withOnCompleted(clientHangedUp::countDown)
-            .build();
-    TestDataService service = new TestDataService(serverInboundObserver);
-
-    ServerFactory serverFactory = ServerFactory.fromOptions(options);
-    Server server =
-        serverFactory.allocatePortAndCreate(apiServiceDescriptorBuilder, 
ImmutableList.of(service));
-    assertFalse(server.isShutdown());
-    ManagedChannel channel = 
channelFactory.forDescriptor(apiServiceDescriptorBuilder.build());
-    BeamFnDataGrpc.BeamFnDataStub stub = BeamFnDataGrpc.newStub(channel);
-    Collection<BeamFnApi.Elements> clientElements = new ArrayList<>();
-    CountDownLatch serverHangedUp = new CountDownLatch(1);
-    CallStreamObserver<BeamFnApi.Elements> clientInboundObserver =
-        TestStreams.withOnNext(clientElements::add)
-            .withOnCompleted(serverHangedUp::countDown)
-            .build();
-
-    StreamObserver<BeamFnApi.Elements> clientOutboundObserver = 
stub.data(clientInboundObserver);
-    StreamObserver<BeamFnApi.Elements> serverOutboundObserver = 
service.outboundObservers.take();
-
-    clientOutboundObserver.onNext(CLIENT_DATA);
-    serverOutboundObserver.onNext(SERVER_DATA);
-    clientOutboundObserver.onCompleted();
-    clientHangedUp.await();
-    serverOutboundObserver.onCompleted();
-    serverHangedUp.await();
-
-    assertThat(clientElements, contains(SERVER_DATA));
-    assertThat(serverElements, contains(CLIENT_DATA));
-    server.shutdown();
-    server.awaitTermination(1, TimeUnit.SECONDS);
-    server.shutdownNow();
-
-    return apiServiceDescriptorBuilder.build();
-  }
-
-  /** A test gRPC service that uses the provided inbound observer for all 
clients. */
-  private static class TestDataService extends 
BeamFnDataGrpc.BeamFnDataImplBase {
-    private final LinkedBlockingQueue<StreamObserver<BeamFnApi.Elements>> 
outboundObservers;
-    private final StreamObserver<BeamFnApi.Elements> inboundObserver;
-
-    private TestDataService(StreamObserver<BeamFnApi.Elements> 
inboundObserver) {
-      this.inboundObserver = inboundObserver;
-      this.outboundObservers = new LinkedBlockingQueue<>();
-    }
-
-    @Override
-    public StreamObserver<BeamFnApi.Elements> data(
-        StreamObserver<BeamFnApi.Elements> outboundObserver) {
-      Uninterruptibles.putUninterruptibly(outboundObservers, outboundObserver);
-      return inboundObserver;
-    }
-  }
-
-  /**
-   * Uses {@link PipelineOptions} to configure which underlying {@link 
ManagedChannel}
-   * implementation to use.
-   *
-   * <p>TODO: Remove this fork once available from a common shared library.
-   */
-  public abstract static class ManagedChannelFactory {
-    public static ManagedChannelFactory from(PipelineOptions options) {
-      List<String> experiments = 
options.as(DataflowPipelineDebugOptions.class).getExperiments();
-      if (experiments != null && experiments.contains("beam_fn_api_epoll")) {
-        
org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.Epoll.ensureAvailability();
-        return new Epoll();
-      }
-      return new Default();
-    }
-
-    public abstract ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor);
-
-    /**
-     * Creates a {@link ManagedChannel} backed by an {@link 
EpollDomainSocketChannel} if the address
-     * is a {@link DomainSocketAddress}. Otherwise creates a {@link 
ManagedChannel} backed by an
-     * {@link EpollSocketChannel}.
-     */
-    private static class Epoll extends ManagedChannelFactory {
-      @Override
-      public ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor) {
-        SocketAddress address = 
SocketAddressFactory.createFrom(apiServiceDescriptor.getUrl());
-        return NettyChannelBuilder.forAddress(address)
-            .channelType(
-                address instanceof DomainSocketAddress
-                    ? EpollDomainSocketChannel.class
-                    : EpollSocketChannel.class)
-            .eventLoopGroup(new EpollEventLoopGroup())
-            .usePlaintext(true)
-            // Set the message size to max value here. The actual size is 
governed by the
-            // buffer size in the layers above.
-            .maxInboundMessageSize(Integer.MAX_VALUE)
-            .build();
-      }
-    }
-
-    /**
-     * Creates a {@link ManagedChannel} relying on the {@link 
ManagedChannelBuilder} to create
-     * instances.
-     */
-    private static class Default extends ManagedChannelFactory {
-      @Override
-      public ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor) {
-        return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl())
-            .usePlaintext(true)
-            // Set the message size to max value here. The actual size is 
governed by the
-            // buffer size in the layers above.
-            .maxInboundMessageSize(Integer.MAX_VALUE)
-            .build();
-      }
-    }
-  }
-}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
index 52bcbafc1c5f..aabfd1f7f0ab 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.fnexecution;
 
+import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
@@ -33,7 +34,8 @@
   public static <ServiceT extends FnService> GrpcFnServer<ServiceT> 
allocatePortAndCreateFor(
       ServiceT service, ServerFactory factory) throws IOException {
     ApiServiceDescriptor.Builder apiServiceDescriptor = 
ApiServiceDescriptor.newBuilder();
-    Server server = factory.allocatePortAndCreate(service, 
apiServiceDescriptor);
+    Server server =
+        factory.allocateAddressAndCreate(ImmutableList.of(service), 
apiServiceDescriptor);
     return new GrpcFnServer<>(server, service, apiServiceDescriptor.build());
   }
 
@@ -43,7 +45,18 @@
    */
   public static <ServiceT extends FnService> GrpcFnServer<ServiceT> create(
       ServiceT service, ApiServiceDescriptor endpoint, ServerFactory factory) 
throws IOException {
-    return new GrpcFnServer<>(factory.create(service, endpoint), service, 
endpoint);
+    return new GrpcFnServer<>(
+        factory.create(ImmutableList.of(service), endpoint), service, 
endpoint);
+  }
+
+  /** @deprecated This create function is used for Dataflow migration purpose 
only. */
+  @Deprecated
+  public static <ServiceT extends FnService> GrpcFnServer<ServiceT> create(
+      ServiceT service, ApiServiceDescriptor endpoint) {
+    return new GrpcFnServer(null, service, endpoint) {
+      @Override
+      public void close() throws Exception {}
+    };
   }
 
   private final Server server;
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
index 9300d6b9514c..6f6afdd77e64 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.fnexecution;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.BindableService;
@@ -39,20 +40,32 @@ public static InProcessServerFactory create() {
   private InProcessServerFactory() {}
 
   @Override
-  public Server allocatePortAndCreate(BindableService service, 
ApiServiceDescriptor.Builder builder)
-      throws IOException {
+  public Server allocateAddressAndCreate(
+      List<BindableService> services, ApiServiceDescriptor.Builder builder) 
throws IOException {
     String name = String.format("InProcessServer_%s", 
serviceNameUniqifier.getAndIncrement());
     builder.setUrl(name);
-    return 
InProcessServerBuilder.forName(name).addService(service).build().start();
+    InProcessServerBuilder serverBuilder = 
InProcessServerBuilder.forName(name);
+    services
+        .stream()
+        .forEach(
+            service ->
+                serverBuilder.addService(
+                    ServerInterceptors.intercept(
+                        service, 
GrpcContextHeaderAccessorProvider.interceptor())));
+    return serverBuilder.build().start();
   }
 
   @Override
-  public Server create(BindableService service, ApiServiceDescriptor 
serviceDescriptor)
+  public Server create(List<BindableService> services, ApiServiceDescriptor 
serviceDescriptor)
       throws IOException {
-    return InProcessServerBuilder.forName(serviceDescriptor.getUrl())
-        .addService(
-            ServerInterceptors.intercept(service, 
GrpcContextHeaderAccessorProvider.interceptor()))
-        .build()
-        .start();
+    InProcessServerBuilder builder = 
InProcessServerBuilder.forName(serviceDescriptor.getUrl());
+    services
+        .stream()
+        .forEach(
+            service ->
+                builder.addService(
+                    ServerInterceptors.intercept(
+                        service, 
GrpcContextHeaderAccessorProvider.interceptor())));
+    return builder.build().start();
   }
 }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
index 54a758c8614b..18818c6e8d57 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
@@ -20,58 +20,81 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.net.HostAndPort;
+import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.List;
 import java.util.function.Supplier;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.sdk.fn.channel.SocketAddressFactory;
 import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.BindableService;
 import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ServerBuilder;
 import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ServerInterceptors;
 import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.netty.NettyServerBuilder;
+import 
org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollEventLoopGroup;
+import 
org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollServerDomainSocketChannel;
+import 
org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollServerSocketChannel;
+import 
org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.unix.DomainSocketAddress;
+import 
org.apache.beam.vendor.grpc.v1_13_1.io.netty.util.internal.ThreadLocalRandom;
 
 /** A {@link Server gRPC server} factory. */
 public abstract class ServerFactory {
-  /** Create a default {@link ServerFactory}. */
+  /** Create a default {@link InetSocketAddressServerFactory}. */
   public static ServerFactory createDefault() {
     return new InetSocketAddressServerFactory(UrlFactory.createDefault());
   }
 
-  /** Create a {@link ServerFactory} that uses the given url factory. */
+  /** Create a {@link InetSocketAddressServerFactory} that uses the given url 
factory. */
   public static ServerFactory createWithUrlFactory(UrlFactory urlFactory) {
     return new InetSocketAddressServerFactory(urlFactory);
   }
 
-  /** Create a {@link ServerFactory} that uses ports from a supplier. */
+  /** Create a {@link InetSocketAddressServerFactory} that uses ports from a 
supplier. */
   public static ServerFactory createWithPortSupplier(Supplier<Integer> 
portSupplier) {
     return new InetSocketAddressServerFactory(UrlFactory.createDefault(), 
portSupplier);
   }
 
-  /** Create a {@link ServerFactory} that uses the given url factory and ports 
from a supplier. */
+  /**
+   * Create a {@link InetSocketAddressServerFactory} that uses the given url 
factory and ports from
+   * a supplier.
+   */
   public static ServerFactory createWithUrlFactoryAndPortSupplier(
       UrlFactory urlFactory, Supplier<Integer> portSupplier) {
     return new InetSocketAddressServerFactory(urlFactory, portSupplier);
   }
 
+  /** Create a {@link EpollSocket}. */
+  public static ServerFactory createEpollSocket() {
+    return new EpollSocket();
+  }
+
+  /** Create a {@link EpollDomainSocket}. */
+  public static ServerFactory createEpollDomainSocket() {
+    return new EpollDomainSocket();
+  }
+
   /**
-   * Creates an instance of this server using an ephemeral port chosen 
automatically. The chosen
-   * port is accessible to the caller from the URL set in the input {@link
-   * Endpoints.ApiServiceDescriptor.Builder}. Server applies {@link
+   * Creates an instance of this server using an ephemeral address. The 
allocation of the address is
+   * server type dependent, which means the address may be a port for certain 
types of server, or a
+   * file path for certain other types. The chosen address is accessible to 
the caller from the URL
+   * set in the input {@link Endpoints.ApiServiceDescriptor.Builder}. Server 
applies {@link
    * GrpcContextHeaderAccessorProvider#interceptor()} to all incoming requests.
    */
-  public abstract Server allocatePortAndCreate(
-      BindableService service, Endpoints.ApiServiceDescriptor.Builder builder) 
throws IOException;
+  public abstract Server allocateAddressAndCreate(
+      List<BindableService> services, Endpoints.ApiServiceDescriptor.Builder 
builder)
+      throws IOException;
 
   /**
-   * Creates an instance of this server at the address specified by the given 
service descriptor.
-   * Server applies {@link GrpcContextHeaderAccessorProvider#interceptor()} to 
all incoming
-   * requests.
+   * Creates an instance of this server at the address specified by the given 
service descriptor and
+   * bound to multiple services. Server applies {@link
+   * GrpcContextHeaderAccessorProvider#interceptor()} to all incoming requests.
    */
   public abstract Server create(
-      BindableService service, Endpoints.ApiServiceDescriptor 
serviceDescriptor) throws IOException;
-
+      List<BindableService> services, Endpoints.ApiServiceDescriptor 
serviceDescriptor)
+      throws IOException;
   /**
    * Creates a {@link Server gRPC Server} using the default server factory.
    *
@@ -91,18 +114,19 @@ private InetSocketAddressServerFactory(UrlFactory 
urlFactory, Supplier<Integer>
     }
 
     @Override
-    public Server allocatePortAndCreate(
-        BindableService service, Endpoints.ApiServiceDescriptor.Builder 
apiServiceDescriptor)
+    public Server allocateAddressAndCreate(
+        List<BindableService> services, Endpoints.ApiServiceDescriptor.Builder 
apiServiceDescriptor)
         throws IOException {
       InetSocketAddress address =
           new InetSocketAddress(InetAddress.getLoopbackAddress(), 
portSupplier.get());
-      Server server = createServer(service, address);
+      Server server = createServer(services, address);
       apiServiceDescriptor.setUrl(urlFactory.createUrl(address.getHostName(), 
server.getPort()));
       return server;
     }
 
     @Override
-    public Server create(BindableService service, 
Endpoints.ApiServiceDescriptor serviceDescriptor)
+    public Server create(
+        List<BindableService> services, Endpoints.ApiServiceDescriptor 
serviceDescriptor)
         throws IOException {
       SocketAddress socketAddress = 
SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
       checkArgument(
@@ -111,24 +135,128 @@ public Server create(BindableService service, 
Endpoints.ApiServiceDescriptor ser
           getClass().getSimpleName(),
           ServerFactory.class.getSimpleName(),
           serviceDescriptor.getUrl());
-      return createServer(service, (InetSocketAddress) socketAddress);
+      return createServer(services, (InetSocketAddress) socketAddress);
     }
 
-    private static Server createServer(BindableService service, 
InetSocketAddress socket)
+    private static Server createServer(List<BindableService> services, 
InetSocketAddress socket)
         throws IOException {
-      // Note: Every ServerFactory should apply 
GrpcContextHeaderAccessorProvider to the service.
-      Server server =
+      NettyServerBuilder builder =
           NettyServerBuilder.forPort(socket.getPort())
-              .addService(
-                  ServerInterceptors.intercept(
-                      service, 
GrpcContextHeaderAccessorProvider.interceptor()))
               // Set the message size to max value here. The actual size is 
governed by the
               // buffer size in the layers above.
-              .maxMessageSize(Integer.MAX_VALUE)
-              .build();
-      server.start();
+              .maxMessageSize(Integer.MAX_VALUE);
+      services
+          .stream()
+          .forEach(
+              service ->
+                  builder.addService(
+                      ServerInterceptors.intercept(
+                          service, 
GrpcContextHeaderAccessorProvider.interceptor())));
+      return builder.build().start();
+    }
+  }
+
+  /**
+   * Creates a {@link Server gRPC Server} using a Unix domain socket. Note 
that this requires <a
+   * href="http://netty.io/wiki/forked-tomcat-native.html">Netty TcNative</a> 
available to be able
+   * to provide a {@link EpollServerDomainSocketChannel}.
+   *
+   * <p>The unix domain socket is located at 
${java.io.tmpdir}/fnapi${random[0-10000)}.sock
+   */
+  private static class EpollDomainSocket extends ServerFactory {
+    private static File chooseRandomTmpFile(int port) {
+      return new File(System.getProperty("java.io.tmpdir"), 
String.format("fnapi%d.sock", port));
+    }
+
+    @Override
+    public Server allocateAddressAndCreate(
+        List<BindableService> services, Endpoints.ApiServiceDescriptor.Builder 
apiServiceDescriptor)
+        throws IOException {
+      File tmp;
+      do {
+        tmp = chooseRandomTmpFile(ThreadLocalRandom.current().nextInt(10000));
+      } while (tmp.exists());
+      apiServiceDescriptor.setUrl("unix://" + tmp.getAbsolutePath());
+      return create(services, apiServiceDescriptor.build());
+    }
+
+    @Override
+    public Server create(
+        List<BindableService> services, Endpoints.ApiServiceDescriptor 
serviceDescriptor)
+        throws IOException {
+      SocketAddress socketAddress = 
SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
+      checkArgument(
+          socketAddress instanceof DomainSocketAddress,
+          "%s requires a Unix domain socket address, got %s",
+          EpollDomainSocket.class.getSimpleName(),
+          serviceDescriptor.getUrl());
+      return createServer(services, (DomainSocketAddress) socketAddress);
+    }
+
+    private static Server createServer(
+        List<BindableService> services, DomainSocketAddress domainSocket) 
throws IOException {
+      NettyServerBuilder builder =
+          NettyServerBuilder.forAddress(domainSocket)
+              .channelType(EpollServerDomainSocketChannel.class)
+              .workerEventLoopGroup(new EpollEventLoopGroup())
+              .bossEventLoopGroup(new EpollEventLoopGroup())
+              .maxMessageSize(Integer.MAX_VALUE);
+      for (BindableService service : services) {
+        // Wrap the service to extract headers
+        builder.addService(
+            ServerInterceptors.intercept(service, 
GrpcContextHeaderAccessorProvider.interceptor()));
+      }
+      return builder.build().start();
+    }
+  }
+
+  /**
+   * Creates a {@link Server gRPC Server} using an Epoll socket. Note that 
this requires <a
+   * href="http://netty.io/wiki/forked-tomcat-native.html">Netty TcNative</a> 
available to be able
+   * to provide a {@link EpollServerSocketChannel}.
+   *
+   * <p>The server is created listening on any open port on "localhost".
+   */
+  private static class EpollSocket extends ServerFactory {
+    @Override
+    public Server allocateAddressAndCreate(
+        List<BindableService> services, Endpoints.ApiServiceDescriptor.Builder 
apiServiceDescriptor)
+        throws IOException {
+      InetSocketAddress address = new 
InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+      Server server = createServer(services, address);
+      apiServiceDescriptor.setUrl(
+          HostAndPort.fromParts(address.getHostName(), 
server.getPort()).toString());
       return server;
     }
+
+    @Override
+    public Server create(
+        List<BindableService> services, Endpoints.ApiServiceDescriptor 
serviceDescriptor)
+        throws IOException {
+      SocketAddress socketAddress = 
SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
+      checkArgument(
+          socketAddress instanceof InetSocketAddress,
+          "%s requires a host:port socket address, got %s",
+          EpollSocket.class.getSimpleName(),
+          serviceDescriptor.getUrl());
+      return createServer(services, (InetSocketAddress) socketAddress);
+    }
+
+    private static Server createServer(List<BindableService> services, 
InetSocketAddress socket)
+        throws IOException {
+      ServerBuilder builder =
+          NettyServerBuilder.forAddress(socket)
+              .channelType(EpollServerSocketChannel.class)
+              .workerEventLoopGroup(new EpollEventLoopGroup())
+              .bossEventLoopGroup(new EpollEventLoopGroup())
+              .maxMessageSize(Integer.MAX_VALUE);
+      for (BindableService service : services) {
+        // Wrap the service to extract headers
+        builder.addService(
+            ServerInterceptors.intercept(service, 
GrpcContextHeaderAccessorProvider.interceptor()));
+      }
+      return builder.build().start();
+    }
   }
 
   /**
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProviderTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProviderTest.java
index ece8e838c315..8d146b78293e 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProviderTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProviderTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.fnexecution;
 
+import com.google.common.collect.ImmutableList;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
@@ -54,7 +55,8 @@ public void testWorkerIdOnConnect() throws Exception {
     TestDataService testService = new TestDataService(Mockito.mock(StreamObserver.class), consumer);
     ApiServiceDescriptor serviceDescriptor =
         ApiServiceDescriptor.newBuilder().setUrl("testServer").build();
-    Server server = InProcessServerFactory.create().create(testService, serviceDescriptor);
+    Server server =
+        InProcessServerFactory.create().create(ImmutableList.of(testService), serviceDescriptor);
     final Metadata.Key<String> workerIdKey =
         Metadata.Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER);
     Channel channel =
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
index 2942faa817d6..58d7e1d069bc 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
@@ -24,11 +24,15 @@
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assume.assumeTrue;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.net.HostAndPort;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -45,6 +49,7 @@
 import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.Server;
 import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.stub.CallStreamObserver;
 import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.Epoll;
 import org.junit.Test;
 
 /** Tests for {@link ServerFactory}. */
@@ -79,7 +84,8 @@ public void usesUrlFactory() throws Exception {
         TestStreams.withOnNext((Elements unused) -> {}).withOnCompleted(() -> {}).build();
     TestDataService service = new TestDataService(observer);
     ApiServiceDescriptor.Builder descriptorBuilder = ApiServiceDescriptor.newBuilder();
-    Server server = serverFactory.allocatePortAndCreate(service, descriptorBuilder);
+    Server server =
+        serverFactory.allocateAddressAndCreate(ImmutableList.of(service), descriptorBuilder);
     // Immediately terminate server. We don't actually use it here.
     server.shutdown();
     assertThat(descriptorBuilder.getUrl(), is("foo"));
@@ -111,7 +117,7 @@ public void urlFactoryWithPortSupplier() throws Exception {
     ApiServiceDescriptor.Builder descriptorBuilder = ApiServiceDescriptor.newBuilder();
     Server server = null;
     try {
-      server = serverFactory.allocatePortAndCreate(service, descriptorBuilder);
+      server = serverFactory.allocateAddressAndCreate(ImmutableList.of(service), descriptorBuilder);
       assertThat(descriptorBuilder.getUrl(), is("foo:65535"));
     } finally {
       if (server != null) {
@@ -120,6 +126,32 @@ public void urlFactoryWithPortSupplier() throws Exception {
     }
   }
 
+  @Test
+  public void testCreatingEpollServer() throws Exception {
+    assumeTrue(Epoll.isAvailable());
+    // tcnative only supports the ipv4 address family
+    assumeTrue(InetAddress.getLoopbackAddress() instanceof Inet4Address);
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        runTestUsing(ServerFactory.createEpollSocket(), ManagedChannelFactory.createEpoll());
+    HostAndPort hostAndPort = HostAndPort.fromString(apiServiceDescriptor.getUrl());
+    assertThat(
+        hostAndPort.getHost(),
+        anyOf(
+            equalTo(InetAddress.getLoopbackAddress().getHostName()),
+            equalTo(InetAddress.getLoopbackAddress().getHostAddress())));
+    assertThat(hostAndPort.getPort(), allOf(greaterThan(0), lessThan(65536)));
+  }
+
+  @Test
+  public void testCreatingUnixDomainSocketServer() throws Exception {
+    assumeTrue(Epoll.isAvailable());
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        runTestUsing(ServerFactory.createEpollDomainSocket(), ManagedChannelFactory.createEpoll());
+    assertThat(
+        apiServiceDescriptor.getUrl(),
+        startsWith("unix://" + System.getProperty("java.io.tmpdir")));
+  }
+
   private Endpoints.ApiServiceDescriptor runTestUsing(
       ServerFactory serverFactory, ManagedChannelFactory channelFactory) throws Exception {
     Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder =
@@ -132,7 +164,9 @@ public void urlFactoryWithPortSupplier() throws Exception {
             .withOnCompleted(clientHangedUp::countDown)
             .build();
     TestDataService service = new TestDataService(serverInboundObserver);
-    Server server = serverFactory.allocatePortAndCreate(service, apiServiceDescriptorBuilder);
+    Server server =
+        serverFactory.allocateAddressAndCreate(
+            ImmutableList.of(service), apiServiceDescriptorBuilder);
     assertFalse(server.isShutdown());
 
     ManagedChannel channel = channelFactory.forDescriptor(apiServiceDescriptorBuilder.build());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 171738)
    Time Spent: 2h 40m  (was: 2.5h)

> Simplify service and server creation in dataflow runner harness
> ---------------------------------------------------------------
>
>                 Key: BEAM-6160
>                 URL: https://issues.apache.org/jira/browse/BEAM-6160
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-dataflow
>            Reporter: Boyuan Zhang
>            Assignee: Boyuan Zhang
>            Priority: Major
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to