This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 299a7b01308 Drop '2' from BeamFnDataGrpcMultiplexer2 and
BeamFnDataInboundObserver2 (#25144)
299a7b01308 is described below
commit 299a7b01308ca2e17618b5e5dace1a3bdb8df737
Author: Luke Cwik <[email protected]>
AuthorDate: Tue Jan 24 14:39:57 2023 -0800
Drop '2' from BeamFnDataGrpcMultiplexer2 and BeamFnDataInboundObserver2
(#25144)
This is a follow-up to a review comment in PR #25104
(https://github.com/apache/beam/pull/25104).
It also removes InboundDataClient, whose removal was missed in that same PR
(https://github.com/apache/beam/pull/25104).
---
.../fnexecution/control/SdkHarnessClient.java | 10 ++--
.../runners/fnexecution/data/GrpcDataService.java | 12 ++---
.../fnexecution/data/GrpcDataServiceTest.java | 10 ++--
...plexer2.java => BeamFnDataGrpcMultiplexer.java} | 8 +--
...server2.java => BeamFnDataInboundObserver.java} | 8 +--
.../apache/beam/sdk/fn/data/InboundDataClient.java | 60 ----------------------
...est.java => BeamFnDataGrpcMultiplexerTest.java} | 28 +++++-----
...est.java => BeamFnDataInboundObserverTest.java} | 24 ++++-----
.../fn/harness/control/ProcessBundleHandler.java | 10 ++--
.../beam/fn/harness/data/BeamFnDataGrpcClient.java | 12 ++---
.../fn/harness/data/BeamFnDataGrpcClientTest.java | 14 ++---
11 files changed, 68 insertions(+), 128 deletions(-)
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
index 609988b7d4e..1a2b1c03a0f 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
@@ -51,7 +51,7 @@ import
org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
-import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2;
+import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.DataEndpoint;
@@ -268,7 +268,7 @@ public class SdkHarnessClient implements AutoCloseable {
CompletionStage<BeamFnApi.ProcessBundleResponse> specificResponse =
genericResponse.thenApply(InstructionResponse::getProcessBundle);
- Optional<BeamFnDataInboundObserver2> beamFnDataInboundObserver;
+ Optional<BeamFnDataInboundObserver> beamFnDataInboundObserver;
if (outputReceivers.isEmpty() && timerReceivers.isEmpty()) {
beamFnDataInboundObserver = Optional.empty();
} else {
@@ -291,7 +291,7 @@ public class SdkHarnessClient implements AutoCloseable {
timerReceiver.getValue().getReceiver()));
}
beamFnDataInboundObserver =
- Optional.of(BeamFnDataInboundObserver2.forConsumers(dataEndpoints,
timerEndpoints));
+ Optional.of(BeamFnDataInboundObserver.forConsumers(dataEndpoints,
timerEndpoints));
fnApiDataService.registerReceiver(bundleId,
beamFnDataInboundObserver.get());
}
@@ -339,7 +339,7 @@ public class SdkHarnessClient implements AutoCloseable {
private final CompletionStage<BeamFnApi.ProcessBundleResponse> response;
private final BeamFnDataOutboundAggregator beamFnDataOutboundAggregator;
private final Map<LogicalEndpoint, FnDataReceiver<?>> inputReceivers;
- private final Optional<BeamFnDataInboundObserver2>
beamFnDataInboundObserver;
+ private final Optional<BeamFnDataInboundObserver>
beamFnDataInboundObserver;
private final StateDelegator.Registration stateRegistration;
private final BundleProgressHandler progressHandler;
private final BundleSplitHandler splitHandler;
@@ -354,7 +354,7 @@ public class SdkHarnessClient implements AutoCloseable {
CompletionStage<ProcessBundleResponse> response,
BeamFnDataOutboundAggregator beamFnDataOutboundAggregator,
Map<LogicalEndpoint, FnDataReceiver<?>> inputReceivers,
- Optional<BeamFnDataInboundObserver2> beamFnDataInboundObserver,
+ Optional<BeamFnDataInboundObserver> beamFnDataInboundObserver,
StateDelegator.Registration stateRegistration,
BundleProgressHandler progressHandler,
BundleSplitHandler splitHandler,
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
index 883d72103a9..6e34e0eae67 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
@@ -27,7 +27,7 @@ import java.util.function.Supplier;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
-import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer2;
+import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.server.FnService;
@@ -60,7 +60,7 @@ public class GrpcDataService extends
BeamFnDataGrpc.BeamFnDataImplBase
return new GrpcDataService(options, executor, outboundObserverFactory);
}
- private final SettableFuture<BeamFnDataGrpcMultiplexer2> connectedClient;
+ private final SettableFuture<BeamFnDataGrpcMultiplexer> connectedClient;
/**
* A collection of multiplexers which are not used to send data. A handle to
these multiplexers is
* maintained in order to perform an orderly shutdown.
@@ -68,7 +68,7 @@ public class GrpcDataService extends
BeamFnDataGrpc.BeamFnDataImplBase
* <p>TODO: (BEAM-3811) Replace with some cancellable collection, to ensure
that new clients of a
* closed {@link GrpcDataService} are closed with that {@link
GrpcDataService}.
*/
- private final Queue<BeamFnDataGrpcMultiplexer2> additionalMultiplexers;
+ private final Queue<BeamFnDataGrpcMultiplexer> additionalMultiplexers;
private final PipelineOptions options;
private final ExecutorService executor;
@@ -99,8 +99,8 @@ public class GrpcDataService extends
BeamFnDataGrpc.BeamFnDataImplBase
public StreamObserver<BeamFnApi.Elements> data(
final StreamObserver<BeamFnApi.Elements> outboundElementObserver) {
LOG.info("Beam Fn Data client connected.");
- BeamFnDataGrpcMultiplexer2 multiplexer =
- new BeamFnDataGrpcMultiplexer2(
+ BeamFnDataGrpcMultiplexer multiplexer =
+ new BeamFnDataGrpcMultiplexer(
null, outboundObserverFactory, inbound -> outboundElementObserver);
// First client that connects completes this future.
if (!connectedClient.set(multiplexer)) {
@@ -121,7 +121,7 @@ public class GrpcDataService extends
BeamFnDataGrpc.BeamFnDataImplBase
// Multiplexer, but if there isn't any multiplexer it prevents callers
blocking forever.
connectedClient.cancel(true);
// Close any other open connections
- for (BeamFnDataGrpcMultiplexer2 additional : additionalMultiplexers) {
+ for (BeamFnDataGrpcMultiplexer additional : additionalMultiplexers) {
try {
additional.close();
} catch (Exception ignored) {
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/data/GrpcDataServiceTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/data/GrpcDataServiceTest.java
index 86bf247fa34..6766928b968 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/data/GrpcDataServiceTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/data/GrpcDataServiceTest.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2;
+import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.DataEndpoint;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
@@ -150,18 +150,18 @@ public class GrpcDataServiceTest {
}
List<Collection<WindowedValue<String>>> serverInboundValues = new
ArrayList<>();
- Collection<BeamFnDataInboundObserver2> inboundObservers = new
ArrayList<>();
+ Collection<BeamFnDataInboundObserver> inboundObservers = new
ArrayList<>();
for (int i = 0; i < 3; ++i) {
final Collection<WindowedValue<String>> serverInboundValue = new
ArrayList<>();
serverInboundValues.add(serverInboundValue);
- BeamFnDataInboundObserver2 inboundObserver =
- BeamFnDataInboundObserver2.forConsumers(
+ BeamFnDataInboundObserver inboundObserver =
+ BeamFnDataInboundObserver.forConsumers(
Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER,
serverInboundValue::add)),
Collections.emptyList());
service.registerReceiver(Integer.toString(i), inboundObserver);
inboundObservers.add(inboundObserver);
}
- for (BeamFnDataInboundObserver2 inboundObserver : inboundObservers) {
+ for (BeamFnDataInboundObserver inboundObserver : inboundObservers) {
inboundObserver.awaitCompletion();
}
waitForInboundElements.countDown();
diff --git
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java
similarity index 97%
rename from
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java
rename to
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java
index bc552dce08d..edca740dce9 100644
---
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java
+++
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java
@@ -47,8 +47,8 @@ import org.slf4j.LoggerFactory;
* <p>TODO: Add support for multiplexing over multiple outbound observers by
stickying the output
* location with a specific outbound observer.
*/
-public class BeamFnDataGrpcMultiplexer2 implements AutoCloseable {
- private static final Logger LOG =
LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer2.class);
+public class BeamFnDataGrpcMultiplexer implements AutoCloseable {
+ private static final Logger LOG =
LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer.class);
private final Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor;
private final StreamObserver<BeamFnApi.Elements> inboundObserver;
private final StreamObserver<BeamFnApi.Elements> outboundObserver;
@@ -57,7 +57,7 @@ public class BeamFnDataGrpcMultiplexer2 implements
AutoCloseable {
receivers;
private final ConcurrentMap<String, Boolean> erroredInstructionIds;
- public BeamFnDataGrpcMultiplexer2(
+ public BeamFnDataGrpcMultiplexer(
Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor,
OutboundObserverFactory outboundObserverFactory,
OutboundObserverFactory.BasicFactory<BeamFnApi.Elements,
BeamFnApi.Elements>
@@ -95,7 +95,7 @@ public class BeamFnDataGrpcMultiplexer2 implements
AutoCloseable {
/**
* Registers a consumer for the specified intruction id.
*
- * <p>The {@link BeamFnDataGrpcMultiplexer2} partitions {@link
BeamFnApi.Elements} with multiple
+ * <p>The {@link BeamFnDataGrpcMultiplexer} partitions {@link
BeamFnApi.Elements} with multiple
* instruction ids ensuring that the receiver will only see {@link
BeamFnApi.Elements} with a
* single instruction id.
*
diff --git
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver.java
similarity index 97%
rename from
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java
rename to
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver.java
index 2008f59faa3..96c8dee4899 100644
---
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java
+++
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver.java
@@ -37,15 +37,15 @@ import org.apache.beam.sdk.fn.CancellableQueue;
*
* <p>Closing the receiver will unblock any upstream producer and downstream
consumer exceptionally.
*/
-public class BeamFnDataInboundObserver2 implements
CloseableFnDataReceiver<BeamFnApi.Elements> {
+public class BeamFnDataInboundObserver implements
CloseableFnDataReceiver<BeamFnApi.Elements> {
/**
* Creates a receiver that is able to consume elements multiplexing on to
the provided set of
* endpoints.
*/
- public static BeamFnDataInboundObserver2 forConsumers(
+ public static BeamFnDataInboundObserver forConsumers(
List<DataEndpoint<?>> dataEndpoints, List<TimerEndpoint<?>>
timerEndpoints) {
- return new BeamFnDataInboundObserver2(dataEndpoints, timerEndpoints);
+ return new BeamFnDataInboundObserver(dataEndpoints, timerEndpoints);
}
/** Holds the status of whether the endpoint has been completed or not. */
@@ -78,7 +78,7 @@ public class BeamFnDataInboundObserver2 implements
CloseableFnDataReceiver<BeamF
private final int totalNumEndpoints;
private int numEndpointsThatAreIncomplete;
- private BeamFnDataInboundObserver2(
+ private BeamFnDataInboundObserver(
List<DataEndpoint<?>> dataEndpoints, List<TimerEndpoint<?>>
timerEndpoints) {
this.transformIdToDataEndpoint = new HashMap<>();
for (DataEndpoint<?> endpoint : dataEndpoints) {
diff --git
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/InboundDataClient.java
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/InboundDataClient.java
deleted file mode 100644
index afab4ee8f70..00000000000
---
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/InboundDataClient.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.fn.data;
-
-import java.util.concurrent.CancellationException;
-
-/**
- * A client representing some stream of inbound data. An inbound data client
can be completed or
- * cancelled, in which case it will ignore any future inputs. It can also be
awaited on.
- *
- * @deprecated Migrate to {@link BeamFnDataInboundObserver2}.
- */
-@Deprecated
-public interface InboundDataClient {
- /**
- * Block until the client has completed reading from the inbound stream.
- *
- * @throws InterruptedException if the client is interrupted before
completing.
- * @throws CancellationException if the client is cancelled before
completing.
- * @throws Exception if the client throws an exception while awaiting
completion.
- */
- void awaitCompletion() throws InterruptedException, Exception;
-
- /** Runs the runnable once the client has completed reading from the inbound
stream. */
- void runWhenComplete(Runnable completeRunnable);
-
- /**
- * Returns true if the client is done, either via completing successfully or
by being cancelled.
- */
- boolean isDone();
-
- /** Cancels the client, causing it to drop any future inbound data. */
- void cancel();
-
- /** Mark the client as completed. */
- void complete();
-
- /**
- * Mark the client as completed with an exception. Calls to awaitCompletion
will terminate by
- * throwing the provided exception.
- *
- * @param t the throwable that caused this client to fail
- */
- void fail(Throwable t);
-}
diff --git
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2Test.java
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexerTest.java
similarity index 95%
rename from
sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2Test.java
rename to
sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexerTest.java
index a30eac33fa8..538e607e8ab 100644
---
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2Test.java
+++
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexerTest.java
@@ -41,8 +41,8 @@ import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterable
import org.junit.Rule;
import org.junit.Test;
-/** Tests for {@link BeamFnDataGrpcMultiplexer2}. */
-public class BeamFnDataGrpcMultiplexer2Test {
+/** Tests for {@link BeamFnDataGrpcMultiplexer}. */
+public class BeamFnDataGrpcMultiplexerTest {
private static final Endpoints.ApiServiceDescriptor DESCRIPTOR =
Endpoints.ApiServiceDescriptor.newBuilder().setUrl("test").build();
@@ -83,8 +83,8 @@ public class BeamFnDataGrpcMultiplexer2Test {
@Test
public void testOutboundObserver() {
Collection<BeamFnApi.Elements> values = new ArrayList<>();
- BeamFnDataGrpcMultiplexer2 multiplexer =
- new BeamFnDataGrpcMultiplexer2(
+ BeamFnDataGrpcMultiplexer multiplexer =
+ new BeamFnDataGrpcMultiplexer(
DESCRIPTOR,
OutboundObserverFactory.clientDirect(),
inboundObserver -> TestStreams.withOnNext(values::add).build());
@@ -97,8 +97,8 @@ public class BeamFnDataGrpcMultiplexer2Test {
Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
Collection<BeamFnApi.Elements> timerInboundValues = new ArrayList<>();
- BeamFnDataGrpcMultiplexer2 multiplexer =
- new BeamFnDataGrpcMultiplexer2(
+ BeamFnDataGrpcMultiplexer multiplexer =
+ new BeamFnDataGrpcMultiplexer(
DESCRIPTOR,
OutboundObserverFactory.clientDirect(),
inboundObserver ->
TestStreams.withOnNext(outboundValues::add).build());
@@ -176,8 +176,8 @@ public class BeamFnDataGrpcMultiplexer2Test {
Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
Collection<BeamFnApi.Elements> timerInboundValues = new ArrayList<>();
- BeamFnDataGrpcMultiplexer2 multiplexer =
- new BeamFnDataGrpcMultiplexer2(
+ BeamFnDataGrpcMultiplexer multiplexer =
+ new BeamFnDataGrpcMultiplexer(
DESCRIPTOR,
OutboundObserverFactory.clientDirect(),
inboundObserver ->
TestStreams.withOnNext(outboundValues::add).build());
@@ -238,8 +238,8 @@ public class BeamFnDataGrpcMultiplexer2Test {
public void testElementsWithOnlySingleInstructionIdUsingHotPath() throws
Exception {
Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
- BeamFnDataGrpcMultiplexer2 multiplexer =
- new BeamFnDataGrpcMultiplexer2(
+ BeamFnDataGrpcMultiplexer multiplexer =
+ new BeamFnDataGrpcMultiplexer(
DESCRIPTOR,
OutboundObserverFactory.clientDirect(),
inboundObserver ->
TestStreams.withOnNext(outboundValues::add).build());
@@ -274,8 +274,8 @@ public class BeamFnDataGrpcMultiplexer2Test {
public void testFailedProcessingCausesAdditionalInboundDataToBeIgnored()
throws Exception {
Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
- BeamFnDataGrpcMultiplexer2 multiplexer =
- new BeamFnDataGrpcMultiplexer2(
+ BeamFnDataGrpcMultiplexer multiplexer =
+ new BeamFnDataGrpcMultiplexer(
DESCRIPTOR,
OutboundObserverFactory.clientDirect(),
inboundObserver ->
TestStreams.withOnNext(outboundValues::add).build());
@@ -326,8 +326,8 @@ public class BeamFnDataGrpcMultiplexer2Test {
Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
Collection<Throwable> errorWasReturned = new ArrayList<>();
AtomicBoolean wasClosed = new AtomicBoolean();
- final BeamFnDataGrpcMultiplexer2 multiplexer =
- new BeamFnDataGrpcMultiplexer2(
+ final BeamFnDataGrpcMultiplexer multiplexer =
+ new BeamFnDataGrpcMultiplexer(
DESCRIPTOR,
OutboundObserverFactory.clientDirect(),
inboundObserver ->
diff --git
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java
similarity index 93%
rename from
sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java
rename to
sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java
index c1e8e4293d4..137c2f890fb 100644
---
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java
+++
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java
@@ -42,9 +42,9 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/** Tests for {@link BeamFnDataInboundObserver2}. */
+/** Tests for {@link BeamFnDataInboundObserver}. */
@RunWith(JUnit4.class)
-public class BeamFnDataInboundObserver2Test {
+public class BeamFnDataInboundObserverTest {
private static final Coder<WindowedValue<String>> CODER =
WindowedValue.getFullCoder(StringUtf8Coder.of(),
GlobalWindow.Coder.INSTANCE);
private static final String TRANSFORM_ID = "transformId";
@@ -58,8 +58,8 @@ public class BeamFnDataInboundObserver2Test {
Thread thread = Thread.currentThread();
Collection<WindowedValue<String>> values = new ArrayList<>();
Collection<WindowedValue<String>> timers = new ArrayList<>();
- BeamFnDataInboundObserver2 observer =
- BeamFnDataInboundObserver2.forConsumers(
+ BeamFnDataInboundObserver observer =
+ BeamFnDataInboundObserver.forConsumers(
Arrays.asList(
DataEndpoint.create(
TRANSFORM_ID,
@@ -102,8 +102,8 @@ public class BeamFnDataInboundObserver2Test {
@Test
public void
testAwaitCompletionFailureVisibleToAwaitCompletionCallerAndProducer()
throws Exception {
- BeamFnDataInboundObserver2 observer =
- BeamFnDataInboundObserver2.forConsumers(
+ BeamFnDataInboundObserver observer =
+ BeamFnDataInboundObserver.forConsumers(
Arrays.asList(
DataEndpoint.create(
TRANSFORM_ID,
@@ -137,8 +137,8 @@ public class BeamFnDataInboundObserver2Test {
@Test
public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws
Exception {
- BeamFnDataInboundObserver2 observer =
- BeamFnDataInboundObserver2.forConsumers(
+ BeamFnDataInboundObserver observer =
+ BeamFnDataInboundObserver.forConsumers(
Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, (value) ->
{})),
Collections.emptyList());
@@ -147,7 +147,7 @@ public class BeamFnDataInboundObserver2Test {
() -> {
observer.accept(dataWith("ABC"));
assertThrows(
- BeamFnDataInboundObserver2.CloseException.class,
+ BeamFnDataInboundObserver.CloseException.class,
() -> {
while (true) {
// keep trying to send messages since the queue buffers
messages and the
@@ -165,7 +165,7 @@ public class BeamFnDataInboundObserver2Test {
return null;
});
- assertThrows(BeamFnDataInboundObserver2.CloseException.class, () ->
observer.awaitCompletion());
+ assertThrows(BeamFnDataInboundObserver.CloseException.class, () ->
observer.awaitCompletion());
future.get();
future2.get();
}
@@ -173,8 +173,8 @@ public class BeamFnDataInboundObserver2Test {
@Test
public void
testBadProducerDataFailureVisibleToAwaitCompletionCallerAndProducer()
throws Exception {
- BeamFnDataInboundObserver2 observer =
- BeamFnDataInboundObserver2.forConsumers(
+ BeamFnDataInboundObserver observer =
+ BeamFnDataInboundObserver.forConsumers(
Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, (value) ->
{})),
Collections.emptyList());
Future<?> future =
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index bddfcc8c360..f2b95450391 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -77,7 +77,7 @@ import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Urns;
import org.apache.beam.runners.core.metrics.ShortIdMap;
-import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2;
+import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.DataEndpoint;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
@@ -538,7 +538,7 @@ public class ProcessBundleHandler {
+
bundleProcessor.getInboundObserver().getUnfinishedEndpoints());
}
} else if
(!bundleProcessor.getInboundEndpointApiServiceDescriptors().isEmpty()) {
- BeamFnDataInboundObserver2 observer =
bundleProcessor.getInboundObserver();
+ BeamFnDataInboundObserver observer =
bundleProcessor.getInboundObserver();
beamFnDataClient.registerReceiver(
request.getInstructionId(),
bundleProcessor.getInboundEndpointApiServiceDescriptors(),
@@ -1122,16 +1122,16 @@ public class ProcessBundleHandler {
return this.bundleCache;
}
- private BeamFnDataInboundObserver2 inboundObserver2;
+ private BeamFnDataInboundObserver inboundObserver2;
- BeamFnDataInboundObserver2 getInboundObserver() {
+ BeamFnDataInboundObserver getInboundObserver() {
return inboundObserver2;
}
/** Finishes construction of the {@link BundleProcessor}. */
void finish() {
inboundObserver2 =
- BeamFnDataInboundObserver2.forConsumers(getInboundDataEndpoints(),
getTimerEndpoints());
+ BeamFnDataInboundObserver.forConsumers(getInboundDataEndpoints(),
getTimerEndpoints());
for (BeamFnDataOutboundAggregator aggregator :
getOutboundAggregators().values()) {
aggregator.start();
}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
index b6a36f0deb1..a6057a220da 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
@@ -26,7 +26,7 @@ import
org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
-import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer2;
+import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
@@ -44,7 +44,7 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient
{
private static final Logger LOG =
LoggerFactory.getLogger(BeamFnDataGrpcClient.class);
- private final ConcurrentMap<Endpoints.ApiServiceDescriptor,
BeamFnDataGrpcMultiplexer2>
+ private final ConcurrentMap<Endpoints.ApiServiceDescriptor,
BeamFnDataGrpcMultiplexer>
multiplexerCache;
private final Function<Endpoints.ApiServiceDescriptor, ManagedChannel>
channelFactory;
private final OutboundObserverFactory outboundObserverFactory;
@@ -67,7 +67,7 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient
{
CloseableFnDataReceiver<Elements> receiver) {
LOG.debug("Registering consumer for {}", instructionId);
for (int i = 0, size = apiServiceDescriptors.size(); i < size; i++) {
- BeamFnDataGrpcMultiplexer2 client =
getClientFor(apiServiceDescriptors.get(i));
+ BeamFnDataGrpcMultiplexer client =
getClientFor(apiServiceDescriptors.get(i));
client.registerConsumer(instructionId, receiver);
}
}
@@ -77,7 +77,7 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient
{
String instructionId, List<ApiServiceDescriptor> apiServiceDescriptors) {
LOG.debug("Unregistering consumer for {}", instructionId);
for (int i = 0, size = apiServiceDescriptors.size(); i < size; i++) {
- BeamFnDataGrpcMultiplexer2 client =
getClientFor(apiServiceDescriptors.get(i));
+ BeamFnDataGrpcMultiplexer client =
getClientFor(apiServiceDescriptors.get(i));
client.unregisterConsumer(instructionId);
}
}
@@ -94,12 +94,12 @@ public class BeamFnDataGrpcClient implements
BeamFnDataClient {
collectElementsIfNoFlushes);
}
- private BeamFnDataGrpcMultiplexer2 getClientFor(
+ private BeamFnDataGrpcMultiplexer getClientFor(
Endpoints.ApiServiceDescriptor apiServiceDescriptor) {
return multiplexerCache.computeIfAbsent(
apiServiceDescriptor,
(Endpoints.ApiServiceDescriptor descriptor) ->
- new BeamFnDataGrpcMultiplexer2(
+ new BeamFnDataGrpcMultiplexer(
descriptor,
outboundObserverFactory,
BeamFnDataGrpc.newStub(channelFactory.apply(apiServiceDescriptor))::data));
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
index f39dfa278d7..d670a5e110f 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
@@ -39,7 +39,7 @@ import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2;
+import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.DataEndpoint;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
@@ -169,12 +169,12 @@ public class BeamFnDataGrpcClientTest {
(Endpoints.ApiServiceDescriptor descriptor) -> channel,
OutboundObserverFactory.trivial());
- BeamFnDataInboundObserver2 observerA =
- BeamFnDataInboundObserver2.forConsumers(
+ BeamFnDataInboundObserver observerA =
+ BeamFnDataInboundObserver.forConsumers(
Arrays.asList(DataEndpoint.create(TRANSFORM_ID_A, CODER,
inboundValuesA::add)),
Collections.emptyList());
- BeamFnDataInboundObserver2 observerB =
- BeamFnDataInboundObserver2.forConsumers(
+ BeamFnDataInboundObserver observerB =
+ BeamFnDataInboundObserver.forConsumers(
Arrays.asList(DataEndpoint.create(TRANSFORM_ID_B, CODER,
inboundValuesB::add)),
Collections.emptyList());
@@ -245,8 +245,8 @@ public class BeamFnDataGrpcClientTest {
(Endpoints.ApiServiceDescriptor descriptor) -> channel,
OutboundObserverFactory.trivial());
- BeamFnDataInboundObserver2 observer =
- BeamFnDataInboundObserver2.forConsumers(
+ BeamFnDataInboundObserver observer =
+ BeamFnDataInboundObserver.forConsumers(
Arrays.asList(
DataEndpoint.create(
TRANSFORM_ID_A,