Service for pooling incoming Fn API control plane connections

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a1dc9d32
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a1dc9d32
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a1dc9d32

Branch: refs/heads/master
Commit: a1dc9d3272710094664192f5fa0dad051232f09c
Parents: 06f4b0a
Author: Kenneth Knowles <k...@google.com>
Authored: Wed Jul 26 07:44:41 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Sun Sep 3 19:42:56 2017 -0700

----------------------------------------------------------------------
 .../core/FnApiControlClientPoolService.java     | 66 ++++++++++++++++++++
 .../core/FnApiControlClientPoolServiceTest.java | 65 +++++++++++++++++++
 2 files changed, 131 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a1dc9d32/runners/core-java/src/main/java/org/apache/beam/runners/core/FnApiControlClientPoolService.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/FnApiControlClientPoolService.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/FnApiControlClientPoolService.java
new file mode 100644
index 0000000..e05a03d
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/FnApiControlClientPoolService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.BlockingQueue;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnControlGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Fn API control service which adds incoming SDK harness connections to a pool.
+ *
+ * <p>Each call to {@link #control} wraps the supplied request observer in a
+ * {@link FnApiControlClient} and puts that client onto the {@link BlockingQueue} that was
+ * handed to {@link #offeringClientsToPool}. This class only adds to the queue; taking clients
+ * out of it is left to whoever else holds a reference to the same queue.
+ *
+ * <p>The constructor is private, so instances are obtained through
+ * {@link #offeringClientsToPool}.
+ */
+public class FnApiControlClientPoolService extends 
BeamFnControlGrpc.BeamFnControlImplBase {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FnApiControlClientPoolService.class);
+
+  private final BlockingQueue<FnApiControlClient> clientPool;
+
+  private FnApiControlClientPoolService(BlockingQueue<FnApiControlClient> 
clientPool) {
+    this.clientPool = clientPool;
+  }
+
+  /**
+   * Creates a new {@link FnApiControlClientPoolService} which will enqueue each new SDK
+   * harness connection onto the given pool.
+   *
+   * @param clientPool the queue onto which a {@link FnApiControlClient} is put for every
+   *     incoming control connection; it is stored as-is, not copied
+   * @return a new service backed by {@code clientPool}
+   */
+  public static FnApiControlClientPoolService offeringClientsToPool(
+      BlockingQueue<FnApiControlClient> clientPool) {
+    return new FnApiControlClientPoolService(clientPool);
+  }
+
+  /**
+   * Called by gRPC for each incoming connection from an SDK harness; enqueues a client for
+   * the newly connected SDK harness.
+   *
+   * <p>Note: currently does not distinguish what sort of SDK it is, so a separate instance is
+   * required for each.
+   *
+   * @param requestObserver the observer passed to
+   *     {@link FnApiControlClient#forRequestObserver} to build the client for this connection
+   * @return the new client's response observer, as returned by
+   *     {@code FnApiControlClient#asResponseObserver()}
+   * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread
+   *     is interrupted while putting the client onto the pool; the thread's interrupt status
+   *     is set again before the exception is thrown
+   */
+  @Override
+  public StreamObserver<BeamFnApi.InstructionResponse> control(
+      StreamObserver<BeamFnApi.InstructionRequest> requestObserver) {
+    LOGGER.info("Beam Fn Control client connected.");
+    FnApiControlClient newClient = 
FnApiControlClient.forRequestObserver(requestObserver);
+    try {
+      clientPool.put(newClient); // May block, depending on the queue implementation.
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // Restore the interrupt status before rethrowing.
+      throw new RuntimeException(e);
+    }
+    return newClient.asResponseObserver();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a1dc9d32/runners/core-java/src/test/java/org/apache/beam/runners/core/FnApiControlClientPoolServiceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/FnApiControlClientPoolServiceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/FnApiControlClientPoolServiceTest.java
new file mode 100644
index 0000000..fe63c9d
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/FnApiControlClientPoolServiceTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link FnApiControlClientPoolService}.
+ *
+ * <p>The test calls {@code control} directly with a mocked request observer, takes the
+ * resulting {@link FnApiControlClient} out of the pool, and then checks both directions:
+ * a request handled by the client reaches the mocked request observer, and a response pushed
+ * into the observer returned by {@code control} lets the client's pending future complete.
+ */
+@RunWith(JUnit4.class)
+public class FnApiControlClientPoolServiceTest {
+
+  // For ease of straight-line testing, we use a LinkedBlockingQueue; in 
practice a SynchronousQueue
+  // for matching incoming connections and server threads is likely.
+  private final BlockingQueue<FnApiControlClient> pool = new 
LinkedBlockingQueue<>();
+  private FnApiControlClientPoolService controlService =
+      FnApiControlClientPoolService.offeringClientsToPool(pool);
+
+  @Test
+  public void testIncomingConnection() throws Exception {
+    StreamObserver<BeamFnApi.InstructionRequest> requestObserver = 
mock(StreamObserver.class);
+    StreamObserver<BeamFnApi.InstructionResponse> responseObserver =
+        controlService.control(requestObserver);
+
+    FnApiControlClient client = pool.take(); // The client enqueued by control() above.
+
+    // Check that the client is wired up to the request channel
+    String id = "fakeInstruction";
+    ListenableFuture<BeamFnApi.InstructionResponse> responseFuture =
+        
client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build());
+    verify(requestObserver).onNext(any(BeamFnApi.InstructionRequest.class));
+    assertThat(responseFuture.isDone(), is(false)); // No response has been delivered yet.
+
+    // Check that the response channel really came from the client
+    responseObserver.onNext(
+        
BeamFnApi.InstructionResponse.newBuilder().setInstructionId(id).build());
+    responseFuture.get(); // Waits for completion; presumably matched by instruction id - confirm.
+  }
+}

Reply via email to