Add SdkHarnessDoFnRunner

This encapsulates processing a bundle over the Fn API.
A Beam runner is responsible for:
- Setting up a FnApiControlClientPoolService to listen for incoming FnApiSdkHarnessClient connections
- Wrapping those connections in the higher-level SdkHarnessClient
- Building the ProcessBundleDescriptor (instruction graph) to be executed
- Establishing data plane endpoints referenced by the ProcessBundleDescriptor
- Registering the data plane endpoints and ProcessBundleDescriptor with the SdkHarnessClient

This class is responsible for:
- Registering each bundle with the SDK harness
- Streaming the elements of each bundle to the SDK harness over the data plane

Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0388de17 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0388de17 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0388de17 Branch: refs/heads/master Commit: 0388de17acb83a9031b3934a577520d226f8f09d Parents: 44a7a88 Author: Kenneth Knowles <k...@google.com> Authored: Wed Jul 26 07:48:48 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Sun Sep 3 19:42:57 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/SdkHarnessDoFnRunner.java | 100 +++++++++++++++++++ .../runners/core/SdkHarnessDoFnRunnerTest.java | 73 ++++++++++++++ 2 files changed, 173 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0388de17/runners/core-java/src/main/java/org/apache/beam/runners/core/SdkHarnessDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SdkHarnessDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SdkHarnessDoFnRunner.java new file mode 100644 index 0000000..27e784e --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SdkHarnessDoFnRunner.java @@
-0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+
+/**
+ * Processes a bundle by sending it to an SDK harness over the Fn API.
+ *
+ * <p>{@link #startBundle()} starts a new bundle through the {@link SdkHarnessClient},
+ * {@link #processElement(WindowedValue)} forwards each element to that bundle's input receiver,
+ * and {@link #finishBundle()} closes that input and waits for the SDK harness's response.
+ * Timers are not supported.
+ */
+public class SdkHarnessDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+
+  private final SdkHarnessClient sdkHarnessClient;
+  private final String processBundleDescriptorId;
+
+  /** The bundle currently being processed; {@code null} between bundles. */
+  @Nullable private SdkHarnessClient.ActiveBundle activeBundle;
+
+  private SdkHarnessDoFnRunner(
+      SdkHarnessClient sdkHarnessClient,
+      String processBundleDescriptorId) {
+    this.sdkHarnessClient = sdkHarnessClient;
+    this.processBundleDescriptorId = processBundleDescriptorId;
+  }
+
+  /**
+   * Returns a new {@link SdkHarnessDoFnRunner} suitable for just a particular {@link
+   * org.apache.beam.fn.v1.BeamFnApi.ProcessBundleDescriptor} (referenced by id here).
+   *
+   * <p>The input {@link FnDataReceiver} of each bundle that {@code sdkHarnessClient} creates
+   * must be the correct data plane service referenced in the primitive instructions in the
+   * {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleDescriptor}.
+   *
+   * <p>Also outside of this class, the appropriate receivers must be registered with the
+   * output data plane channels of the descriptor.
+   */
+  public static <InputT, OutputT> SdkHarnessDoFnRunner<InputT, OutputT> create(
+      SdkHarnessClient sdkHarnessClient,
+      String processBundleDescriptorId) {
+    return new SdkHarnessDoFnRunner<InputT, OutputT>(sdkHarnessClient, processBundleDescriptorId);
+  }
+
+  /** Starts a new bundle for this runner's descriptor via {@link SdkHarnessClient#newBundle}. */
+  @Override
+  public void startBundle() {
+    this.activeBundle =
+        sdkHarnessClient.newBundle(processBundleDescriptorId);
+  }
+
+  /**
+   * Sends {@code elem} to the SDK harness through the active bundle's input receiver.
+   *
+   * @throws IllegalStateException if there is no active bundle
+   */
+  @Override
+  public void processElement(WindowedValue<InputT> elem) {
+    checkState(
+        activeBundle != null,
+        "%s attempted to process an element without an active bundle",
+        SdkHarnessDoFnRunner.class.getSimpleName());
+
+    try {
+      activeBundle.getInputReceiver().accept(elem);
+    } catch (Exception exc) {
+      throw new RuntimeException(exc);
+    }
+  }
+
+  /** Always throws: timers are not supported over the Fn API. */
+  @Override
+  public void onTimer(
+      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("Timers are not supported over the Fn API");
+  }
+
+  /**
+   * Closes the active bundle's input and blocks until the SDK harness responds for the bundle.
+   *
+   * <p>Once this method is called the runner is between bundles, whatever the outcome.
+   *
+   * @throws IllegalStateException if there is no active bundle
+   * @throws UserCodeException if the bundle response future fails
+   * @throws RuntimeException if closing the input fails, or if the calling thread is interrupted
+   *     while waiting (in which case its interrupt status is restored)
+   */
+  @Override
+  public void finishBundle() {
+    checkState(
+        activeBundle != null,
+        "%s attempted to finish a bundle without an active bundle",
+        SdkHarnessDoFnRunner.class.getSimpleName());
+    SdkHarnessClient.ActiveBundle bundle = activeBundle;
+    activeBundle = null;
+
+    try {
+      // Mark the end of this bundle's element stream before waiting for the harness.
+      bundle.getInputReceiver().close();
+    } catch (Exception exc) {
+      throw new RuntimeException(exc);
+    }
+
+    try {
+      bundle.getBundleResponse().get();
+    } catch (InterruptedException interrupted) {
+      // Restore the interrupt status (Thread.interrupted() would clear it) and do not report
+      // a bundle whose outcome is unknown as successfully finished.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(
+          "Interrupted while waiting for the SDK harness to finish the bundle", interrupted);
+    } catch (ExecutionException exc) {
+      throw UserCodeException.wrap(exc);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0388de17/runners/core-java/src/test/java/org/apache/beam/runners/core/SdkHarnessDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SdkHarnessDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SdkHarnessDoFnRunnerTest.java
new file mode 100644
index 0000000..68634f8
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SdkHarnessDoFnRunnerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+import com.google.common.util.concurrent.SettableFuture;
+import java.io.IOException;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Unit tests for {@link SdkHarnessDoFnRunner}.
*/
+@RunWith(JUnit4.class)
+public class SdkHarnessDoFnRunnerTest {
+  @Mock private SdkHarnessClient harnessClient;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  /** Runs one empty bundle through the runner against a mocked {@link SdkHarnessClient}. */
+  @Test
+  public void testStartAndFinishBundleDoesNotCrash() {
+    // Fails the test if the runner ever pushes an element into the bundle's input.
+    FnDataReceiver rejectingReceiver =
+        new FnDataReceiver() {
+          @Override
+          public void accept(Object input) throws Exception {
+            fail("Dummy input receiver should not have received data");
+          }
+
+          @Override
+          public void close() throws IOException {
+            // Nothing to release.
+          }
+        };
+    SettableFuture<BeamFnApi.ProcessBundleResponse> response = SettableFuture.create();
+    SdkHarnessClient.ActiveBundle bundle =
+        SdkHarnessClient.ActiveBundle.create("testBundle", response, rejectingReceiver);
+    when(harnessClient.newBundle(anyString())).thenReturn(bundle);
+
+    SdkHarnessDoFnRunner<Void, Void> runner =
+        SdkHarnessDoFnRunner.<Void, Void>create(harnessClient, "testDescriptor");
+    runner.startBundle();
+
+    // Complete the response up front so that finishBundle() does not block.
+    response.set(BeamFnApi.ProcessBundleResponse.getDefaultInstance());
+    runner.finishBundle();
+  }
+}