Repository: beam
Updated Branches:
  refs/heads/master c3bcd4b42 -> 8b540d2dd


Basic Java wrapper for Fn API data plane


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/46ee5a52
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/46ee5a52
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/46ee5a52

Branch: refs/heads/master
Commit: 46ee5a52030794ecf6954af524255256242438eb
Parents: c3bcd4b
Author: Kenneth Knowles <k...@google.com>
Authored: Wed Aug 23 06:20:33 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Sun Sep 3 19:23:36 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/FnDataReceiver.java       | 33 ++++++++
 .../apache/beam/runners/core/FnDataService.java | 81 ++++++++++++++++++++
 2 files changed, 114 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/46ee5a52/runners/core-java/src/main/java/org/apache/beam/runners/core/FnDataReceiver.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/FnDataReceiver.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/FnDataReceiver.java
new file mode 100644
index 0000000..98c5e7f
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/FnDataReceiver.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.Closeable;
+
+/**
+ * A receiver of streamed data, handed one element at a time through {@link #accept}.
+ *
+ * <p>Provide a {@link FnDataReceiver} and target to a {@link FnDataService} to listen for incoming
+ * data.
+ *
+ * <p>Register a target with a {@link FnDataService} to gain a {@link FnDataReceiver} to which you
+ * may write outgoing data.
+ */
+public interface FnDataReceiver<T> extends Closeable {
+  void accept(T input) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/46ee5a52/runners/core-java/src/main/java/org/apache/beam/runners/core/FnDataService.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/FnDataService.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/FnDataService.java
new file mode 100644
index 0000000..0b23ded
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/FnDataService.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * The {@link FnDataService} is able to forward inbound elements to a consumer and is also a
+ * consumer of outbound elements. Callers can register themselves as consumers for inbound elements
+ * or can get a handle for a consumer for outbound elements.
+ */
+public interface FnDataService {
+
+  /**
+   * A logical endpoint is a pair of an instruction ID corresponding to the {@link
+   * org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest} and the {@link
+   * org.apache.beam.fn.v1.BeamFnApi.Target} within the processing graph. This enables the same
+   * {@link FnDataService} to be re-used across multiple bundles.
+   */
+  @AutoValue
+  abstract class LogicalEndpoint {
+
+    public abstract String getInstructionId();
+
+    public abstract BeamFnApi.Target getTarget();
+
+    public static LogicalEndpoint of(String instructionId, BeamFnApi.Target target) {
+      return new AutoValue_FnDataService_LogicalEndpoint(instructionId, target);
+    }
+  }
+
+  /**
+   * Registers a receiver to be notified upon any incoming elements.
+   *
+   * <p>The provided coder is used to decode inbound elements. The decoded elements are passed to
+   * the provided receiver.
+   *
+   * <p>Any failure during decoding or processing of the element will complete the returned future
+   * exceptionally. On successful termination of the stream, the returned future is completed
+   * successfully.
+   *
+   * <p>The provided receiver is not required to be thread safe.
+   */
+  <T> ListenableFuture<Void> listen(
+      LogicalEndpoint inputLocation,
+      Coder<WindowedValue<T>> coder,
+      FnDataReceiver<WindowedValue<T>> listener)
+      throws Exception;
+
+  /**
+   * Creates a receiver to which you can write data values and have them sent over this data plane
+   * service.
+   *
+   * <p>The provided coder is used to encode elements on the outbound stream.
+   *
+   * <p>Closing the returned receiver signals the end of the stream.
+   *
+   * <p>The returned receiver is not thread safe.
+   */
+  <T> FnDataReceiver<WindowedValue<T>> send(
+      LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder) throws Exception;
+}

Reply via email to