This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 299a7b01308 Drop '2' from BeamFnDataGrpcMultiplexer2 and BeamFnDataInboundObserver2 (#25144)
299a7b01308 is described below

commit 299a7b01308ca2e17618b5e5dace1a3bdb8df737
Author: Luke Cwik <[email protected]>
AuthorDate: Tue Jan 24 14:39:57 2023 -0800

    Drop '2' from BeamFnDataGrpcMultiplexer2 and BeamFnDataInboundObserver2 (#25144)
    
    This is a follow-up for a comment in https://github.com/apache/beam/pull/25104
    
    Also remove InboundDataClient which was missed in https://github.com/apache/beam/pull/25104
---
 .../fnexecution/control/SdkHarnessClient.java      | 10 ++--
 .../runners/fnexecution/data/GrpcDataService.java  | 12 ++---
 .../fnexecution/data/GrpcDataServiceTest.java      | 10 ++--
 ...plexer2.java => BeamFnDataGrpcMultiplexer.java} |  8 +--
 ...server2.java => BeamFnDataInboundObserver.java} |  8 +--
 .../apache/beam/sdk/fn/data/InboundDataClient.java | 60 ----------------------
 ...est.java => BeamFnDataGrpcMultiplexerTest.java} | 28 +++++-----
 ...est.java => BeamFnDataInboundObserverTest.java} | 24 ++++-----
 .../fn/harness/control/ProcessBundleHandler.java   | 10 ++--
 .../beam/fn/harness/data/BeamFnDataGrpcClient.java | 12 ++---
 .../fn/harness/data/BeamFnDataGrpcClientTest.java  | 14 ++---
 11 files changed, 68 insertions(+), 128 deletions(-)

diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
index 609988b7d4e..1a2b1c03a0f 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
@@ -51,7 +51,7 @@ import 
org.apache.beam.runners.fnexecution.state.StateRequestHandler;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.IdGenerators;
-import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2;
+import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
 import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
 import org.apache.beam.sdk.fn.data.DataEndpoint;
@@ -268,7 +268,7 @@ public class SdkHarnessClient implements AutoCloseable {
 
       CompletionStage<BeamFnApi.ProcessBundleResponse> specificResponse =
           genericResponse.thenApply(InstructionResponse::getProcessBundle);
-      Optional<BeamFnDataInboundObserver2> beamFnDataInboundObserver;
+      Optional<BeamFnDataInboundObserver> beamFnDataInboundObserver;
       if (outputReceivers.isEmpty() && timerReceivers.isEmpty()) {
         beamFnDataInboundObserver = Optional.empty();
       } else {
@@ -291,7 +291,7 @@ public class SdkHarnessClient implements AutoCloseable {
                   timerReceiver.getValue().getReceiver()));
         }
         beamFnDataInboundObserver =
-            Optional.of(BeamFnDataInboundObserver2.forConsumers(dataEndpoints, 
timerEndpoints));
+            Optional.of(BeamFnDataInboundObserver.forConsumers(dataEndpoints, 
timerEndpoints));
         fnApiDataService.registerReceiver(bundleId, 
beamFnDataInboundObserver.get());
       }
 
@@ -339,7 +339,7 @@ public class SdkHarnessClient implements AutoCloseable {
       private final CompletionStage<BeamFnApi.ProcessBundleResponse> response;
       private final BeamFnDataOutboundAggregator beamFnDataOutboundAggregator;
       private final Map<LogicalEndpoint, FnDataReceiver<?>> inputReceivers;
-      private final Optional<BeamFnDataInboundObserver2> 
beamFnDataInboundObserver;
+      private final Optional<BeamFnDataInboundObserver> 
beamFnDataInboundObserver;
       private final StateDelegator.Registration stateRegistration;
       private final BundleProgressHandler progressHandler;
       private final BundleSplitHandler splitHandler;
@@ -354,7 +354,7 @@ public class SdkHarnessClient implements AutoCloseable {
           CompletionStage<ProcessBundleResponse> response,
           BeamFnDataOutboundAggregator beamFnDataOutboundAggregator,
           Map<LogicalEndpoint, FnDataReceiver<?>> inputReceivers,
-          Optional<BeamFnDataInboundObserver2> beamFnDataInboundObserver,
+          Optional<BeamFnDataInboundObserver> beamFnDataInboundObserver,
           StateDelegator.Registration stateRegistration,
           BundleProgressHandler progressHandler,
           BundleSplitHandler splitHandler,
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
index 883d72103a9..6e34e0eae67 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
@@ -27,7 +27,7 @@ import java.util.function.Supplier;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
 import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
-import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer2;
+import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer;
 import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
 import org.apache.beam.sdk.fn.server.FnService;
@@ -60,7 +60,7 @@ public class GrpcDataService extends 
BeamFnDataGrpc.BeamFnDataImplBase
     return new GrpcDataService(options, executor, outboundObserverFactory);
   }
 
-  private final SettableFuture<BeamFnDataGrpcMultiplexer2> connectedClient;
+  private final SettableFuture<BeamFnDataGrpcMultiplexer> connectedClient;
   /**
    * A collection of multiplexers which are not used to send data. A handle to 
these multiplexers is
    * maintained in order to perform an orderly shutdown.
@@ -68,7 +68,7 @@ public class GrpcDataService extends 
BeamFnDataGrpc.BeamFnDataImplBase
    * <p>TODO: (BEAM-3811) Replace with some cancellable collection, to ensure 
that new clients of a
    * closed {@link GrpcDataService} are closed with that {@link 
GrpcDataService}.
    */
-  private final Queue<BeamFnDataGrpcMultiplexer2> additionalMultiplexers;
+  private final Queue<BeamFnDataGrpcMultiplexer> additionalMultiplexers;
 
   private final PipelineOptions options;
   private final ExecutorService executor;
@@ -99,8 +99,8 @@ public class GrpcDataService extends 
BeamFnDataGrpc.BeamFnDataImplBase
   public StreamObserver<BeamFnApi.Elements> data(
       final StreamObserver<BeamFnApi.Elements> outboundElementObserver) {
     LOG.info("Beam Fn Data client connected.");
-    BeamFnDataGrpcMultiplexer2 multiplexer =
-        new BeamFnDataGrpcMultiplexer2(
+    BeamFnDataGrpcMultiplexer multiplexer =
+        new BeamFnDataGrpcMultiplexer(
             null, outboundObserverFactory, inbound -> outboundElementObserver);
     // First client that connects completes this future.
     if (!connectedClient.set(multiplexer)) {
@@ -121,7 +121,7 @@ public class GrpcDataService extends 
BeamFnDataGrpc.BeamFnDataImplBase
     // Multiplexer, but if there isn't any multiplexer it prevents callers 
blocking forever.
     connectedClient.cancel(true);
     // Close any other open connections
-    for (BeamFnDataGrpcMultiplexer2 additional : additionalMultiplexers) {
+    for (BeamFnDataGrpcMultiplexer additional : additionalMultiplexers) {
       try {
         additional.close();
       } catch (Exception ignored) {
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/data/GrpcDataServiceTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/data/GrpcDataServiceTest.java
index 86bf247fa34..6766928b968 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/data/GrpcDataServiceTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/data/GrpcDataServiceTest.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2;
+import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
 import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
 import org.apache.beam.sdk.fn.data.DataEndpoint;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
@@ -150,18 +150,18 @@ public class GrpcDataServiceTest {
       }
 
       List<Collection<WindowedValue<String>>> serverInboundValues = new 
ArrayList<>();
-      Collection<BeamFnDataInboundObserver2> inboundObservers = new 
ArrayList<>();
+      Collection<BeamFnDataInboundObserver> inboundObservers = new 
ArrayList<>();
       for (int i = 0; i < 3; ++i) {
         final Collection<WindowedValue<String>> serverInboundValue = new 
ArrayList<>();
         serverInboundValues.add(serverInboundValue);
-        BeamFnDataInboundObserver2 inboundObserver =
-            BeamFnDataInboundObserver2.forConsumers(
+        BeamFnDataInboundObserver inboundObserver =
+            BeamFnDataInboundObserver.forConsumers(
                 Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, 
serverInboundValue::add)),
                 Collections.emptyList());
         service.registerReceiver(Integer.toString(i), inboundObserver);
         inboundObservers.add(inboundObserver);
       }
-      for (BeamFnDataInboundObserver2 inboundObserver : inboundObservers) {
+      for (BeamFnDataInboundObserver inboundObserver : inboundObservers) {
         inboundObserver.awaitCompletion();
       }
       waitForInboundElements.countDown();
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java
similarity index 97%
rename from 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java
rename to 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java
index bc552dce08d..edca740dce9 100644
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java
@@ -47,8 +47,8 @@ import org.slf4j.LoggerFactory;
  * <p>TODO: Add support for multiplexing over multiple outbound observers by 
stickying the output
  * location with a specific outbound observer.
  */
-public class BeamFnDataGrpcMultiplexer2 implements AutoCloseable {
-  private static final Logger LOG = 
LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer2.class);
+public class BeamFnDataGrpcMultiplexer implements AutoCloseable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer.class);
   private final Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor;
   private final StreamObserver<BeamFnApi.Elements> inboundObserver;
   private final StreamObserver<BeamFnApi.Elements> outboundObserver;
@@ -57,7 +57,7 @@ public class BeamFnDataGrpcMultiplexer2 implements 
AutoCloseable {
       receivers;
   private final ConcurrentMap<String, Boolean> erroredInstructionIds;
 
-  public BeamFnDataGrpcMultiplexer2(
+  public BeamFnDataGrpcMultiplexer(
       Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor,
       OutboundObserverFactory outboundObserverFactory,
       OutboundObserverFactory.BasicFactory<BeamFnApi.Elements, 
BeamFnApi.Elements>
@@ -95,7 +95,7 @@ public class BeamFnDataGrpcMultiplexer2 implements 
AutoCloseable {
   /**
    * Registers a consumer for the specified intruction id.
    *
-   * <p>The {@link BeamFnDataGrpcMultiplexer2} partitions {@link 
BeamFnApi.Elements} with multiple
+   * <p>The {@link BeamFnDataGrpcMultiplexer} partitions {@link 
BeamFnApi.Elements} with multiple
    * instruction ids ensuring that the receiver will only see {@link 
BeamFnApi.Elements} with a
    * single instruction id.
    *
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver.java
similarity index 97%
rename from 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java
rename to 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver.java
index 2008f59faa3..96c8dee4899 100644
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver.java
@@ -37,15 +37,15 @@ import org.apache.beam.sdk.fn.CancellableQueue;
  *
  * <p>Closing the receiver will unblock any upstream producer and downstream 
consumer exceptionally.
  */
-public class BeamFnDataInboundObserver2 implements 
CloseableFnDataReceiver<BeamFnApi.Elements> {
+public class BeamFnDataInboundObserver implements 
CloseableFnDataReceiver<BeamFnApi.Elements> {
 
   /**
    * Creates a receiver that is able to consume elements multiplexing on to 
the provided set of
    * endpoints.
    */
-  public static BeamFnDataInboundObserver2 forConsumers(
+  public static BeamFnDataInboundObserver forConsumers(
       List<DataEndpoint<?>> dataEndpoints, List<TimerEndpoint<?>> 
timerEndpoints) {
-    return new BeamFnDataInboundObserver2(dataEndpoints, timerEndpoints);
+    return new BeamFnDataInboundObserver(dataEndpoints, timerEndpoints);
   }
 
   /** Holds the status of whether the endpoint has been completed or not. */
@@ -78,7 +78,7 @@ public class BeamFnDataInboundObserver2 implements 
CloseableFnDataReceiver<BeamF
   private final int totalNumEndpoints;
   private int numEndpointsThatAreIncomplete;
 
-  private BeamFnDataInboundObserver2(
+  private BeamFnDataInboundObserver(
       List<DataEndpoint<?>> dataEndpoints, List<TimerEndpoint<?>> 
timerEndpoints) {
     this.transformIdToDataEndpoint = new HashMap<>();
     for (DataEndpoint<?> endpoint : dataEndpoints) {
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/InboundDataClient.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/InboundDataClient.java
deleted file mode 100644
index afab4ee8f70..00000000000
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/InboundDataClient.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.fn.data;
-
-import java.util.concurrent.CancellationException;
-
-/**
- * A client representing some stream of inbound data. An inbound data client 
can be completed or
- * cancelled, in which case it will ignore any future inputs. It can also be 
awaited on.
- *
- * @deprecated Migrate to {@link BeamFnDataInboundObserver2}.
- */
-@Deprecated
-public interface InboundDataClient {
-  /**
-   * Block until the client has completed reading from the inbound stream.
-   *
-   * @throws InterruptedException if the client is interrupted before 
completing.
-   * @throws CancellationException if the client is cancelled before 
completing.
-   * @throws Exception if the client throws an exception while awaiting 
completion.
-   */
-  void awaitCompletion() throws InterruptedException, Exception;
-
-  /** Runs the runnable once the client has completed reading from the inbound 
stream. */
-  void runWhenComplete(Runnable completeRunnable);
-
-  /**
-   * Returns true if the client is done, either via completing successfully or 
by being cancelled.
-   */
-  boolean isDone();
-
-  /** Cancels the client, causing it to drop any future inbound data. */
-  void cancel();
-
-  /** Mark the client as completed. */
-  void complete();
-
-  /**
-   * Mark the client as completed with an exception. Calls to awaitCompletion 
will terminate by
-   * throwing the provided exception.
-   *
-   * @param t the throwable that caused this client to fail
-   */
-  void fail(Throwable t);
-}
diff --git 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2Test.java
 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexerTest.java
similarity index 95%
rename from 
sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2Test.java
rename to 
sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexerTest.java
index a30eac33fa8..538e607e8ab 100644
--- 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2Test.java
+++ 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexerTest.java
@@ -41,8 +41,8 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterable
 import org.junit.Rule;
 import org.junit.Test;
 
-/** Tests for {@link BeamFnDataGrpcMultiplexer2}. */
-public class BeamFnDataGrpcMultiplexer2Test {
+/** Tests for {@link BeamFnDataGrpcMultiplexer}. */
+public class BeamFnDataGrpcMultiplexerTest {
 
   private static final Endpoints.ApiServiceDescriptor DESCRIPTOR =
       Endpoints.ApiServiceDescriptor.newBuilder().setUrl("test").build();
@@ -83,8 +83,8 @@ public class BeamFnDataGrpcMultiplexer2Test {
   @Test
   public void testOutboundObserver() {
     Collection<BeamFnApi.Elements> values = new ArrayList<>();
-    BeamFnDataGrpcMultiplexer2 multiplexer =
-        new BeamFnDataGrpcMultiplexer2(
+    BeamFnDataGrpcMultiplexer multiplexer =
+        new BeamFnDataGrpcMultiplexer(
             DESCRIPTOR,
             OutboundObserverFactory.clientDirect(),
             inboundObserver -> TestStreams.withOnNext(values::add).build());
@@ -97,8 +97,8 @@ public class BeamFnDataGrpcMultiplexer2Test {
     Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
     Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
     Collection<BeamFnApi.Elements> timerInboundValues = new ArrayList<>();
-    BeamFnDataGrpcMultiplexer2 multiplexer =
-        new BeamFnDataGrpcMultiplexer2(
+    BeamFnDataGrpcMultiplexer multiplexer =
+        new BeamFnDataGrpcMultiplexer(
             DESCRIPTOR,
             OutboundObserverFactory.clientDirect(),
             inboundObserver -> 
TestStreams.withOnNext(outboundValues::add).build());
@@ -176,8 +176,8 @@ public class BeamFnDataGrpcMultiplexer2Test {
     Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
     Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
     Collection<BeamFnApi.Elements> timerInboundValues = new ArrayList<>();
-    BeamFnDataGrpcMultiplexer2 multiplexer =
-        new BeamFnDataGrpcMultiplexer2(
+    BeamFnDataGrpcMultiplexer multiplexer =
+        new BeamFnDataGrpcMultiplexer(
             DESCRIPTOR,
             OutboundObserverFactory.clientDirect(),
             inboundObserver -> 
TestStreams.withOnNext(outboundValues::add).build());
@@ -238,8 +238,8 @@ public class BeamFnDataGrpcMultiplexer2Test {
   public void testElementsWithOnlySingleInstructionIdUsingHotPath() throws 
Exception {
     Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
     Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
-    BeamFnDataGrpcMultiplexer2 multiplexer =
-        new BeamFnDataGrpcMultiplexer2(
+    BeamFnDataGrpcMultiplexer multiplexer =
+        new BeamFnDataGrpcMultiplexer(
             DESCRIPTOR,
             OutboundObserverFactory.clientDirect(),
             inboundObserver -> 
TestStreams.withOnNext(outboundValues::add).build());
@@ -274,8 +274,8 @@ public class BeamFnDataGrpcMultiplexer2Test {
   public void testFailedProcessingCausesAdditionalInboundDataToBeIgnored() 
throws Exception {
     Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
     Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
-    BeamFnDataGrpcMultiplexer2 multiplexer =
-        new BeamFnDataGrpcMultiplexer2(
+    BeamFnDataGrpcMultiplexer multiplexer =
+        new BeamFnDataGrpcMultiplexer(
             DESCRIPTOR,
             OutboundObserverFactory.clientDirect(),
             inboundObserver -> 
TestStreams.withOnNext(outboundValues::add).build());
@@ -326,8 +326,8 @@ public class BeamFnDataGrpcMultiplexer2Test {
     Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
     Collection<Throwable> errorWasReturned = new ArrayList<>();
     AtomicBoolean wasClosed = new AtomicBoolean();
-    final BeamFnDataGrpcMultiplexer2 multiplexer =
-        new BeamFnDataGrpcMultiplexer2(
+    final BeamFnDataGrpcMultiplexer multiplexer =
+        new BeamFnDataGrpcMultiplexer(
             DESCRIPTOR,
             OutboundObserverFactory.clientDirect(),
             inboundObserver ->
diff --git 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java
 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java
similarity index 93%
rename from 
sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java
rename to 
sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java
index c1e8e4293d4..137c2f890fb 100644
--- 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java
+++ 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java
@@ -42,9 +42,9 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/** Tests for {@link BeamFnDataInboundObserver2}. */
+/** Tests for {@link BeamFnDataInboundObserver}. */
 @RunWith(JUnit4.class)
-public class BeamFnDataInboundObserver2Test {
+public class BeamFnDataInboundObserverTest {
   private static final Coder<WindowedValue<String>> CODER =
       WindowedValue.getFullCoder(StringUtf8Coder.of(), 
GlobalWindow.Coder.INSTANCE);
   private static final String TRANSFORM_ID = "transformId";
@@ -58,8 +58,8 @@ public class BeamFnDataInboundObserver2Test {
     Thread thread = Thread.currentThread();
     Collection<WindowedValue<String>> values = new ArrayList<>();
     Collection<WindowedValue<String>> timers = new ArrayList<>();
-    BeamFnDataInboundObserver2 observer =
-        BeamFnDataInboundObserver2.forConsumers(
+    BeamFnDataInboundObserver observer =
+        BeamFnDataInboundObserver.forConsumers(
             Arrays.asList(
                 DataEndpoint.create(
                     TRANSFORM_ID,
@@ -102,8 +102,8 @@ public class BeamFnDataInboundObserver2Test {
   @Test
   public void 
testAwaitCompletionFailureVisibleToAwaitCompletionCallerAndProducer()
       throws Exception {
-    BeamFnDataInboundObserver2 observer =
-        BeamFnDataInboundObserver2.forConsumers(
+    BeamFnDataInboundObserver observer =
+        BeamFnDataInboundObserver.forConsumers(
             Arrays.asList(
                 DataEndpoint.create(
                     TRANSFORM_ID,
@@ -137,8 +137,8 @@ public class BeamFnDataInboundObserver2Test {
 
   @Test
   public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws 
Exception {
-    BeamFnDataInboundObserver2 observer =
-        BeamFnDataInboundObserver2.forConsumers(
+    BeamFnDataInboundObserver observer =
+        BeamFnDataInboundObserver.forConsumers(
             Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, (value) -> 
{})),
             Collections.emptyList());
 
@@ -147,7 +147,7 @@ public class BeamFnDataInboundObserver2Test {
             () -> {
               observer.accept(dataWith("ABC"));
               assertThrows(
-                  BeamFnDataInboundObserver2.CloseException.class,
+                  BeamFnDataInboundObserver.CloseException.class,
                   () -> {
                     while (true) {
                       // keep trying to send messages since the queue buffers 
messages and the
@@ -165,7 +165,7 @@ public class BeamFnDataInboundObserver2Test {
               return null;
             });
 
-    assertThrows(BeamFnDataInboundObserver2.CloseException.class, () -> 
observer.awaitCompletion());
+    assertThrows(BeamFnDataInboundObserver.CloseException.class, () -> 
observer.awaitCompletion());
     future.get();
     future2.get();
   }
@@ -173,8 +173,8 @@ public class BeamFnDataInboundObserver2Test {
   @Test
   public void 
testBadProducerDataFailureVisibleToAwaitCompletionCallerAndProducer()
       throws Exception {
-    BeamFnDataInboundObserver2 observer =
-        BeamFnDataInboundObserver2.forConsumers(
+    BeamFnDataInboundObserver observer =
+        BeamFnDataInboundObserver.forConsumers(
             Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, (value) -> 
{})),
             Collections.emptyList());
     Future<?> future =
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index bddfcc8c360..f2b95450391 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -77,7 +77,7 @@ import org.apache.beam.runners.core.construction.Timer;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Urns;
 import org.apache.beam.runners.core.metrics.ShortIdMap;
-import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2;
+import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
 import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
 import org.apache.beam.sdk.fn.data.DataEndpoint;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
@@ -538,7 +538,7 @@ public class ProcessBundleHandler {
                       + 
bundleProcessor.getInboundObserver().getUnfinishedEndpoints());
             }
           } else if 
(!bundleProcessor.getInboundEndpointApiServiceDescriptors().isEmpty()) {
-            BeamFnDataInboundObserver2 observer = 
bundleProcessor.getInboundObserver();
+            BeamFnDataInboundObserver observer = 
bundleProcessor.getInboundObserver();
             beamFnDataClient.registerReceiver(
                 request.getInstructionId(),
                 bundleProcessor.getInboundEndpointApiServiceDescriptors(),
@@ -1122,16 +1122,16 @@ public class ProcessBundleHandler {
       return this.bundleCache;
     }
 
-    private BeamFnDataInboundObserver2 inboundObserver2;
+    private BeamFnDataInboundObserver inboundObserver2;
 
-    BeamFnDataInboundObserver2 getInboundObserver() {
+    BeamFnDataInboundObserver getInboundObserver() {
       return inboundObserver2;
     }
 
     /** Finishes construction of the {@link BundleProcessor}. */
     void finish() {
       inboundObserver2 =
-          BeamFnDataInboundObserver2.forConsumers(getInboundDataEndpoints(), 
getTimerEndpoints());
+          BeamFnDataInboundObserver.forConsumers(getInboundDataEndpoints(), 
getTimerEndpoints());
       for (BeamFnDataOutboundAggregator aggregator : 
getOutboundAggregators().values()) {
         aggregator.start();
       }
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
index b6a36f0deb1..a6057a220da 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
@@ -26,7 +26,7 @@ import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
 import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
-import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer2;
+import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer;
 import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
 import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
@@ -44,7 +44,7 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient 
{
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BeamFnDataGrpcClient.class);
 
-  private final ConcurrentMap<Endpoints.ApiServiceDescriptor, 
BeamFnDataGrpcMultiplexer2>
+  private final ConcurrentMap<Endpoints.ApiServiceDescriptor, 
BeamFnDataGrpcMultiplexer>
       multiplexerCache;
   private final Function<Endpoints.ApiServiceDescriptor, ManagedChannel> 
channelFactory;
   private final OutboundObserverFactory outboundObserverFactory;
@@ -67,7 +67,7 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient 
{
       CloseableFnDataReceiver<Elements> receiver) {
     LOG.debug("Registering consumer for {}", instructionId);
     for (int i = 0, size = apiServiceDescriptors.size(); i < size; i++) {
-      BeamFnDataGrpcMultiplexer2 client = 
getClientFor(apiServiceDescriptors.get(i));
+      BeamFnDataGrpcMultiplexer client = 
getClientFor(apiServiceDescriptors.get(i));
       client.registerConsumer(instructionId, receiver);
     }
   }
@@ -77,7 +77,7 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient 
{
       String instructionId, List<ApiServiceDescriptor> apiServiceDescriptors) {
     LOG.debug("Unregistering consumer for {}", instructionId);
     for (int i = 0, size = apiServiceDescriptors.size(); i < size; i++) {
-      BeamFnDataGrpcMultiplexer2 client = 
getClientFor(apiServiceDescriptors.get(i));
+      BeamFnDataGrpcMultiplexer client = 
getClientFor(apiServiceDescriptors.get(i));
       client.unregisterConsumer(instructionId);
     }
   }
@@ -94,12 +94,12 @@ public class BeamFnDataGrpcClient implements 
BeamFnDataClient {
         collectElementsIfNoFlushes);
   }
 
-  private BeamFnDataGrpcMultiplexer2 getClientFor(
+  private BeamFnDataGrpcMultiplexer getClientFor(
       Endpoints.ApiServiceDescriptor apiServiceDescriptor) {
     return multiplexerCache.computeIfAbsent(
         apiServiceDescriptor,
         (Endpoints.ApiServiceDescriptor descriptor) ->
-            new BeamFnDataGrpcMultiplexer2(
+            new BeamFnDataGrpcMultiplexer(
                 descriptor,
                 outboundObserverFactory,
                 
BeamFnDataGrpc.newStub(channelFactory.apply(apiServiceDescriptor))::data));
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
index f39dfa278d7..d670a5e110f 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
@@ -39,7 +39,7 @@ import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2;
+import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
 import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
 import org.apache.beam.sdk.fn.data.DataEndpoint;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
@@ -169,12 +169,12 @@ public class BeamFnDataGrpcClientTest {
               (Endpoints.ApiServiceDescriptor descriptor) -> channel,
               OutboundObserverFactory.trivial());
 
-      BeamFnDataInboundObserver2 observerA =
-          BeamFnDataInboundObserver2.forConsumers(
+      BeamFnDataInboundObserver observerA =
+          BeamFnDataInboundObserver.forConsumers(
               Arrays.asList(DataEndpoint.create(TRANSFORM_ID_A, CODER, 
inboundValuesA::add)),
               Collections.emptyList());
-      BeamFnDataInboundObserver2 observerB =
-          BeamFnDataInboundObserver2.forConsumers(
+      BeamFnDataInboundObserver observerB =
+          BeamFnDataInboundObserver.forConsumers(
               Arrays.asList(DataEndpoint.create(TRANSFORM_ID_B, CODER, 
inboundValuesB::add)),
               Collections.emptyList());
 
@@ -245,8 +245,8 @@ public class BeamFnDataGrpcClientTest {
               (Endpoints.ApiServiceDescriptor descriptor) -> channel,
               OutboundObserverFactory.trivial());
 
-      BeamFnDataInboundObserver2 observer =
-          BeamFnDataInboundObserver2.forConsumers(
+      BeamFnDataInboundObserver observer =
+          BeamFnDataInboundObserver.forConsumers(
               Arrays.asList(
                   DataEndpoint.create(
                       TRANSFORM_ID_A,

Reply via email to