Higher-level SdkHarnessClient

This adds a fairly thin wrapper on FnApiControlClient, encapsulating
the fact that all request and response types are injected into a
disjoint union and sent over the same low-level RPC.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/44a7a88f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/44a7a88f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/44a7a88f

Branch: refs/heads/master
Commit: 44a7a88f03dd11981b1253458f032ebad80bf788
Parents: a1dc9d3
Author: Kenneth Knowles <k...@google.com>
Authored: Wed Jul 26 07:45:37 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Sun Sep 3 19:42:56 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SdkHarnessClient.java     | 172 +++++++++++++++++++
 .../beam/runners/core/SdkHarnessClientTest.java |  96 +++++++++++
 2 files changed, 268 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/44a7a88f/runners/core-java/src/main/java/org/apache/beam/runners/core/SdkHarnessClient.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SdkHarnessClient.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SdkHarnessClient.java
new file mode 100644
index 0000000..655ce0a
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SdkHarnessClient.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.io.IOException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.fn.v1.BeamFnApi;
+
+/**
+ * A high-level client for an SDK harness.
+ *
+ * <p>This provides a Java-friendly wrapper around {@link FnApiControlClient} and {@link
+ * FnDataReceiver}, which handle lower-level gRPC message wrangling.
+ *
+ * <p>Every request is wrapped in a {@link org.apache.beam.fn.v1.BeamFnApi.InstructionRequest}
+ * carrying an instruction id drawn from the configured {@link IdGenerator}, and the matching
+ * {@link org.apache.beam.fn.v1.BeamFnApi.InstructionResponse} is unwrapped into the specific
+ * response type before being handed back to the caller.
+ */
+public class SdkHarnessClient {
+
+  /**
+   * A supply of unique identifiers, used internally. These must be unique across all Fn API
+   * clients.
+   */
+  public interface IdGenerator {
+    /** Returns the next identifier from this supply. */
+    String getId();
+  }
+
+  /** A supply of unique identifiers that are simply incrementing longs. */
+  private static class CountingIdGenerator implements IdGenerator {
+    private final AtomicLong nextId = new AtomicLong(0L);
+
+    @Override
+    public String getId() {
+      // incrementAndGet means the first identifier handed out is "1".
+      return String.valueOf(nextId.incrementAndGet());
+    }
+  }
+
+  /**
+   * An active bundle for a particular {@link
+   * org.apache.beam.fn.v1.BeamFnApi.ProcessBundleDescriptor}.
+   */
+  @AutoValue
+  public abstract static class ActiveBundle<InputT> {
+    /** The identifier generated for this bundle when it was started. */
+    public abstract String getBundleId();
+
+    /** A future for the response to the request that started this bundle. */
+    public abstract Future<BeamFnApi.ProcessBundleResponse> getBundleResponse();
+
+    /** The receiver associated with this bundle for input elements. */
+    public abstract FnDataReceiver<InputT> getInputReceiver();
+
+    public static <InputT> ActiveBundle<InputT> create(
+        String bundleId,
+        Future<BeamFnApi.ProcessBundleResponse> response,
+        FnDataReceiver<InputT> dataReceiver) {
+      return new AutoValue_SdkHarnessClient_ActiveBundle<>(bundleId, response, dataReceiver);
+    }
+  }
+
+  private final IdGenerator idGenerator;
+  private final FnApiControlClient fnApiControlClient;
+
+  private SdkHarnessClient(
+      FnApiControlClient fnApiControlClient,
+      IdGenerator idGenerator) {
+    this.idGenerator = idGenerator;
+    this.fnApiControlClient = fnApiControlClient;
+  }
+
+  /**
+   * Creates a client for a particular SDK harness. It is the responsibility of the caller to ensure
+   * that these correspond to the same SDK harness, so control plane and data plane messages can be
+   * correctly associated.
+   *
+   * <p>The returned client uses an internal counter as its {@link IdGenerator}; use {@link
+   * #withIdGenerator} to supply a different one.
+   */
+  public static SdkHarnessClient usingFnApiClient(FnApiControlClient fnApiControlClient) {
+    return new SdkHarnessClient(fnApiControlClient, new CountingIdGenerator());
+  }
+
+  /**
+   * Returns a new client that talks to the same {@link FnApiControlClient} but draws its
+   * identifiers from the given {@link IdGenerator}. This client is not modified.
+   */
+  public SdkHarnessClient withIdGenerator(IdGenerator idGenerator) {
+    return new SdkHarnessClient(fnApiControlClient, idGenerator);
+  }
+
+  /**
+   * Registers a {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleDescriptor} for future
+   * processing.
+   *
+   * <p>A client may block on the result future, but may also proceed without blocking.
+   *
+   * @param processBundleDescriptors the descriptors to send in a single register request
+   * @return a future for the register portion of the harness's response
+   */
+  public Future<BeamFnApi.RegisterResponse> register(
+      Iterable<BeamFnApi.ProcessBundleDescriptor> processBundleDescriptors) {
+
+    // TODO: validate that all the necessary data endpoints are known
+
+    ListenableFuture<BeamFnApi.InstructionResponse> genericResponse =
+        fnApiControlClient.handle(
+            BeamFnApi.InstructionRequest.newBuilder()
+                .setInstructionId(idGenerator.getId())
+                .setRegister(
+                    BeamFnApi.RegisterRequest.newBuilder()
+                        .addAllProcessBundleDescriptor(processBundleDescriptors)
+                        .build())
+                .build());
+
+    // Unwrap the generic instruction response into the register-specific payload.
+    return Futures.transform(
+        genericResponse,
+        new Function<BeamFnApi.InstructionResponse, BeamFnApi.RegisterResponse>() {
+          @Override
+          public BeamFnApi.RegisterResponse apply(BeamFnApi.InstructionResponse input) {
+            return input.getRegister();
+          }
+        });
+  }
+
+  /**
+   * Start a new bundle for the given {@link
+   * org.apache.beam.fn.v1.BeamFnApi.ProcessBundleDescriptor} identifier.
+   *
+   * <p>The input channels for the returned {@link ActiveBundle} are derived from the
+   * instructions in the {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleDescriptor}.
+   *
+   * <p>The input receiver of the returned bundle is currently a placeholder: it rejects every
+   * element with an {@link UnsupportedOperationException} and its {@code close()} does nothing.
+   *
+   * @param processBundleDescriptorId identifier of the descriptor to process
+   * @return the bundle, whose id is also the instruction id of the request that was sent
+   */
+  public <InputT> ActiveBundle<InputT> newBundle(String processBundleDescriptorId) {
+    String bundleId = idGenerator.getId();
+
+    // TODO: acquire an input receiver from appropriate FnDataService
+    FnDataReceiver<InputT> dataReceiver =
+        new FnDataReceiver<InputT>() {
+          @Override
+          public void accept(InputT input) throws Exception {
+            throw new UnsupportedOperationException(
+                "Placeholder FnDataReceiver cannot accept data.");
+          }
+
+          @Override
+          public void close() throws IOException {
+            // noop
+          }
+        };
+
+    // The bundle id doubles as the instruction id, mirroring register(); without it the
+    // request would be sent with an empty instruction id.
+    ListenableFuture<BeamFnApi.InstructionResponse> genericResponse =
+        fnApiControlClient.handle(
+            BeamFnApi.InstructionRequest.newBuilder()
+                .setInstructionId(bundleId)
+                .setProcessBundle(
+                    BeamFnApi.ProcessBundleRequest.newBuilder()
+                        .setProcessBundleDescriptorReference(processBundleDescriptorId))
+                .build());
+
+    // Unwrap the generic instruction response into the process-bundle-specific payload.
+    ListenableFuture<BeamFnApi.ProcessBundleResponse> specificResponse =
+        Futures.transform(
+            genericResponse,
+            new Function<BeamFnApi.InstructionResponse, BeamFnApi.ProcessBundleResponse>() {
+              @Override
+              public BeamFnApi.ProcessBundleResponse apply(BeamFnApi.InstructionResponse input) {
+                return input.getProcessBundle();
+              }
+            });
+
+    return ActiveBundle.create(bundleId, specificResponse, dataReceiver);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/44a7a88f/runners/core-java/src/test/java/org/apache/beam/runners/core/SdkHarnessClientTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SdkHarnessClientTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SdkHarnessClientTest.java
new file mode 100644
index 0000000..1bf8bbc
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SdkHarnessClientTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Unit tests for {@link SdkHarnessClient}.
+ *
+ * <p>The {@link FnApiControlClient} is mocked, so these tests only exercise the wrapping of
+ * requests and the unwrapping of responses performed by {@link SdkHarnessClient}.
+ */
+@RunWith(JUnit4.class)
+public class SdkHarnessClientTest {
+
+  @Mock public FnApiControlClient fnApiControlClient;
+
+  private SdkHarnessClient sdkHarnessClient;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    sdkHarnessClient = SdkHarnessClient.usingFnApiClient(fnApiControlClient);
+  }
+
+  @Test
+  public void testRegisterDoesNotCrash() throws Exception {
+    BeamFnApi.ProcessBundleDescriptor firstDescriptor =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("descriptor1").build();
+    BeamFnApi.ProcessBundleDescriptor secondDescriptor =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("descriptor2").build();
+
+    // Every instruction sent through the mocked control client resolves via this future.
+    SettableFuture<BeamFnApi.InstructionResponse> pendingInstruction = SettableFuture.create();
+    when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
+        .thenReturn(pendingInstruction);
+
+    Future<BeamFnApi.RegisterResponse> registration =
+        sdkHarnessClient.register(ImmutableList.of(firstDescriptor, secondDescriptor));
+
+    // Matching a RegisterRequest to its RegisterResponse is the job of the underlying
+    // FnApiControlClient; SdkHarnessClient only wraps the request and unwraps the response.
+    //
+    // The response has no fields to inspect yet, so nothing is asserted. The test is shaped
+    // the way it should look if/when the response becomes meaningful.
+    BeamFnApi.InstructionResponse wrappedResponse =
+        BeamFnApi.InstructionResponse.newBuilder()
+            .setRegister(BeamFnApi.RegisterResponse.getDefaultInstance())
+            .build();
+    pendingInstruction.set(wrappedResponse);
+    registration.get();
+  }
+
+  @Test
+  public void testNewBundleNoDataDoesNotCrash() throws Exception {
+    // Every instruction sent through the mocked control client resolves via this future.
+    SettableFuture<BeamFnApi.InstructionResponse> pendingInstruction = SettableFuture.create();
+    when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
+        .thenReturn(pendingInstruction);
+
+    SdkHarnessClient.ActiveBundle bundle = sdkHarnessClient.newBundle("descriptor1");
+
+    // Matching a ProcessBundleRequest to its ProcessBundleResponse is the job of the underlying
+    // FnApiControlClient; SdkHarnessClient only wraps the request and unwraps the response.
+    //
+    // The response has no fields to inspect yet, so nothing is asserted. The test is shaped
+    // the way it should look if/when the response becomes meaningful.
+    BeamFnApi.InstructionResponse wrappedResponse =
+        BeamFnApi.InstructionResponse.newBuilder()
+            .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance())
+            .build();
+    pendingInstruction.set(wrappedResponse);
+    bundle.getBundleResponse().get();
+  }
+}

Reply via email to