FnApiControlClient, de-inverts Fn API control plane

The Fn API control plane has an inverted client/server relationship in
order to support firewall rules where the runner is forbidden from
connecting out to the SDK harness. This Java helper provides an API with
the more conventional polarity. It also associates streamed gRPC
requests with responses to support simple future-based programming.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/06f4b0a4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/06f4b0a4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/06f4b0a4

Branch: refs/heads/master
Commit: 06f4b0a4ad7552d3676c80e868a887ac017c92ac
Parents: 46ee5a5
Author: Kenneth Knowles <k...@google.com>
Authored: Wed Jul 26 07:42:03 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Sun Sep 3 19:42:56 2017 -0700

----------------------------------------------------------------------
 runners/core-java/pom.xml                       |  27 +++-
 .../beam/runners/core/FnApiControlClient.java   | 148 +++++++++++++++++++
 .../runners/core/FnApiControlClientTest.java    | 139 +++++++++++++++++
 3 files changed, 307 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/06f4b0a4/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index 8c8e599..4097d2d 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -69,6 +69,11 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-common-fn-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-core-construction-java</artifactId>
     </dependency>
 
@@ -91,10 +96,25 @@
     </dependency>
 
     <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
     <!-- test dependencies -->
 
     <!-- Utilities such as WindowMatchers -->
@@ -135,12 +155,5 @@
       <artifactId>jackson-dataformat-yaml</artifactId>
       <scope>test</scope>
     </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-common-fn-api</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/06f4b0a4/runners/core-java/src/main/java/org/apache/beam/runners/core/FnApiControlClient.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/FnApiControlClient.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/FnApiControlClient.java
new file mode 100644
index 0000000..4b72bfc
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/FnApiControlClient.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.io.Closeable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A client for the control plane of an SDK harness, which can issue requests to it over the Fn
+ * API.
+ *
+ * <p>This class presents a low-level Java API de-inverting the Fn API's gRPC layer.
+ *
+ * <p>The Fn API is inverted so the runner is the server and the SDK harness is the client, for
+ * firewalling reasons (the runner may execute in a more privileged environment forbidding outbound
+ * connections).
+ *
+ * <p>This low-level client is responsible only for correlating requests with responses.
+ *
+ * <p>Sending a request and closing the client both synchronize on this object, so a request is
+ * either sent before the client closes (and is then failed by the close if still unanswered) or
+ * is rejected because the client is already closed.
+ */
+class FnApiControlClient implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FnApiControlClient.class);
+
+  // All writes to this StreamObserver need to be synchronized.
+  private final StreamObserver<BeamFnApi.InstructionRequest> requestReceiver;
+  private final ResponseStreamObserver responseObserver = new ResponseStreamObserver();
+  // Futures handed out by handle() that have not been completed yet, keyed by instruction id.
+  private final Map<String, SettableFuture<BeamFnApi.InstructionResponse>> outstandingRequests;
+  // Set once by closeAndTerminateOutstandingRequests; never reset.
+  private volatile boolean isClosed;
+
+  private FnApiControlClient(StreamObserver<BeamFnApi.InstructionRequest> requestReceiver) {
+    this.requestReceiver = requestReceiver;
+    this.outstandingRequests = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Returns a {@link FnApiControlClient} which will submit its requests to the provided
+   * observer.
+   *
+   * <p>It is the responsibility of the caller to register this object as an observer of incoming
+   * responses (this will generally be done as part of fulfilling the contract of a gRPC service).
+   */
+  public static FnApiControlClient forRequestObserver(
+      StreamObserver<BeamFnApi.InstructionRequest> requestObserver) {
+    return new FnApiControlClient(requestObserver);
+  }
+
+  /**
+   * Sends {@code request} to the request observer and returns a future that is completed with
+   * the response carrying the same instruction id.
+   *
+   * <p>If the client is closed before a matching response arrives, the future fails with the
+   * cause of the close. If the client is already closed when this method is called, nothing is
+   * sent and the returned future has already failed with an {@link IllegalStateException}.
+   *
+   * @param request the instruction to send; its instruction id is the correlation key
+   * @return a future for the matching {@link BeamFnApi.InstructionResponse}
+   */
+  public synchronized ListenableFuture<BeamFnApi.InstructionResponse> handle(
+      BeamFnApi.InstructionRequest request) {
+    SettableFuture<BeamFnApi.InstructionResponse> resultFuture = SettableFuture.create();
+    if (isClosed) {
+      // The request stream has already been terminated and nothing would ever complete a newly
+      // registered future, so fail fast instead of writing to a finished stream.
+      resultFuture.setException(
+          new IllegalStateException(
+              "Cannot send instruction " + request.getInstructionId() + ": client is closed"));
+      return resultFuture;
+    }
+    LOG.debug("Sending InstructionRequest {}", request);
+    // Register before sending so the response observer can always find the future.
+    outstandingRequests.put(request.getInstructionId(), resultFuture);
+    requestReceiver.onNext(request);
+    return resultFuture;
+  }
+
+  /** Returns the observer that must be fed the responses arriving from the SDK harness. */
+  StreamObserver<BeamFnApi.InstructionResponse> asResponseObserver() {
+    return responseObserver;
+  }
+
+  /** Closes this client, failing any unanswered requests with an {@link IllegalStateException}. */
+  @Override
+  public void close() {
+    closeAndTerminateOutstandingRequests(new IllegalStateException("Runner closed connection"));
+  }
+
+  /**
+   * Closes this client and terminates any outstanding requests exceptionally.
+   *
+   * <p>Only the first call has any effect. The request stream is completed normally when nothing
+   * was outstanding; otherwise it is terminated with a {@code CANCELLED} status and every
+   * outstanding future fails with {@code cause}.
+   */
+  private synchronized void closeAndTerminateOutstandingRequests(Throwable cause) {
+    if (isClosed) {
+      return;
+    }
+
+    // Make a copy of the map to make the view of the outstanding requests consistent.
+    Map<String, SettableFuture<BeamFnApi.InstructionResponse>> outstandingRequestsCopy =
+        new ConcurrentHashMap<>(outstandingRequests);
+    outstandingRequests.clear();
+    isClosed = true;
+
+    if (outstandingRequestsCopy.isEmpty()) {
+      requestReceiver.onCompleted();
+      return;
+    }
+    requestReceiver.onError(
+        new StatusRuntimeException(Status.CANCELLED.withDescription(cause.getMessage())));
+
+    LOG.error(
+        "{} closed, clearing outstanding requests {}",
+        FnApiControlClient.class.getSimpleName(),
+        outstandingRequestsCopy);
+    for (SettableFuture<BeamFnApi.InstructionResponse> outstandingRequest :
+        outstandingRequestsCopy.values()) {
+      outstandingRequest.setException(cause);
+    }
+  }
+
+  /**
+   * A private view of this class as a {@link StreamObserver} for connecting as a gRPC listener.
+   */
+  private class ResponseStreamObserver implements StreamObserver<BeamFnApi.InstructionResponse> {
+    /**
+     * Processes an incoming {@link BeamFnApi.InstructionResponse} by correlating it with the
+     * corresponding {@link BeamFnApi.InstructionRequest} and completes the future that was
+     * returned by {@link FnApiControlClient#handle}.
+     *
+     * <p>A response whose instruction id matches no outstanding request is ignored.
+     */
+    @Override
+    public void onNext(BeamFnApi.InstructionResponse response) {
+      LOG.debug("Received InstructionResponse {}", response);
+      SettableFuture<BeamFnApi.InstructionResponse> completableFuture =
+          outstandingRequests.remove(response.getInstructionId());
+      if (completableFuture != null) {
+        completableFuture.set(response);
+      }
+    }
+
+    /**
+     * Handles the response stream being completed: closes the client and fails any outstanding
+     * requests with an {@link IllegalStateException}.
+     */
+    @Override
+    public void onCompleted() {
+      closeAndTerminateOutstandingRequests(
+          new IllegalStateException("SDK harness closed connection"));
+    }
+
+    /**
+     * Handles an error on the response stream: closes the client and fails any outstanding
+     * requests with {@code cause}.
+     */
+    @Override
+    public void onError(Throwable cause) {
+      // Pass the throwable as the trailing argument with no placeholder of its own, so SLF4J
+      // logs it as the exception instead of leaving a literal "{}" in the message.
+      LOG.error("{} received error", FnApiControlClient.class.getSimpleName(), cause);
+      closeAndTerminateOutstandingRequests(cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f4b0a4/runners/core-java/src/test/java/org/apache/beam/runners/core/FnApiControlClientTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/FnApiControlClientTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/FnApiControlClientTest.java
new file mode 100644
index 0000000..07b4784
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/FnApiControlClientTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Unit tests for {@link FnApiControlClient}: requests are forwarded to the request observer,
+ * responses are matched to requests by instruction id, and closing fails unanswered requests.
+ */
+@RunWith(JUnit4.class)
+public class FnApiControlClientTest {
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Mock public StreamObserver<BeamFnApi.InstructionRequest> mockObserver;
+  private FnApiControlClient client;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    client = FnApiControlClient.forRequestObserver(mockObserver);
+  }
+
+  /** Builds a request that carries nothing but the given instruction id. */
+  private static BeamFnApi.InstructionRequest requestWithId(String instructionId) {
+    return BeamFnApi.InstructionRequest.newBuilder().setInstructionId(instructionId).build();
+  }
+
+  /** Builds a response that carries nothing but the given instruction id. */
+  private static BeamFnApi.InstructionResponse responseWithId(String instructionId) {
+    return BeamFnApi.InstructionResponse.newBuilder().setInstructionId(instructionId).build();
+  }
+
+  /** A handled request is passed on to the request observer. */
+  @Test
+  public void testRequestSent() {
+    client.handle(requestWithId("instructionId"));
+
+    verify(mockObserver).onNext(any(BeamFnApi.InstructionRequest.class));
+  }
+
+  /** A response with a matching id completes the future returned for the request. */
+  @Test
+  public void testRequestSuccess() throws Exception {
+    String instructionId = "successfulInstruction";
+    Future<BeamFnApi.InstructionResponse> pending = client.handle(requestWithId(instructionId));
+
+    StreamObserver<BeamFnApi.InstructionResponse> responses = client.asResponseObserver();
+    responses.onNext(responseWithId(instructionId));
+
+    assertThat(pending.get().getInstructionId(), equalTo(instructionId));
+  }
+
+  /** A response with an id nobody asked for leaves the real request pending. */
+  @Test
+  public void testUnknownResponseIgnored() throws Exception {
+    ListenableFuture<BeamFnApi.InstructionResponse> pending =
+        client.handle(requestWithId("actualInstruction"));
+
+    StreamObserver<BeamFnApi.InstructionResponse> responses = client.asResponseObserver();
+    responses.onNext(responseWithId("unknownInstruction"));
+
+    assertThat(pending.isDone(), is(false));
+    assertThat(pending.isCancelled(), is(false));
+  }
+
+  /** Completion of the response stream fails the unanswered request. */
+  @Test
+  public void testOnCompletedCancelsOutstanding() throws Exception {
+    Future<BeamFnApi.InstructionResponse> pending =
+        client.handle(requestWithId("clientHangUpInstruction"));
+
+    StreamObserver<BeamFnApi.InstructionResponse> responses = client.asResponseObserver();
+    responses.onCompleted();
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+    thrown.expectMessage("closed");
+    pending.get();
+  }
+
+  /** An error on the response stream fails the unanswered request with that same error. */
+  @Test
+  public void testOnErrorCancelsOutstanding() throws Exception {
+    class FrazzleException extends Exception {}
+
+    Future<BeamFnApi.InstructionResponse> pending =
+        client.handle(requestWithId("errorInstruction"));
+
+    StreamObserver<BeamFnApi.InstructionResponse> responses = client.asResponseObserver();
+    responses.onError(new FrazzleException());
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(FrazzleException.class));
+    pending.get();
+  }
+
+  /** Closing the client from the runner side fails the unanswered request. */
+  @Test
+  public void testCloseCancelsOutstanding() throws Exception {
+    Future<BeamFnApi.InstructionResponse> pending =
+        client.handle(requestWithId("serverCloseInstruction"));
+
+    client.close();
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+    thrown.expectMessage("closed");
+    pending.get();
+  }
+}

Reply via email to