Add A FnService, FnServer to runner Fn Execution A FnService is a closeable BindableService. A FnServer is exactly one FnService and a server which exposes it as an endpoint.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd4146d4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd4146d4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd4146d4 Branch: refs/heads/tez-runner Commit: bd4146d4eebe744ba9453a245e32f397394f9366 Parents: 6e4781b Author: Thomas Groh <tg...@google.com> Authored: Mon Oct 30 17:36:37 2017 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Fri Nov 10 10:21:55 2017 -0800 ---------------------------------------------------------------------- .../beam/runners/fnexecution/FnService.java | 24 ++++++ .../beam/runners/fnexecution/GrpcFnServer.java | 88 ++++++++++++++++++++ 2 files changed, 112 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/bd4146d4/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java new file mode 100644 index 0000000..9ea0fce --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution; + +import io.grpc.BindableService; + +/** An interface sharing common behavior with services used during execution of user Fns. */ +public interface FnService extends AutoCloseable, BindableService {} http://git-wip-us.apache.org/repos/asf/beam/blob/bd4146d4/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java new file mode 100644 index 0000000..9f3dd3d --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution; + +import io.grpc.Server; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; + +/** + * A {@link Server gRPC Server} which manages a single {@link FnService}. The lifetime of the + * service is bound to the {@link GrpcFnServer}. + */ +public class GrpcFnServer<ServiceT extends FnService> implements AutoCloseable { + /** + * Create a {@link GrpcFnServer} for the provided {@link FnService} running on an arbitrary + * port. + */ + public static <ServiceT extends FnService> GrpcFnServer<ServiceT> allocatePortAndCreateFor( + ServiceT service, ServerFactory factory) throws IOException { + ApiServiceDescriptor.Builder apiServiceDescriptor = ApiServiceDescriptor.newBuilder(); + Server server = factory.allocatePortAndCreate(service, apiServiceDescriptor); + return new GrpcFnServer<>(server, service, apiServiceDescriptor.build()); + } + + /** + * Create a {@link GrpcFnServer} for the provided {@link FnService} which will run at the + * endpoint specified in the {@link ApiServiceDescriptor}. + */ + public static <ServiceT extends FnService> GrpcFnServer<ServiceT> create( + ServiceT service, ApiServiceDescriptor endpoint, ServerFactory factory) throws IOException { + return new GrpcFnServer<>(factory.create(service, endpoint), service, endpoint); + } + + private final Server server; + private final ServiceT service; + private final ApiServiceDescriptor apiServiceDescriptor; + + private GrpcFnServer(Server server, ServiceT service, ApiServiceDescriptor apiServiceDescriptor) + throws IOException { + this.server = server; + this.service = service; + this.apiServiceDescriptor = apiServiceDescriptor; + server.start(); + } + + /** + * Get an {@link ApiServiceDescriptor} describing the endpoint this {@link GrpcFnServer} is bound + * to. + */ + public ApiServiceDescriptor getApiServiceDescriptor() { + return apiServiceDescriptor; + } + + /** Get the service exposed by this {@link GrpcFnServer}. */ + public ServiceT getService() { + return service; + } + + @Override + public void close() throws Exception { + try { + // The server has been closed, and should not respond to any new incoming calls. + server.shutdown(); + service.close(); + server.awaitTermination(60, TimeUnit.SECONDS); + } finally { + server.shutdownNow(); + server.awaitTermination(); + } + } +}