[ 
https://issues.apache.org/jira/browse/BEAM-2899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16272079#comment-16272079
 ] 

ASF GitHub Bot commented on BEAM-2899:
--------------------------------------

tgroh closed pull request #4105: [BEAM-2899] Fork FnDataService from runners-core
URL: https://github.com/apache/beam/pull/4105
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a pull request from a fork once it
has been merged, the diff is reproduced below for the sake of provenance:

diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index 48111348ff5..86acf826288 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -67,11 +67,6 @@
       <artifactId>beam-sdks-java-core</artifactId>
     </dependency>
 
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-model-fn-execution</artifactId>
-    </dependency>
-
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-core-construction-java</artifactId>
@@ -100,26 +95,11 @@
       <artifactId>guava</artifactId>
     </dependency>
 
-    <dependency>
-      <groupId>io.grpc</groupId>
-      <artifactId>grpc-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>io.grpc</groupId>
-      <artifactId>grpc-stub</artifactId>
-    </dependency>
-
     <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
     </dependency>
 
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-
     <!-- test dependencies -->
 
     <!-- Utilities such as WindowMatchers -->
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java
deleted file mode 100644
index 811444c96ce..00000000000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
-import io.grpc.stub.StreamObserver;
-import java.io.Closeable;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A client for the control plane of an SDK harness, which can issue requests 
to it over the Fn API.
- *
- * <p>This class presents a low-level Java API de-inverting the Fn API's gRPC 
layer.
- *
- * <p>The Fn API is inverted so the runner is the server and the SDK harness 
is the client, for
- * firewalling reasons (the runner may execute in a more privileged 
environment forbidding outbound
- * connections).
- *
- * <p>This low-level client is responsible only for correlating requests with 
responses.
- *
- * @deprecated Runners should depend on the beam-runners-java-fn-execution 
module for this
- *     functionality.
- */
-@Deprecated
-class FnApiControlClient implements Closeable {
-  private static final Logger LOG = 
LoggerFactory.getLogger(FnApiControlClient.class);
-
-  // All writes to this StreamObserver need to be synchronized.
-  private final StreamObserver<BeamFnApi.InstructionRequest> requestReceiver;
-  private final ResponseStreamObserver responseObserver = new 
ResponseStreamObserver();
-  private final Map<String, SettableFuture<BeamFnApi.InstructionResponse>> 
outstandingRequests;
-  private volatile boolean isClosed;
-
-  private FnApiControlClient(StreamObserver<BeamFnApi.InstructionRequest> 
requestReceiver) {
-    this.requestReceiver = requestReceiver;
-    this.outstandingRequests = new ConcurrentHashMap<>();
-  }
-
-  /**
-   * Returns a {@link FnApiControlClient} which will submit its requests to 
the provided
-   * observer.
-   *
-   * <p>It is the responsibility of the caller to register this object as an 
observer of incoming
-   * responses (this will generally be done as part of fulfilling the contract 
of a gRPC service).
-   */
-  public static FnApiControlClient forRequestObserver(
-      StreamObserver<BeamFnApi.InstructionRequest> requestObserver) {
-    return new FnApiControlClient(requestObserver);
-  }
-
-  public synchronized ListenableFuture<BeamFnApi.InstructionResponse> handle(
-      BeamFnApi.InstructionRequest request) {
-    LOG.debug("Sending InstructionRequest {}", request);
-    SettableFuture<BeamFnApi.InstructionResponse> resultFuture = 
SettableFuture.create();
-    outstandingRequests.put(request.getInstructionId(), resultFuture);
-    requestReceiver.onNext(request);
-    return resultFuture;
-  }
-
-  StreamObserver<BeamFnApi.InstructionResponse> asResponseObserver() {
-    return responseObserver;
-  }
-
-  @Override
-  public void close() {
-    closeAndTerminateOutstandingRequests(new IllegalStateException("Runner 
closed connection"));
-  }
-
-  /** Closes this client and terminates any outstanding requests 
exceptionally. */
-  private synchronized void closeAndTerminateOutstandingRequests(Throwable 
cause) {
-    if (isClosed) {
-      return;
-    }
-
-    // Make a copy of the map to make the view of the outstanding requests 
consistent.
-    Map<String, SettableFuture<BeamFnApi.InstructionResponse>> 
outstandingRequestsCopy =
-        new ConcurrentHashMap<>(outstandingRequests);
-    outstandingRequests.clear();
-    isClosed = true;
-
-    if (outstandingRequestsCopy.isEmpty()) {
-      requestReceiver.onCompleted();
-      return;
-    }
-    requestReceiver.onError(
-        new 
StatusRuntimeException(Status.CANCELLED.withDescription(cause.getMessage())));
-
-    LOG.error(
-        "{} closed, clearing outstanding requests {}",
-        FnApiControlClient.class.getSimpleName(),
-        outstandingRequestsCopy);
-    for (SettableFuture<BeamFnApi.InstructionResponse> outstandingRequest :
-        outstandingRequestsCopy.values()) {
-      outstandingRequest.setException(cause);
-    }
-  }
-
-  /**
-   * A private view of this class as a {@link StreamObserver} for connecting 
as a gRPC listener.
-   */
-  private class ResponseStreamObserver implements 
StreamObserver<BeamFnApi.InstructionResponse> {
-    /**
-     * Processes an incoming {@link BeamFnApi.InstructionResponse} by 
correlating it with the
-     * corresponding {@link BeamFnApi.InstructionRequest} and completes the 
future that was returned
-     * by {@link #handle}.
-     */
-    @Override
-    public void onNext(BeamFnApi.InstructionResponse response) {
-      LOG.debug("Received InstructionResponse {}", response);
-      SettableFuture<BeamFnApi.InstructionResponse> completableFuture =
-          outstandingRequests.remove(response.getInstructionId());
-      if (completableFuture != null) {
-        completableFuture.set(response);
-      }
-    }
-
-    /** */
-    @Override
-    public void onCompleted() {
-      closeAndTerminateOutstandingRequests(
-          new IllegalStateException("SDK harness closed connection"));
-    }
-
-    @Override
-    public void onError(Throwable cause) {
-      LOG.error("{} received error {}", 
FnApiControlClient.class.getSimpleName(), cause);
-      closeAndTerminateOutstandingRequests(cause);
-    }
-  }
-}
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java
deleted file mode 100644
index 21fc4f73fd0..00000000000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.BlockingQueue;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Fn API control service which adds incoming SDK harness connections to a 
pool.
- *
- * @deprecated Runners should depend on the beam-runners-java-fn-execution 
module for this
- *     functionality.
- */
-@Deprecated
-public class FnApiControlClientPoolService extends 
BeamFnControlGrpc.BeamFnControlImplBase {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(FnApiControlClientPoolService.class);
-
-  private final BlockingQueue<FnApiControlClient> clientPool;
-
-  private FnApiControlClientPoolService(BlockingQueue<FnApiControlClient> 
clientPool) {
-    this.clientPool = clientPool;
-  }
-
-  /**
-   * Creates a new {@link FnApiControlClientPoolService} which will enqueue 
and vend new SDK harness
-   * connections.
-   */
-  public static FnApiControlClientPoolService offeringClientsToPool(
-      BlockingQueue<FnApiControlClient> clientPool) {
-    return new FnApiControlClientPoolService(clientPool);
-  }
-
-  /**
-   * Called by gRPC for each incoming connection from an SDK harness, and 
enqueue an available SDK
-   * harness client.
-   *
-   * <p>Note: currently does not distinguish what sort of SDK it is, so a 
separate instance is
-   * required for each.
-   */
-  @Override
-  public StreamObserver<BeamFnApi.InstructionResponse> control(
-      StreamObserver<BeamFnApi.InstructionRequest> requestObserver) {
-    LOGGER.info("Beam Fn Control client connected.");
-    FnApiControlClient newClient = 
FnApiControlClient.forRequestObserver(requestObserver);
-    try {
-      clientPool.put(newClient);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    }
-    return newClient.asResponseObserver();
-  }
-}
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnDataReceiver.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnDataReceiver.java
deleted file mode 100644
index 639d678ad03..00000000000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnDataReceiver.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import java.io.Closeable;
-
-/**
- * A receiver of streamed data.
- *
- * <p>Provide a {@link FnDataReceiver} and target to a {@link FnDataService} 
to listen for incoming
- * data.
- *
- * <p>Register a target with a {@link FnDataService} to gain a {@link 
FnDataReceiver} to which you
- * may write outgoing data.
- *
- * @deprecated Runners should depend on the beam-runners-java-fn-execution 
module for this
- *     functionality.
- */
-@Deprecated
-public interface FnDataReceiver<T> extends Closeable {
-  void accept(T input) throws Exception;
-}
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java
deleted file mode 100644
index 091dea14168..00000000000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import java.io.IOException;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-
-/**
- * A high-level client for an SDK harness.
- *
- * <p>This provides a Java-friendly wrapper around {@link FnApiControlClient} 
and {@link
- * FnDataReceiver}, which handle lower-level gRPC message wrangling.
- *
- * @deprecated Runners should depend on the beam-runners-java-fn-execution 
module for this
- *     functionality.
- */
-@Deprecated
-public class SdkHarnessClient {
-
-  /**
-   * A supply of unique identifiers, used internally. These must be unique 
across all Fn API
-   * clients.
-   */
-  public interface IdGenerator {
-    String getId();
-  }
-
-  /** A supply of unique identifiers that are simply incrementing longs. */
-  private static class CountingIdGenerator implements IdGenerator {
-    private final AtomicLong nextId = new AtomicLong(0L);
-
-    @Override
-    public String getId() {
-      return String.valueOf(nextId.incrementAndGet());
-    }
-  }
-
-  /**
-   * An active bundle for a particular {@link
-   * BeamFnApi.ProcessBundleDescriptor}.
-   */
-  @AutoValue
-  public abstract static class ActiveBundle<InputT> {
-    public abstract String getBundleId();
-
-    public abstract Future<BeamFnApi.ProcessBundleResponse> 
getBundleResponse();
-
-    public abstract FnDataReceiver<InputT> getInputReceiver();
-
-    public static <InputT> ActiveBundle<InputT> create(
-        String bundleId,
-        Future<BeamFnApi.ProcessBundleResponse> response,
-        FnDataReceiver<InputT> dataReceiver) {
-      return new AutoValue_SdkHarnessClient_ActiveBundle(bundleId, response, 
dataReceiver);
-    }
-  }
-
-  private final IdGenerator idGenerator;
-  private final FnApiControlClient fnApiControlClient;
-
-  private SdkHarnessClient(
-      FnApiControlClient fnApiControlClient,
-      IdGenerator idGenerator) {
-    this.idGenerator = idGenerator;
-    this.fnApiControlClient = fnApiControlClient;
-  }
-
-  /**
-   * Creates a client for a particular SDK harness. It is the responsibility 
of the caller to ensure
-   * that these correspond to the same SDK harness, so control plane and data 
plane messages can be
-   * correctly associated.
-   */
-  public static SdkHarnessClient usingFnApiClient(FnApiControlClient 
fnApiControlClient) {
-    return new SdkHarnessClient(fnApiControlClient, new CountingIdGenerator());
-  }
-
-  public SdkHarnessClient withIdGenerator(IdGenerator idGenerator) {
-    return new SdkHarnessClient(fnApiControlClient, idGenerator);
-  }
-
-  /**
-   * Registers a {@link BeamFnApi.ProcessBundleDescriptor} for future
-   * processing.
-   *
-   * <p>A client may block on the result future, but may also proceed without 
blocking.
-   */
-  public Future<BeamFnApi.RegisterResponse> register(
-      Iterable<BeamFnApi.ProcessBundleDescriptor> processBundleDescriptors) {
-
-    // TODO: validate that all the necessary data endpoints are known
-
-    ListenableFuture<BeamFnApi.InstructionResponse> genericResponse =
-        fnApiControlClient.handle(
-            BeamFnApi.InstructionRequest.newBuilder()
-                .setInstructionId(idGenerator.getId())
-                .setRegister(
-                    BeamFnApi.RegisterRequest.newBuilder()
-                        
.addAllProcessBundleDescriptor(processBundleDescriptors)
-                        .build())
-                .build());
-
-    return Futures.transform(
-        genericResponse,
-        new Function<BeamFnApi.InstructionResponse, 
BeamFnApi.RegisterResponse>() {
-          @Override
-          public BeamFnApi.RegisterResponse 
apply(BeamFnApi.InstructionResponse input) {
-            return input.getRegister();
-          }
-        });
-  }
-
-  /**
-   * Start a new bundle for the given {@link
-   * BeamFnApi.ProcessBundleDescriptor} identifier.
-   *
-   * <p>The input channels for the returned {@link ActiveBundle} are derived 
from the
-   * instructions in the {@link BeamFnApi.ProcessBundleDescriptor}.
-   */
-  public ActiveBundle newBundle(String processBundleDescriptorId) {
-    String bundleId = idGenerator.getId();
-
-    // TODO: acquire an input receiver from appropriate FnDataService
-    FnDataReceiver dataReceiver = new FnDataReceiver() {
-      @Override
-      public void accept(Object input) throws Exception {
-        throw new UnsupportedOperationException("Placeholder FnDataReceiver 
cannot accept data.");
-      }
-
-      @Override
-      public void close() throws IOException {
-        // noop
-      }
-    };
-
-    ListenableFuture<BeamFnApi.InstructionResponse> genericResponse =
-        fnApiControlClient.handle(
-            BeamFnApi.InstructionRequest.newBuilder()
-                .setProcessBundle(
-                    BeamFnApi.ProcessBundleRequest.newBuilder()
-                        
.setProcessBundleDescriptorReference(processBundleDescriptorId))
-                .build());
-
-    ListenableFuture<BeamFnApi.ProcessBundleResponse> specificResponse =
-        Futures.transform(
-            genericResponse,
-            new Function<BeamFnApi.InstructionResponse, 
BeamFnApi.ProcessBundleResponse>() {
-              @Override
-              public BeamFnApi.ProcessBundleResponse 
apply(BeamFnApi.InstructionResponse input) {
-                return input.getProcessBundle();
-              }
-            });
-
-    return ActiveBundle.create(bundleId, specificResponse, dataReceiver);
-  }
-}
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessDoFnRunner.java
deleted file mode 100644
index d27077fdde3..00000000000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessDoFnRunner.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import java.util.concurrent.ExecutionException;
-import javax.annotation.Nullable;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.joda.time.Instant;
-
-/**
- * Processes a bundle by sending it to an SDK harness over the Fn API.
- *
- * @deprecated Runners should interact with the Control and Data plane 
directly, rather than through
- *     a {@link DoFnRunner}. Consider the beam-runners-java-fn-execution 
artifact instead.
- */
-@Deprecated
-public class SdkHarnessDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, OutputT> {
-
-  private final SdkHarnessClient sdkHarnessClient;
-  private final String processBundleDescriptorId;
-
-  /** {@code null} between bundles. */
-  @Nullable private SdkHarnessClient.ActiveBundle activeBundle;
-
-  private SdkHarnessDoFnRunner(
-      SdkHarnessClient sdkHarnessClient,
-      String processBundleDescriptorId) {
-    this.sdkHarnessClient = sdkHarnessClient;
-    this.processBundleDescriptorId = processBundleDescriptorId;
-  }
-
-  /**
-   * Returns a new {@link SdkHarnessDoFnRunner} suitable for just a particular 
{@link
-   * ProcessBundleDescriptor} (referenced by id here).
-   *
-   * <p>The {@link FnDataReceiver} must be the correct data plane service 
referenced
-   * in the primitive instructions in the
-   * {@link ProcessBundleDescriptor}.
-   *
-   * <p>Also outside of this class, the appropriate receivers must be 
registered with the
-   * output data plane channels of the descriptor.
-   */
-  public static <InputT, OutputT> SdkHarnessDoFnRunner<InputT, OutputT> create(
-      SdkHarnessClient sdkHarnessClient,
-      String processBundleDescriptorId) {
-    return new SdkHarnessDoFnRunner(sdkHarnessClient, 
processBundleDescriptorId);
-  }
-
-  @Override
-  public void startBundle() {
-    this.activeBundle =
-        sdkHarnessClient.newBundle(processBundleDescriptorId);
-  }
-
-  @Override
-  public void processElement(WindowedValue<InputT> elem) {
-    checkState(
-        activeBundle != null,
-        "%s attempted to process an element without an active bundle",
-        SdkHarnessDoFnRunner.class.getSimpleName());
-
-    try {
-      activeBundle.getInputReceiver().accept(elem);
-    } catch (Exception exc) {
-      throw new RuntimeException(exc);
-    }
-  }
-
-  @Override
-  public void onTimer(
-      String timerId, BoundedWindow window, Instant timestamp, TimeDomain 
timeDomain) {
-    throw new UnsupportedOperationException("Timers are not supported over the 
Fn API");
-  }
-
-  @Override
-  public void finishBundle() {
-    try {
-      activeBundle.getBundleResponse().get();
-    } catch (InterruptedException interrupted) {
-      Thread.interrupted();
-      return;
-    } catch (ExecutionException exc) {
-      throw UserCodeException.wrap(exc);
-    }
-  }
-}
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/package-info.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/package-info.java
deleted file mode 100644
index bea8051c49d..00000000000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/package-info.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Provides utilities for a Beam runner to interact with a client using the Fn 
API.
- */
-@DefaultAnnotation(NonNull.class)
-package org.apache.beam.runners.core.fn;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolServiceTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolServiceTest.java
deleted file mode 100644
index da02d924eef..00000000000
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolServiceTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Unit tests for {@link FnApiControlClientPoolService}. */
-@RunWith(JUnit4.class)
-public class FnApiControlClientPoolServiceTest {
-
-  // For ease of straight-line testing, we use a LinkedBlockingQueue; in 
practice a SynchronousQueue
-  // for matching incoming connections and server threads is likely.
-  private final BlockingQueue<FnApiControlClient> pool = new 
LinkedBlockingQueue<>();
-  private FnApiControlClientPoolService controlService =
-      FnApiControlClientPoolService.offeringClientsToPool(pool);
-
-  @Test
-  public void testIncomingConnection() throws Exception {
-    StreamObserver<BeamFnApi.InstructionRequest> requestObserver = 
mock(StreamObserver.class);
-    StreamObserver<BeamFnApi.InstructionResponse> responseObserver =
-        controlService.control(requestObserver);
-
-    FnApiControlClient client = pool.take();
-
-    // Check that the client is wired up to the request channel
-    String id = "fakeInstruction";
-    ListenableFuture<BeamFnApi.InstructionResponse> responseFuture =
-        
client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build());
-    verify(requestObserver).onNext(any(BeamFnApi.InstructionRequest.class));
-    assertThat(responseFuture.isDone(), is(false));
-
-    // Check that the response channel really came from the client
-    responseObserver.onNext(
-        
BeamFnApi.InstructionResponse.newBuilder().setInstructionId(id).build());
-    responseFuture.get();
-  }
-}
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/FnApiControlClientTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/FnApiControlClientTest.java
deleted file mode 100644
index 279e974cc31..00000000000
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/FnApiControlClientTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.isA;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.verify;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Unit tests for {@link FnApiControlClient}. */
-@RunWith(JUnit4.class)
-public class FnApiControlClientTest {
-
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  @Mock public StreamObserver<BeamFnApi.InstructionRequest> mockObserver;
-  private FnApiControlClient client;
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-    client = FnApiControlClient.forRequestObserver(mockObserver);
-  }
-
-  @Test
-  public void testRequestSent() {
-    String id = "instructionId";
-    
client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build());
-
-    verify(mockObserver).onNext(any(BeamFnApi.InstructionRequest.class));
-  }
-
-  @Test
-  public void testRequestSuccess() throws Exception {
-    String id = "successfulInstruction";
-
-    Future<BeamFnApi.InstructionResponse> responseFuture =
-        
client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build());
-    client
-        .asResponseObserver()
-        
.onNext(BeamFnApi.InstructionResponse.newBuilder().setInstructionId(id).build());
-
-    BeamFnApi.InstructionResponse response = responseFuture.get();
-
-    assertThat(response.getInstructionId(), equalTo(id));
-  }
-
-  @Test
-  public void testUnknownResponseIgnored() throws Exception {
-    String id = "actualInstruction";
-    String unknownId = "unknownInstruction";
-
-    ListenableFuture<BeamFnApi.InstructionResponse> responseFuture =
-        
client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build());
-
-    client
-        .asResponseObserver()
-        
.onNext(BeamFnApi.InstructionResponse.newBuilder().setInstructionId(unknownId).build());
-
-    assertThat(responseFuture.isDone(), is(false));
-    assertThat(responseFuture.isCancelled(), is(false));
-  }
-
-  @Test
-  public void testOnCompletedCancelsOutstanding() throws Exception {
-    String id = "clientHangUpInstruction";
-
-    Future<BeamFnApi.InstructionResponse> responseFuture =
-        
client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build());
-
-    client.asResponseObserver().onCompleted();
-
-    thrown.expect(ExecutionException.class);
-    thrown.expectCause(isA(IllegalStateException.class));
-    thrown.expectMessage("closed");
-    responseFuture.get();
-  }
-
-  @Test
-  public void testOnErrorCancelsOutstanding() throws Exception {
-    String id = "errorInstruction";
-
-    Future<BeamFnApi.InstructionResponse> responseFuture =
-        
client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build());
-
-    class FrazzleException extends Exception {}
-    client.asResponseObserver().onError(new FrazzleException());
-
-    thrown.expect(ExecutionException.class);
-    thrown.expectCause(isA(FrazzleException.class));
-    responseFuture.get();
-  }
-
-  @Test
-  public void testCloseCancelsOutstanding() throws Exception {
-    String id = "serverCloseInstruction";
-
-    Future<BeamFnApi.InstructionResponse> responseFuture =
-        
client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build());
-
-    client.close();
-
-    thrown.expect(ExecutionException.class);
-    thrown.expectCause(isA(IllegalStateException.class));
-    thrown.expectMessage("closed");
-    responseFuture.get();
-  }
-}
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/SdkHarnessClientTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/SdkHarnessClientTest.java
deleted file mode 100644
index 7783b2f2f88..00000000000
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/SdkHarnessClientTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.Future;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Unit tests for {@link SdkHarnessClient}. */
-@RunWith(JUnit4.class)
-public class SdkHarnessClientTest {
-
-  @Mock public FnApiControlClient fnApiControlClient;
-
-  private SdkHarnessClient sdkHarnessClient;
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-    sdkHarnessClient = SdkHarnessClient.usingFnApiClient(fnApiControlClient);
-  }
-
-  @Test
-  public void testRegisterDoesNotCrash() throws Exception {
-    String descriptorId1 = "descriptor1";
-    String descriptorId2 = "descriptor2";
-
-    SettableFuture<BeamFnApi.InstructionResponse> registerResponseFuture = 
SettableFuture.create();
-    when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
-        .thenReturn(registerResponseFuture);
-
-    Future<BeamFnApi.RegisterResponse> responseFuture = 
sdkHarnessClient.register(
-        ImmutableList.of(
-            
BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build(),
-            
BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(descriptorId2).build()));
-
-    // Correlating the RegisterRequest and RegisterResponse is owned by the 
underlying
-    // FnApiControlClient. The SdkHarnessClient owns just wrapping the request 
and unwrapping
-    // the response.
-    //
-    // Currently there are no fields so there's nothing to check. This test is 
formulated
-    // to match the pattern it should have if/when the response is meaningful.
-    BeamFnApi.RegisterResponse response = 
BeamFnApi.RegisterResponse.getDefaultInstance();
-    registerResponseFuture.set(
-        
BeamFnApi.InstructionResponse.newBuilder().setRegister(response).build());
-    responseFuture.get();
-  }
-
-  @Test
-  public void testNewBundleNoDataDoesNotCrash() throws Exception {
-    String descriptorId1 = "descriptor1";
-
-    SettableFuture<BeamFnApi.InstructionResponse> processBundleResponseFuture =
-        SettableFuture.create();
-    when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
-        .thenReturn(processBundleResponseFuture);
-
-    SdkHarnessClient.ActiveBundle activeBundle = 
sdkHarnessClient.newBundle(descriptorId1);
-
-    // Correlating the ProcessBundleRequest and ProcessBundleReponse is owned 
by the underlying
-    // FnApiControlClient. The SdkHarnessClient owns just wrapping the request 
and unwrapping
-    // the response.
-    //
-    // Currently there are no fields so there's nothing to check. This test is 
formulated
-    // to match the pattern it should have if/when the response is meaningful.
-    BeamFnApi.ProcessBundleResponse response = 
BeamFnApi.ProcessBundleResponse.getDefaultInstance();
-    processBundleResponseFuture.set(
-        
BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build());
-    activeBundle.getBundleResponse().get();
-  }
-}
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/SdkHarnessDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/SdkHarnessDoFnRunnerTest.java
deleted file mode 100644
index 8f160049e05..00000000000
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/SdkHarnessDoFnRunnerTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.when;
-
-import com.google.common.util.concurrent.SettableFuture;
-import java.io.IOException;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Unit tests for {@link SdkHarnessDoFnRunner}. */
-@RunWith(JUnit4.class)
-public class SdkHarnessDoFnRunnerTest {
-  @Mock private SdkHarnessClient mockClient;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  @Test
-  public void testStartAndFinishBundleDoesNotCrash() {
-    String processBundleDescriptorId = "testDescriptor";
-    String bundleId = "testBundle";
-    SdkHarnessDoFnRunner<Void, Void> underTest =
-        SdkHarnessDoFnRunner.<Void, Void>create(mockClient, 
processBundleDescriptorId);
-
-    SettableFuture<BeamFnApi.ProcessBundleResponse> 
processBundleResponseFuture =
-        SettableFuture.create();
-    FnDataReceiver dummyInputReceiver = new FnDataReceiver() {
-      @Override
-      public void accept(Object input) throws Exception {
-        fail("Dummy input receiver should not have received data");
-      }
-
-      @Override
-      public void close() throws IOException {
-        // noop
-      }
-    };
-    SdkHarnessClient.ActiveBundle activeBundle =
-        SdkHarnessClient.ActiveBundle.create(
-            bundleId, processBundleResponseFuture, dummyInputReceiver);
-
-    when(mockClient.newBundle(anyString())).thenReturn(activeBundle);
-    underTest.startBundle();
-    
processBundleResponseFuture.set(BeamFnApi.ProcessBundleResponse.getDefaultInstance());
-    underTest.finishBundle();
-  }
-}
diff --git a/runners/java-fn-execution/build.gradle 
b/runners/java-fn-execution/build.gradle
index dd4eaaed47d..e948c7ce625 100644
--- a/runners/java-fn-execution/build.gradle
+++ b/runners/java-fn-execution/build.gradle
@@ -1,5 +1,4 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
+/* * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -30,18 +29,11 @@ description = "Apache Beam :: Runners :: Java Fn Execution"
  */
 
evaluationDependsOn(":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-fn-execution")
 
-configurations.all {
-  // Fn Execution contains shared utilities for Runners and Harnesses which 
use            
-  // the Portability framework. Runner-side interactions must not require a
-  // dependency on any particular SDK, so this library must not introduce such 
an
-  // edge.
-  exclude group: "org.apache.beam", module: "beam-sdks-java-core"
-}
-
 dependencies {
   compile library.java.guava
   shadow project(path: ":beam-model-parent:beam-model-pipeline", 
configuration: "shadow")
   shadow project(path: ":beam-model-parent:beam-model-fn-execution", 
configuration: "shadow")
+  shadow project(path: 
":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-core", configuration: 
"shadow")
   shadow project(path: 
":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-fn-execution", 
configuration: "shadow")
   shadow library.java.grpc_core
   shadow library.java.grpc_stub
diff --git a/runners/java-fn-execution/pom.xml 
b/runners/java-fn-execution/pom.xml
index f275d69207e..85d4da1a6fc 100644
--- a/runners/java-fn-execution/pom.xml
+++ b/runners/java-fn-execution/pom.xml
@@ -48,6 +48,11 @@
       <artifactId>beam-sdks-java-fn-execution</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-core</artifactId>
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnDataService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
similarity index 93%
rename from 
runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnDataService.java
rename to 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
index 2a6777e4bcd..4366cfac514 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnDataService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
@@ -15,7 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.core.fn;
+
+package org.apache.beam.runners.fnexecution.data;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -27,13 +28,8 @@
  * The {@link FnDataService} is able to forward inbound elements to a consumer 
and is also a
  * consumer of outbound elements. Callers can register themselves as consumers 
for inbound elements
  * or can get a handle for a consumer for outbound elements.
- *
- * @deprecated Runners should depend on the beam-runners-java-fn-execution 
module for this
- *     functionality.
  */
-@Deprecated
 public interface FnDataService {
-
   /**
    * A logical endpoint is a pair of an instruction ID corresponding to the 
{@link
    * BeamFnApi.ProcessBundleRequest} and the {@link
@@ -64,7 +60,7 @@ public static LogicalEndpoint of(String instructionId, 
BeamFnApi.Target target)
    *
    * <p>The provided receiver is not required to be thread safe.
    */
-  <T> ListenableFuture<Void> listen(
+  <T> ListenableFuture<Void> receive(
       LogicalEndpoint inputLocation,
       Coder<WindowedValue<T>> coder,
       FnDataReceiver<WindowedValue<T>> listener)
@@ -82,4 +78,5 @@ public static LogicalEndpoint of(String instructionId, 
BeamFnApi.Target target)
    */
   <T> FnDataReceiver<WindowedValue<T>> send(
       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder) throws 
Exception;
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Universal Local Runner
> ----------------------
>
>                 Key: BEAM-2899
>                 URL: https://issues.apache.org/jira/browse/BEAM-2899
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-core
>            Reporter: Henning Rohde
>            Assignee: Thomas Groh
>              Labels: portability
>
> To make the portability effort tractable, we should implement a Universal 
> Local Runner (ULR) in Java that runs in a single server process plus Docker 
> containers for the SDK harnesses. It would serve multiple purposes:
>   (1) A reference implementation for other runners. Ideally, any new feature 
> should be implemented in the ULR first.
>   (2) A fully-featured test runner for SDKs that participate in the 
> portability framework. It thus complements the direct runners.
>   (3) A test runner for user code that depends on or customizes the runtime 
> environment. For example, a DoFn that shells out has a dependency that may be 
> satisfied on the user's desktop (and thus works fine on the direct runner), 
> but perhaps not by the container harness image. The ULR allows for an easy 
> way to find out.
> The Java direct runner presumably has lots of pieces that can be reused.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to