FnApiControlClient, de-inverts Fn API control plane The Fn API control plane has an inverted client/server relationship in order to support firewall rules where the runner is forbidden from connecting out to the SDK harness. This Java helper provides an API with the more conventional polarity. It also associates streamed gRPC requests with responses to support simple future-based programming.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/06f4b0a4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/06f4b0a4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/06f4b0a4 Branch: refs/heads/master Commit: 06f4b0a4ad7552d3676c80e868a887ac017c92ac Parents: 46ee5a5 Author: Kenneth Knowles <k...@google.com> Authored: Wed Jul 26 07:42:03 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Sun Sep 3 19:42:56 2017 -0700 ---------------------------------------------------------------------- runners/core-java/pom.xml | 27 +++- .../beam/runners/core/FnApiControlClient.java | 148 +++++++++++++++++++ .../runners/core/FnApiControlClientTest.java | 139 +++++++++++++++++ 3 files changed, 307 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/06f4b0a4/runners/core-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml index 8c8e599..4097d2d 100644 --- a/runners/core-java/pom.xml +++ b/runners/core-java/pom.xml @@ -69,6 +69,11 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-common-fn-api</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-runners-core-construction-java</artifactId> </dependency> @@ -91,10 +96,25 @@ </dependency> <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-core</artifactId> + </dependency> + + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + </dependency> + + <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <!-- test dependencies --> <!-- Utilities such as WindowMatchers --> @@ -135,12 +155,5 @@ <artifactId>jackson-dataformat-yaml</artifactId> <scope>test</scope> </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-common-fn-api</artifactId> - <type>test-jar</type> - <scope>test</scope> - </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/beam/blob/06f4b0a4/runners/core-java/src/main/java/org/apache/beam/runners/core/FnApiControlClient.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/FnApiControlClient.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/FnApiControlClient.java new file mode 100644 index 0000000..4b72bfc --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/FnApiControlClient.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.io.Closeable; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.beam.fn.v1.BeamFnApi; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A client for the control plane of an SDK harness, which can issue requests to it over the Fn API. + * + * <p>This class presents a low-level Java API de-inverting the Fn API's gRPC layer. + * + * <p>The Fn API is inverted so the runner is the server and the SDK harness is the client, for + * firewalling reasons (the runner may execute in a more privileged environment forbidding outbound + * connections). + * + * <p>This low-level client is responsible only for correlating requests with responses. + */ +class FnApiControlClient implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(FnApiControlClient.class); + + // All writes to this StreamObserver need to be synchronized. + private final StreamObserver<BeamFnApi.InstructionRequest> requestReceiver; + private final ResponseStreamObserver responseObserver = new ResponseStreamObserver(); + private final Map<String, SettableFuture<BeamFnApi.InstructionResponse>> outstandingRequests; + private volatile boolean isClosed; + + private FnApiControlClient(StreamObserver<BeamFnApi.InstructionRequest> requestReceiver) { + this.requestReceiver = requestReceiver; + this.outstandingRequests = new ConcurrentHashMap<>(); + } + + /** + * Returns a {@link FnApiControlClient} which will submit its requests to the provided + * observer. + * + * <p>It is the responsibility of the caller to register this object as an observer of incoming + * responses (this will generally be done as part of fulfilling the contract of a gRPC service). + */ + public static FnApiControlClient forRequestObserver( + StreamObserver<BeamFnApi.InstructionRequest> requestObserver) { + return new FnApiControlClient(requestObserver); + } + + public synchronized ListenableFuture<BeamFnApi.InstructionResponse> handle( + BeamFnApi.InstructionRequest request) { + LOG.debug("Sending InstructionRequest {}", request); + SettableFuture<BeamFnApi.InstructionResponse> resultFuture = SettableFuture.create(); + outstandingRequests.put(request.getInstructionId(), resultFuture); + requestReceiver.onNext(request); + return resultFuture; + } + + StreamObserver<BeamFnApi.InstructionResponse> asResponseObserver() { + return responseObserver; + } + + @Override + public void close() { + closeAndTerminateOutstandingRequests(new IllegalStateException("Runner closed connection")); + } + + /** Closes this client and terminates any outstanding requests exceptionally. */ + private synchronized void closeAndTerminateOutstandingRequests(Throwable cause) { + if (isClosed) { + return; + } + + // Make a copy of the map to make the view of the outstanding requests consistent. + Map<String, SettableFuture<BeamFnApi.InstructionResponse>> outstandingRequestsCopy = + new ConcurrentHashMap<>(outstandingRequests); + outstandingRequests.clear(); + isClosed = true; + + if (outstandingRequestsCopy.isEmpty()) { + requestReceiver.onCompleted(); + return; + } + requestReceiver.onError( + new StatusRuntimeException(Status.CANCELLED.withDescription(cause.getMessage()))); + + LOG.error( + "{} closed, clearing outstanding requests {}", + FnApiControlClient.class.getSimpleName(), + outstandingRequestsCopy); + for (SettableFuture<BeamFnApi.InstructionResponse> outstandingRequest : + outstandingRequestsCopy.values()) { + outstandingRequest.setException(cause); + } + } + + /** + * A private view of this class as a {@link StreamObserver} for connecting as a gRPC listener. + */ + private class ResponseStreamObserver implements StreamObserver<BeamFnApi.InstructionResponse> { + /** + * Processes an incoming {@link BeamFnApi.InstructionResponse} by correlating it with the + * corresponding {@link BeamFnApi.InstructionRequest} and completes the future that was returned + * by {@link #handle}. + */ + @Override + public void onNext(BeamFnApi.InstructionResponse response) { + LOG.debug("Received InstructionResponse {}", response); + SettableFuture<BeamFnApi.InstructionResponse> completableFuture = + outstandingRequests.remove(response.getInstructionId()); + if (completableFuture != null) { + completableFuture.set(response); + } + } + + /** */ + @Override + public void onCompleted() { + closeAndTerminateOutstandingRequests( + new IllegalStateException("SDK harness closed connection")); + } + + @Override + public void onError(Throwable cause) { + LOG.error("{} received error {}", FnApiControlClient.class.getSimpleName(), cause); + closeAndTerminateOutstandingRequests(cause); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f4b0a4/runners/core-java/src/test/java/org/apache/beam/runners/core/FnApiControlClientTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/FnApiControlClientTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/FnApiControlClientTest.java new file mode 100644 index 0000000..07b4784 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/FnApiControlClientTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isA; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.verify; + +import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import org.apache.beam.fn.v1.BeamFnApi; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Unit tests for {@link FnApiControlClient}. */ +@RunWith(JUnit4.class) +public class FnApiControlClientTest { + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Mock public StreamObserver<BeamFnApi.InstructionRequest> mockObserver; + private FnApiControlClient client; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + client = FnApiControlClient.forRequestObserver(mockObserver); + } + + @Test + public void testRequestSent() { + String id = "instructionId"; + client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); + + verify(mockObserver).onNext(any(BeamFnApi.InstructionRequest.class)); + } + + @Test + public void testRequestSuccess() throws Exception { + String id = "successfulInstruction"; + + Future<BeamFnApi.InstructionResponse> responseFuture = + client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); + client + .asResponseObserver() + .onNext(BeamFnApi.InstructionResponse.newBuilder().setInstructionId(id).build()); + + BeamFnApi.InstructionResponse response = responseFuture.get(); + + assertThat(response.getInstructionId(), equalTo(id)); + } + + @Test + public void testUnknownResponseIgnored() throws Exception { + String id = "actualInstruction"; + String unknownId = "unknownInstruction"; + + ListenableFuture<BeamFnApi.InstructionResponse> responseFuture = + client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); + + client + .asResponseObserver() + .onNext(BeamFnApi.InstructionResponse.newBuilder().setInstructionId(unknownId).build()); + + assertThat(responseFuture.isDone(), is(false)); + assertThat(responseFuture.isCancelled(), is(false)); + } + + @Test + public void testOnCompletedCancelsOutstanding() throws Exception { + String id = "clientHangUpInstruction"; + + Future<BeamFnApi.InstructionResponse> responseFuture = + client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); + + client.asResponseObserver().onCompleted(); + + thrown.expect(ExecutionException.class); + thrown.expectCause(isA(IllegalStateException.class)); + thrown.expectMessage("closed"); + responseFuture.get(); + } + + @Test + public void testOnErrorCancelsOutstanding() throws Exception { + String id = "errorInstruction"; + + Future<BeamFnApi.InstructionResponse> responseFuture = + client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); + + class FrazzleException extends Exception {} + client.asResponseObserver().onError(new FrazzleException()); + + thrown.expect(ExecutionException.class); + thrown.expectCause(isA(FrazzleException.class)); + responseFuture.get(); + } + + @Test + public void testCloseCancelsOutstanding() throws Exception { + String id = "serverCloseInstruction"; + + Future<BeamFnApi.InstructionResponse> responseFuture = + client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); + + client.close(); + + thrown.expect(ExecutionException.class); + thrown.expectCause(isA(IllegalStateException.class)); + thrown.expectMessage("closed"); + responseFuture.get(); + } +}