Add SdkHarnessDoFnRunner

This encapsulates processing a bundle over the Fn API.

A Beam runner is responsible for:

 - Setting up a FnApiControlClientPoolService to listen for incoming
   FnApiSdkHarnessClient connections
 - Wrapping those connections in the higher-level SdkHarnessClient
 - Building the ProcessBundleDescriptor (instruction graph) to be executed
 - Establishing data plane endpoints referenced by the ProcessBundleDescriptor
 - Registering the data plane endpoints and ProcessBundleDescriptor with the
   SdkHarnessClient

This class is responsible for:

 - Registering each bundle with the SDK harness
 - Streaming the elements of each bundle to the SDK harness
   over the data plane


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0388de17
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0388de17
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0388de17

Branch: refs/heads/master
Commit: 0388de17acb83a9031b3934a577520d226f8f09d
Parents: 44a7a88
Author: Kenneth Knowles <k...@google.com>
Authored: Wed Jul 26 07:48:48 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Sun Sep 3 19:42:57 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SdkHarnessDoFnRunner.java | 100 +++++++++++++++++++
 .../runners/core/SdkHarnessDoFnRunnerTest.java  |  73 ++++++++++++++
 2 files changed, 173 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0388de17/runners/core-java/src/main/java/org/apache/beam/runners/core/SdkHarnessDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SdkHarnessDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SdkHarnessDoFnRunner.java
new file mode 100644
index 0000000..27e784e
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SdkHarnessDoFnRunner.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+
+/**
+ * A {@link DoFnRunner} that processes a bundle by sending it to an SDK harness over the Fn API.
+ *
+ * <p>{@link #startBundle()} obtains a new active bundle from the {@link SdkHarnessClient},
+ * {@link #processElement} forwards each element to that bundle's input receiver, and
+ * {@link #finishBundle()} waits for the bundle's response.
+ */
+public class SdkHarnessDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+
+  private final SdkHarnessClient sdkHarnessClient;
+  private final String processBundleDescriptorId;
+
+  /** {@code null} between bundles. */
+  @Nullable private SdkHarnessClient.ActiveBundle activeBundle;
+
+  private SdkHarnessDoFnRunner(
+      SdkHarnessClient sdkHarnessClient,
+      String processBundleDescriptorId) {
+    this.sdkHarnessClient = sdkHarnessClient;
+    this.processBundleDescriptorId = processBundleDescriptorId;
+  }
+
+  /**
+   * Returns a new {@link SdkHarnessDoFnRunner} suitable for just a particular {@link
+   * org.apache.beam.fn.v1.BeamFnApi.ProcessBundleDescriptor} (referenced by id here).
+   *
+   * <p>The data plane endpoints referenced in the primitive instructions of the
+   * {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleDescriptor} must be established and
+   * registered with the {@link SdkHarnessClient} outside of this class.
+   *
+   * <p>Also outside of this class, the appropriate receivers must be registered with the
+   * output data plane channels of the descriptor.
+   *
+   * @param sdkHarnessClient client used to start a new bundle on every {@link #startBundle()}
+   * @param processBundleDescriptorId id of the descriptor each bundle is started against
+   */
+  public static <InputT, OutputT> SdkHarnessDoFnRunner<InputT, OutputT> create(
+      SdkHarnessClient sdkHarnessClient,
+      String processBundleDescriptorId) {
+    // Diamond instead of the raw type, so the result is checked against the declared return type.
+    return new SdkHarnessDoFnRunner<>(sdkHarnessClient, processBundleDescriptorId);
+  }
+
+  /** Requests a new bundle for the configured descriptor and makes it the active bundle. */
+  @Override
+  public void startBundle() {
+    this.activeBundle =
+        sdkHarnessClient.newBundle(processBundleDescriptorId);
+  }
+
+  /**
+   * Hands the element to the input receiver of the active bundle.
+   *
+   * @throws IllegalStateException if no bundle is active
+   * @throws RuntimeException wrapping any exception thrown by the input receiver
+   */
+  @Override
+  public void processElement(WindowedValue<InputT> elem) {
+    checkState(
+        activeBundle != null,
+        "%s attempted to process an element without an active bundle",
+        SdkHarnessDoFnRunner.class.getSimpleName());
+
+    try {
+      activeBundle.getInputReceiver().accept(elem);
+    } catch (Exception exc) {
+      throw new RuntimeException(exc);
+    }
+  }
+
+  /**
+   * Always fails: timers are not supported by this runner.
+   *
+   * @throws UnsupportedOperationException unconditionally
+   */
+  @Override
+  public void onTimer(
+      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("Timers are not supported over the Fn API");
+  }
+
+  /**
+   * Waits for the response of the active bundle and then clears the active bundle.
+   *
+   * <p>If the waiting thread is interrupted, the interrupt status is restored and the method
+   * returns without a response. A failure of the response future is rethrown wrapped by
+   * {@link UserCodeException#wrap}.
+   *
+   * @throws IllegalStateException if no bundle is active
+   */
+  @Override
+  public void finishBundle() {
+    checkState(
+        activeBundle != null,
+        "%s attempted to finish a bundle without an active bundle",
+        SdkHarnessDoFnRunner.class.getSimpleName());
+
+    try {
+      activeBundle.getBundleResponse().get();
+    } catch (InterruptedException interrupted) {
+      // Re-assert the interrupt so callers further up the stack can observe it;
+      // Thread.interrupted() would clear the flag instead.
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException exc) {
+      throw UserCodeException.wrap(exc);
+    } finally {
+      // Uphold the field contract: no bundle is active once finishBundle has been called.
+      activeBundle = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0388de17/runners/core-java/src/test/java/org/apache/beam/runners/core/SdkHarnessDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SdkHarnessDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SdkHarnessDoFnRunnerTest.java
new file mode 100644
index 0000000..68634f8
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SdkHarnessDoFnRunnerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+import com.google.common.util.concurrent.SettableFuture;
+import java.io.IOException;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Unit tests for {@link SdkHarnessDoFnRunner}. */
+@RunWith(JUnit4.class)
+public class SdkHarnessDoFnRunnerTest {
+  @Mock private SdkHarnessClient mockClient;
+
+  /** Initializes the {@code @Mock}-annotated fields of this test instance. */
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  /**
+   * Starts a bundle and finishes it after its response future has been completed, expecting
+   * neither call to throw and no element to reach the input receiver.
+   */
+  @Test
+  public void testStartAndFinishBundleDoesNotCrash() {
+    // Receiver that fails the test if anything is pushed into it.
+    FnDataReceiver rejectingReceiver = new FnDataReceiver() {
+      @Override
+      public void accept(Object input) throws Exception {
+        fail("Dummy input receiver should not have received data");
+      }
+
+      @Override
+      public void close() throws IOException {
+        // noop
+      }
+    };
+    SettableFuture<BeamFnApi.ProcessBundleResponse> responseFuture = SettableFuture.create();
+    SdkHarnessClient.ActiveBundle bundle =
+        SdkHarnessClient.ActiveBundle.create("testBundle", responseFuture, rejectingReceiver);
+    when(mockClient.newBundle(anyString())).thenReturn(bundle);
+
+    SdkHarnessDoFnRunner<Void, Void> runner =
+        SdkHarnessDoFnRunner.<Void, Void>create(mockClient, "testDescriptor");
+
+    runner.startBundle();
+    // Complete the response first so that finishBundle() has a result to return with.
+    responseFuture.set(BeamFnApi.ProcessBundleResponse.getDefaultInstance());
+    runner.finishBundle();
+  }
+}

Reply via email to