Service for pooling incoming Fn API control plane connections
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a1dc9d32 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a1dc9d32 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a1dc9d32 Branch: refs/heads/master Commit: a1dc9d3272710094664192f5fa0dad051232f09c Parents: 06f4b0a Author: Kenneth Knowles <k...@google.com> Authored: Wed Jul 26 07:44:41 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Sun Sep 3 19:42:56 2017 -0700 ---------------------------------------------------------------------- .../core/FnApiControlClientPoolService.java | 66 ++++++++++++++++++++ .../core/FnApiControlClientPoolServiceTest.java | 65 +++++++++++++++++++ 2 files changed, 131 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a1dc9d32/runners/core-java/src/main/java/org/apache/beam/runners/core/FnApiControlClientPoolService.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/FnApiControlClientPoolService.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/FnApiControlClientPoolService.java new file mode 100644 index 0000000..e05a03d --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/FnApiControlClientPoolService.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import io.grpc.stub.StreamObserver; +import java.util.concurrent.BlockingQueue; +import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.fn.v1.BeamFnControlGrpc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A Fn API control service which adds incoming SDK harness connections to a pool. */ +public class FnApiControlClientPoolService extends BeamFnControlGrpc.BeamFnControlImplBase { + private static final Logger LOGGER = LoggerFactory.getLogger(FnApiControlClientPoolService.class); + + private final BlockingQueue<FnApiControlClient> clientPool; + + private FnApiControlClientPoolService(BlockingQueue<FnApiControlClient> clientPool) { + this.clientPool = clientPool; + } + + /** + * Creates a new {@link FnApiControlClientPoolService} which will enqueue and vend new SDK harness + * connections. + */ + public static FnApiControlClientPoolService offeringClientsToPool( + BlockingQueue<FnApiControlClient> clientPool) { + return new FnApiControlClientPoolService(clientPool); + } + + /** + * Called by gRPC for each incoming connection from an SDK harness, and enqueue an available SDK + * harness client. + * + * <p>Note: currently does not distinguish what sort of SDK it is, so a separate instance is + * required for each. + */ + @Override + public StreamObserver<BeamFnApi.InstructionResponse> control( + StreamObserver<BeamFnApi.InstructionRequest> requestObserver) { + LOGGER.info("Beam Fn Control client connected."); + FnApiControlClient newClient = FnApiControlClient.forRequestObserver(requestObserver); + try { + clientPool.put(newClient); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + return newClient.asResponseObserver(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a1dc9d32/runners/core-java/src/test/java/org/apache/beam/runners/core/FnApiControlClientPoolServiceTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/FnApiControlClientPoolServiceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/FnApiControlClientPoolServiceTest.java new file mode 100644 index 0000000..fe63c9d --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/FnApiControlClientPoolServiceTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.beam.fn.v1.BeamFnApi; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link FnApiControlClientPoolService}. */ +@RunWith(JUnit4.class) +public class FnApiControlClientPoolServiceTest { + + // For ease of straight-line testing, we use a LinkedBlockingQueue; in practice a SynchronousQueue + // for matching incoming connections and server threads is likely. + private final BlockingQueue<FnApiControlClient> pool = new LinkedBlockingQueue<>(); + private FnApiControlClientPoolService controlService = + FnApiControlClientPoolService.offeringClientsToPool(pool); + + @Test + public void testIncomingConnection() throws Exception { + StreamObserver<BeamFnApi.InstructionRequest> requestObserver = mock(StreamObserver.class); + StreamObserver<BeamFnApi.InstructionResponse> responseObserver = + controlService.control(requestObserver); + + FnApiControlClient client = pool.take(); + + // Check that the client is wired up to the request channel + String id = "fakeInstruction"; + ListenableFuture<BeamFnApi.InstructionResponse> responseFuture = + client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); + verify(requestObserver).onNext(any(BeamFnApi.InstructionRequest.class)); + assertThat(responseFuture.isDone(), is(false)); + + // Check that the response channel really came from the client + responseObserver.onNext( + BeamFnApi.InstructionResponse.newBuilder().setInstructionId(id).build()); + responseFuture.get(); + } +}