Higher-level SdkHarnessClient. This adds a fairly thin wrapper on FnApiControlClient, encapsulating the fact that all request and response types are injected into a disjoint union and sent over the same low-level RPC.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/44a7a88f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/44a7a88f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/44a7a88f Branch: refs/heads/master Commit: 44a7a88f03dd11981b1253458f032ebad80bf788 Parents: a1dc9d3 Author: Kenneth Knowles <k...@google.com> Authored: Wed Jul 26 07:45:37 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Sun Sep 3 19:42:56 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/SdkHarnessClient.java | 172 +++++++++++++++++++ .../beam/runners/core/SdkHarnessClientTest.java | 96 +++++++++++ 2 files changed, 268 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/44a7a88f/runners/core-java/src/main/java/org/apache/beam/runners/core/SdkHarnessClient.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SdkHarnessClient.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SdkHarnessClient.java new file mode 100644 index 0000000..655ce0a --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SdkHarnessClient.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import com.google.auto.value.AutoValue; +import com.google.common.base.Function; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import java.io.IOException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.beam.fn.v1.BeamFnApi; + +/** + * A high-level client for an SDK harness. + * + * <p>This provides a Java-friendly wrapper around {@link FnApiControlClient} and {@link + * FnDataReceiver}, which handle lower-level gRPC message wrangling. + */ +public class SdkHarnessClient { + + /** + * A supply of unique identifiers, used internally. These must be unique across all Fn API + * clients. + */ + public interface IdGenerator { + String getId(); + } + + /** A supply of unique identifiers that are simply incrementing longs. */ + private static class CountingIdGenerator implements IdGenerator { + private final AtomicLong nextId = new AtomicLong(0L); + + @Override + public String getId() { + return String.valueOf(nextId.incrementAndGet()); + } + } + + /** + * An active bundle for a particular {@link + * org.apache.beam.fn.v1.BeamFnApi.ProcessBundleDescriptor}. 
+ */ + @AutoValue + public abstract static class ActiveBundle<InputT> { + public abstract String getBundleId(); + + public abstract Future<BeamFnApi.ProcessBundleResponse> getBundleResponse(); + + public abstract FnDataReceiver<InputT> getInputReceiver(); + + public static <InputT> ActiveBundle<InputT> create( + String bundleId, + Future<BeamFnApi.ProcessBundleResponse> response, + FnDataReceiver<InputT> dataReceiver) { + return new AutoValue_SdkHarnessClient_ActiveBundle(bundleId, response, dataReceiver); + } + } + + private final IdGenerator idGenerator; + private final FnApiControlClient fnApiControlClient; + + private SdkHarnessClient( + FnApiControlClient fnApiControlClient, + IdGenerator idGenerator) { + this.idGenerator = idGenerator; + this.fnApiControlClient = fnApiControlClient; + } + + /** + * Creates a client for a particular SDK harness. It is the responsibility of the caller to ensure + * that these correspond to the same SDK harness, so control plane and data plane messages can be + * correctly associated. + */ + public static SdkHarnessClient usingFnApiClient(FnApiControlClient fnApiControlClient) { + return new SdkHarnessClient(fnApiControlClient, new CountingIdGenerator()); + } + + public SdkHarnessClient withIdGenerator(IdGenerator idGenerator) { + return new SdkHarnessClient(fnApiControlClient, idGenerator); + } + + /** + * Registers a {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleDescriptor} for future + * processing. + * + * <p>A client may block on the result future, but may also proceed without blocking. 
+ */ + public Future<BeamFnApi.RegisterResponse> register( + Iterable<BeamFnApi.ProcessBundleDescriptor> processBundleDescriptors) { + + // TODO: validate that all the necessary data endpoints are known + + ListenableFuture<BeamFnApi.InstructionResponse> genericResponse = + fnApiControlClient.handle( + BeamFnApi.InstructionRequest.newBuilder() + .setInstructionId(idGenerator.getId()) + .setRegister( + BeamFnApi.RegisterRequest.newBuilder() + .addAllProcessBundleDescriptor(processBundleDescriptors) + .build()) + .build()); + + return Futures.transform( + genericResponse, + new Function<BeamFnApi.InstructionResponse, BeamFnApi.RegisterResponse>() { + @Override + public BeamFnApi.RegisterResponse apply(BeamFnApi.InstructionResponse input) { + return input.getRegister(); + } + }); + } + + /** + * Start a new bundle for the given {@link + * org.apache.beam.fn.v1.BeamFnApi.ProcessBundleDescriptor} identifier. + * + * <p>The input channels for the returned {@link ActiveBundle} are derived from the + * instructions in the {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleDescriptor}. 
+ */ + public ActiveBundle newBundle(String processBundleDescriptorId) { + String bundleId = idGenerator.getId(); + + // TODO: acquire an input receiver from appropriate FnDataService + FnDataReceiver dataReceiver = new FnDataReceiver() { + @Override + public void accept(Object input) throws Exception { + throw new UnsupportedOperationException("Placeholder FnDataReceiver cannot accept data."); + } + + @Override + public void close() throws IOException { + // noop + } + }; + + ListenableFuture<BeamFnApi.InstructionResponse> genericResponse = + fnApiControlClient.handle( + BeamFnApi.InstructionRequest.newBuilder() + .setProcessBundle( + BeamFnApi.ProcessBundleRequest.newBuilder() + .setProcessBundleDescriptorReference(processBundleDescriptorId)) + .build()); + + ListenableFuture<BeamFnApi.ProcessBundleResponse> specificResponse = + Futures.transform( + genericResponse, + new Function<BeamFnApi.InstructionResponse, BeamFnApi.ProcessBundleResponse>() { + @Override + public BeamFnApi.ProcessBundleResponse apply(BeamFnApi.InstructionResponse input) { + return input.getProcessBundle(); + } + }); + + return ActiveBundle.create(bundleId, specificResponse, dataReceiver); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/44a7a88f/runners/core-java/src/test/java/org/apache/beam/runners/core/SdkHarnessClientTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SdkHarnessClientTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SdkHarnessClientTest.java new file mode 100644 index 0000000..1bf8bbc --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SdkHarnessClientTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.SettableFuture; +import java.util.concurrent.Future; +import org.apache.beam.fn.v1.BeamFnApi; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Unit tests for {@link SdkHarnessClient}. 
*/ +@RunWith(JUnit4.class) +public class SdkHarnessClientTest { + + @Mock public FnApiControlClient fnApiControlClient; + + private SdkHarnessClient sdkHarnessClient; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + sdkHarnessClient = SdkHarnessClient.usingFnApiClient(fnApiControlClient); + } + + @Test + public void testRegisterDoesNotCrash() throws Exception { + String descriptorId1 = "descriptor1"; + String descriptorId2 = "descriptor2"; + + SettableFuture<BeamFnApi.InstructionResponse> registerResponseFuture = SettableFuture.create(); + when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class))) + .thenReturn(registerResponseFuture); + + Future<BeamFnApi.RegisterResponse> responseFuture = sdkHarnessClient.register( + ImmutableList.of( + BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build(), + BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(descriptorId2).build())); + + // Correlating the RegisterRequest and RegisterResponse is owned by the underlying + // FnApiControlClient. The SdkHarnessClient owns just wrapping the request and unwrapping + // the response. + // + // Currently there are no fields so there's nothing to check. This test is formulated + // to match the pattern it should have if/when the response is meaningful. 
+ BeamFnApi.RegisterResponse response = BeamFnApi.RegisterResponse.getDefaultInstance(); + registerResponseFuture.set( + BeamFnApi.InstructionResponse.newBuilder().setRegister(response).build()); + responseFuture.get(); + } + + @Test + public void testNewBundleNoDataDoesNotCrash() throws Exception { + String descriptorId1 = "descriptor1"; + + SettableFuture<BeamFnApi.InstructionResponse> processBundleResponseFuture = + SettableFuture.create(); + when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class))) + .thenReturn(processBundleResponseFuture); + + SdkHarnessClient.ActiveBundle activeBundle = sdkHarnessClient.newBundle(descriptorId1); + + // Correlating the ProcessBundleRequest and ProcessBundleReponse is owned by the underlying + // FnApiControlClient. The SdkHarnessClient owns just wrapping the request and unwrapping + // the response. + // + // Currently there are no fields so there's nothing to check. This test is formulated + // to match the pattern it should have if/when the response is meaningful. + BeamFnApi.ProcessBundleResponse response = BeamFnApi.ProcessBundleResponse.getDefaultInstance(); + processBundleResponseFuture.set( + BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build()); + activeBundle.getBundleResponse().get(); + } +}