Add A FnService, FnServer to runner Fn Execution

A FnService is a closeable BindableService. A FnServer is exactly one
FnService and a server which exposes it as an endpoint.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd4146d4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd4146d4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd4146d4

Branch: refs/heads/tez-runner
Commit: bd4146d4eebe744ba9453a245e32f397394f9366
Parents: 6e4781b
Author: Thomas Groh <tg...@google.com>
Authored: Mon Oct 30 17:36:37 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Nov 10 10:21:55 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/fnexecution/FnService.java     | 24 ++++++
 .../beam/runners/fnexecution/GrpcFnServer.java  | 88 ++++++++++++++++++++
 2 files changed, 112 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bd4146d4/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java
----------------------------------------------------------------------
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java
new file mode 100644
index 0000000..9ea0fce
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution;
+
+import io.grpc.BindableService;
+
+/** An interface sharing common behavior with services used during execution 
of user Fns. */
+public interface FnService extends AutoCloseable, BindableService {}

http://git-wip-us.apache.org/repos/asf/beam/blob/bd4146d4/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
----------------------------------------------------------------------
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
new file mode 100644
index 0000000..9f3dd3d
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution;
+
+import io.grpc.Server;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+
+/**
+ * A {@link Server gRPC Server} which manages a single {@link FnService}. The 
lifetime of the
+ * service is bound to the {@link GrpcFnServer}.
+ */
+public class GrpcFnServer<ServiceT extends FnService> implements AutoCloseable 
{
+  /**
+   * Create a {@link GrpcFnServer} for the provided {@link FnService} running 
on an arbitrary
+   * port.
+   */
+  public static <ServiceT extends FnService> GrpcFnServer<ServiceT> 
allocatePortAndCreateFor(
+      ServiceT service, ServerFactory factory) throws IOException {
+    ApiServiceDescriptor.Builder apiServiceDescriptor = 
ApiServiceDescriptor.newBuilder();
+    Server server = factory.allocatePortAndCreate(service, 
apiServiceDescriptor);
+    return new GrpcFnServer<>(server, service, apiServiceDescriptor.build());
+  }
+
+  /**
+   * Create a {@link GrpcFnServer} for the provided {@link FnService} which 
will run at the
+   * endpoint specified in the {@link ApiServiceDescriptor}.
+   */
+  public static <ServiceT extends FnService> GrpcFnServer<ServiceT> create(
+      ServiceT service, ApiServiceDescriptor endpoint, ServerFactory factory) 
throws IOException {
+    return new GrpcFnServer<>(factory.create(service, endpoint), service, 
endpoint);
+  }
+
+  private final Server server;
+  private final ServiceT service;
+  private final ApiServiceDescriptor apiServiceDescriptor;
+
+  private GrpcFnServer(Server server, ServiceT service, ApiServiceDescriptor 
apiServiceDescriptor)
+      throws IOException {
+    this.server = server;
+    this.service = service;
+    this.apiServiceDescriptor = apiServiceDescriptor;
+    server.start();
+  }
+
+  /**
+   * Get an {@link ApiServiceDescriptor} describing the endpoint this {@link 
GrpcFnServer} is bound
+   * to.
+   */
+  public ApiServiceDescriptor getApiServiceDescriptor() {
+    return apiServiceDescriptor;
+  }
+
+  /** Get the service exposed by this {@link GrpcFnServer}. */
+  public ServiceT getService() {
+    return service;
+  }
+
+  @Override
+  public void close() throws Exception {
+    try {
+      // The server has been closed, and should not respond to any new 
incoming calls.
+      server.shutdown();
+      service.close();
+      server.awaitTermination(60, TimeUnit.SECONDS);
+    } finally {
+      server.shutdownNow();
+      server.awaitTermination();
+    }
+  }
+}

Reply via email to