This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 6173abb [BEAM-13015] Create a multiplexer that sends Elements based upon instruction id allowing for an inbound observer responsible for the entire instruction id. (#15747) 6173abb is described below commit 6173abb2e241865d89dff9ae679f4d422be84ee9 Author: Lukasz Cwik <lukec...@gmail.com> AuthorDate: Thu Oct 21 18:40:10 2021 -0700 [BEAM-13015] Create a multiplexer that sends Elements based upon instruction id allowing for an inbound observer responsible for the entire instruction id. (#15747) * [BEAM-13015] Create a multiplexer that sends Elements based upon instruction id allowing for an inbound observer responsible for the entire instruction id. * fixup! checkstyle * Update sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java Co-authored-by: Brian Hulette <hulet...@gmail.com> * fixup! Address PR comments. Co-authored-by: Brian Hulette <hulet...@gmail.com> --- .../sdk/fn/data/BeamFnDataGrpcMultiplexer2.java | 274 ++++++++++++++++ .../sdk/fn/data/BeamFnDataInboundObserver2.java | 196 +++++++++++ .../org/apache/beam/sdk/fn/data/DataEndpoint.java | 35 ++ .../org/apache/beam/sdk/fn/data/TimerEndpoint.java | 37 +++ .../fn/data/BeamFnDataGrpcMultiplexer2Test.java | 363 +++++++++++++++++++++ .../fn/data/BeamFnDataInboundObserver2Test.java | 249 ++++++++++++++ 6 files changed, 1154 insertions(+) diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java new file mode 100644 index 0000000..c00db8c --- /dev/null +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.fn.data; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.sdk.fn.CancellableQueue; +import org.apache.beam.sdk.fn.stream.OutboundObserverFactory; +import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Status; +import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A gRPC multiplexer for a specific {@link Endpoints.ApiServiceDescriptor}. + * + * <p>Multiplexes data for inbound consumers based upon their {@code instructionId}. 
+ * + * <p>Multiplexing inbound and outbound streams is as thread safe as the consumers of those streams. + * For inbound streams, this is as thread safe as the inbound observers. For outbound streams, this + * is as thread safe as the underlying stream observer. + * + * <p>TODO: Add support for multiplexing over multiple outbound observers by stickying the output + * location with a specific outbound observer. + */ +public class BeamFnDataGrpcMultiplexer2 implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer2.class); + private final Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor; + private final StreamObserver<BeamFnApi.Elements> inboundObserver; + private final StreamObserver<BeamFnApi.Elements> outboundObserver; + private final ConcurrentMap< + /*instructionId=*/ String, CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>>> + receivers; + private final ConcurrentMap<String, Boolean> erroredInstructionIds; + private final List<CancellableQueue<BeamFnApi.Elements>> unusedQueues; + + public BeamFnDataGrpcMultiplexer2( + Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor, + OutboundObserverFactory outboundObserverFactory, + OutboundObserverFactory.BasicFactory<BeamFnApi.Elements, BeamFnApi.Elements> + baseOutboundObserverFactory) { + this.apiServiceDescriptor = apiServiceDescriptor; + this.receivers = new ConcurrentHashMap<>(); + this.inboundObserver = new InboundObserver(); + this.outboundObserver = + outboundObserverFactory.outboundObserverFor(baseOutboundObserverFactory, inboundObserver); + this.erroredInstructionIds = new ConcurrentHashMap<>(); + this.unusedQueues = new ArrayList<>(100); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .omitNullValues() + .add("apiServiceDescriptor", apiServiceDescriptor) + .add("consumers", receivers) + .toString(); + } + + public StreamObserver<BeamFnApi.Elements> getInboundObserver() { + 
return inboundObserver; + } + + public StreamObserver<BeamFnApi.Elements> getOutboundObserver() { + return outboundObserver; + } + + private CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverFuture( + String instructionId) { + return receivers.computeIfAbsent(instructionId, (unused) -> new CompletableFuture<>()); + } + + /** + * Registers a consumer for the specified intruction id. + * + * <p>The {@link BeamFnDataGrpcMultiplexer2} partitions {@link BeamFnApi.Elements} with multiple + * instruction ids ensuring that the receiver will only see {@link BeamFnApi.Elements} with a + * single instruction id. + * + * <p>The caller must {@link #unregisterConsumer unregister the consumer} when they no longer wish + * to receive messages. + */ + public void registerConsumer( + String instructionId, CloseableFnDataReceiver<BeamFnApi.Elements> receiver) { + receiverFuture(instructionId).complete(receiver); + } + + /** Unregisters a consumer. */ + public void unregisterConsumer(String instructionId) { + receivers.remove(instructionId); + } + + @VisibleForTesting + boolean hasConsumer(String instructionId) { + return receivers.containsKey(instructionId); + } + + @Override + public void close() throws Exception { + Exception exception = null; + for (CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiver : + ImmutableList.copyOf(receivers.values())) { + // Cancel any observer waiting for the client to complete. If the receiver has already been + // completed or cancelled, this call will be ignored. 
+ receiver.cancel(true); + if (!receiver.isCompletedExceptionally()) { + try { + receiver.get().close(); + } catch (Exception e) { + if (exception == null) { + exception = e; + } else { + exception.addSuppressed(e); + } + } + } + } + // Cancel any outbound calls and complete any inbound calls, as this multiplexer is hanging up + outboundObserver.onError( + Status.CANCELLED.withDescription("Multiplexer hanging up").asException()); + inboundObserver.onCompleted(); + if (exception != null) { + throw exception; + } + } + + /** + * A multiplexing {@link StreamObserver} that selects the inbound {@link Consumer} to pass the + * elements to. + * + * <p>The inbound observer blocks until the {@link Consumer} is bound allowing for the sending + * harness to initiate transmitting data without needing for the receiving harness to signal that + * it is ready to consume that data. + */ + private final class InboundObserver implements StreamObserver<BeamFnApi.Elements> { + @Override + public void onNext(BeamFnApi.Elements value) { + // Have a fast path to handle the common case and provide a short circuit to exit if we detect + // multiple instruction ids. 
+ SINGLE_INSTRUCTION_ID: + { + String instructionId = null; + for (BeamFnApi.Elements.Data data : value.getDataList()) { + if (instructionId == null) { + instructionId = data.getInstructionId(); + } else if (!instructionId.equals(data.getInstructionId())) { + // Multiple instruction ids detected, break out of this block + break SINGLE_INSTRUCTION_ID; + } + } + for (BeamFnApi.Elements.Timers timers : value.getTimersList()) { + if (instructionId == null) { + instructionId = timers.getInstructionId(); + } else if (!instructionId.equals(timers.getInstructionId())) { + // Multiple instruction ids detected, break out of this block + break SINGLE_INSTRUCTION_ID; + } + } + if (instructionId == null) { + return; + } + forwardToConsumerForInstructionId(instructionId, value); + return; + } + + // Handle the case if there are multiple instruction ids. + HashSet<String> instructionIds = new HashSet<>(); + for (BeamFnApi.Elements.Data data : value.getDataList()) { + instructionIds.add(data.getInstructionId()); + } + for (BeamFnApi.Elements.Timers timers : value.getTimersList()) { + instructionIds.add(timers.getInstructionId()); + } + for (String instructionId : instructionIds) { + BeamFnApi.Elements.Builder builder = BeamFnApi.Elements.newBuilder(); + for (BeamFnApi.Elements.Data data : value.getDataList()) { + if (instructionId.equals(data.getInstructionId())) { + builder.addData(data); + } + } + for (BeamFnApi.Elements.Timers timers : value.getTimersList()) { + if (instructionId.equals(timers.getInstructionId())) { + builder.addTimers(timers); + } + } + forwardToConsumerForInstructionId(instructionId, builder.build()); + } + } + + private void forwardToConsumerForInstructionId(String instructionId, BeamFnApi.Elements value) { + if (erroredInstructionIds.containsKey(instructionId)) { + LOG.debug("Ignoring inbound data for failed instruction {}", instructionId); + return; + } + CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> consumerFuture = + 
receiverFuture(instructionId); + if (!consumerFuture.isDone()) { + LOG.debug( + "Received data for instruction {} without consumer ready. " + + "Waiting for consumer to be registered.", + instructionId); + } + CloseableFnDataReceiver<BeamFnApi.Elements> consumer; + try { + consumer = consumerFuture.get(); + + /* + * TODO: On failure we should fail any bundles that were impacted eagerly + * instead of relying on the Runner harness to do all the failure handling. + */ + } catch (ExecutionException | InterruptedException e) { + LOG.error( + "Client interrupted during handling of data for instruction {}", instructionId, e); + outboundObserver.onError(e); + return; + } catch (RuntimeException e) { + LOG.error("Client failed to handle data for instruction {}", instructionId, e); + outboundObserver.onError(e); + return; + } + try { + consumer.accept(value); + } catch (Exception e) { + erroredInstructionIds.put(instructionId, true); + } + } + + @Override + public void onError(Throwable t) { + LOG.error( + "Failed to handle for {}", + apiServiceDescriptor == null ? "unknown endpoint" : apiServiceDescriptor, + t); + outboundObserver.onCompleted(); + ; + } + + @Override + public void onCompleted() { + LOG.warn( + "Hanged up for {}.", + apiServiceDescriptor == null ? "unknown endpoint" : apiServiceDescriptor); + outboundObserver.onCompleted(); + ; + } + } +} diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java new file mode 100644 index 0000000..887f650 --- /dev/null +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.fn.data; + +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.fn.CancellableQueue; + +/** + * Decodes {@link BeamFnApi.Elements} partitioning them using the provided {@link DataEndpoint}s and + * {@link TimerEndpoint}s. + * + * <p>Note that this receiver uses a queue to buffer and pass elements from one thread to be + * processed by the thread which invokes {@link #awaitCompletion}. + * + * <p>Closing the receiver will unblock any upstream producer and downstream consumer exceptionally. + */ +public class BeamFnDataInboundObserver2 implements CloseableFnDataReceiver<BeamFnApi.Elements> { + + /** + * Creates a receiver that is able to consume elements multiplexing on to the provided set of + * endpoints. + */ + public static BeamFnDataInboundObserver2 forConsumers( + List<DataEndpoint<?>> dataEndpoints, List<TimerEndpoint<?>> timerEndpoints) { + return new BeamFnDataInboundObserver2(dataEndpoints, timerEndpoints); + } + + /** Holds the status of whether the endpoint has been completed or not. 
*/ + private static class EndpointStatus<T> { + final T endpoint; + boolean isDone; + + EndpointStatus(T endpoint) { + this.endpoint = endpoint; + } + } + + private final Map<String, EndpointStatus<DataEndpoint<?>>> transformIdToDataEndpoint; + private final Map<String, Map<String, EndpointStatus<TimerEndpoint<?>>>> + transformIdToTimerFamilyIdToTimerEndpoint; + private final CancellableQueue<BeamFnApi.Elements> queue; + private final int totalNumEndpoints; + private int numEndpointsThatAreIncomplete; + + private BeamFnDataInboundObserver2( + List<DataEndpoint<?>> dataEndpoints, List<TimerEndpoint<?>> timerEndpoints) { + this.transformIdToDataEndpoint = new HashMap<>(); + for (DataEndpoint<?> endpoint : dataEndpoints) { + transformIdToDataEndpoint.put(endpoint.getTransformId(), new EndpointStatus<>(endpoint)); + } + this.transformIdToTimerFamilyIdToTimerEndpoint = new HashMap<>(); + for (TimerEndpoint<?> endpoint : timerEndpoints) { + transformIdToTimerFamilyIdToTimerEndpoint + .computeIfAbsent(endpoint.getTransformId(), unused -> new HashMap<>()) + .put(endpoint.getTimerFamilyId(), new EndpointStatus<>(endpoint)); + } + this.queue = new CancellableQueue<>(100); + this.totalNumEndpoints = dataEndpoints.size() + timerEndpoints.size(); + this.numEndpointsThatAreIncomplete = totalNumEndpoints; + } + + @Override + public void accept(BeamFnApi.Elements elements) throws Exception { + queue.put(elements); + } + + @Override + public void flush() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws Exception { + queue.cancel(new IllegalStateException("Inbound observer closed.")); + } + + /** + * Uses the callers thread to process all elements received until we receive the end of the stream + * from the upstream producer for all endpoints specified. + * + * <p>Erroneous elements passed from the producer will be visible to the caller of this method. 
+ */ + public void awaitCompletion() throws Exception { + try { + while (true) { + BeamFnApi.Elements elements = queue.take(); + for (BeamFnApi.Elements.Data data : elements.getDataList()) { + EndpointStatus<DataEndpoint<?>> endpoint = + transformIdToDataEndpoint.get(data.getTransformId()); + if (endpoint == null) { + throw new IllegalStateException( + String.format( + "Unable to find inbound data receiver for instruction %s and transform %s.", + data.getInstructionId(), data.getTransformId())); + } else if (endpoint.isDone) { + throw new IllegalStateException( + String.format( + "Received data after inbound data receiver is done for instruction %s and transform %s.", + data.getInstructionId(), data.getTransformId())); + } + InputStream inputStream = data.getData().newInput(); + Coder<Object> coder = (Coder<Object>) endpoint.endpoint.getCoder(); + FnDataReceiver<Object> receiver = + (FnDataReceiver<Object>) endpoint.endpoint.getReceiver(); + while (inputStream.available() > 0) { + receiver.accept(coder.decode(inputStream)); + } + if (data.getIsLast()) { + endpoint.isDone = true; + numEndpointsThatAreIncomplete -= 1; + if (numEndpointsThatAreIncomplete == 0) { + return; + } + } + } + + for (BeamFnApi.Elements.Timers timers : elements.getTimersList()) { + Map<String, EndpointStatus<TimerEndpoint<?>>> timerFamilyIdToEndpoints = + transformIdToTimerFamilyIdToTimerEndpoint.get(timers.getTransformId()); + if (timerFamilyIdToEndpoints == null) { + throw new IllegalStateException( + String.format( + "Unable to find inbound timer receiver for instruction %s, transform %s, and timer family %s.", + timers.getInstructionId(), timers.getTransformId(), timers.getTimerFamilyId())); + } + EndpointStatus<TimerEndpoint<?>> endpoint = + timerFamilyIdToEndpoints.get(timers.getTimerFamilyId()); + if (endpoint == null) { + throw new IllegalStateException( + String.format( + "Unable to find inbound timer receiver for instruction %s, transform %s, and timer family %s.", + 
timers.getInstructionId(), timers.getTransformId(), timers.getTimerFamilyId())); + } else if (endpoint.isDone) { + throw new IllegalStateException( + String.format( + "Received timer after inbound timer receiver is done for instruction %s, transform %s, and timer family %s.", + timers.getInstructionId(), timers.getTransformId(), timers.getTimerFamilyId())); + } + InputStream inputStream = timers.getTimers().newInput(); + Coder<Object> coder = (Coder<Object>) endpoint.endpoint.getCoder(); + FnDataReceiver<Object> receiver = + (FnDataReceiver<Object>) endpoint.endpoint.getReceiver(); + while (inputStream.available() > 0) { + receiver.accept(coder.decode(inputStream)); + } + if (timers.getIsLast()) { + numEndpointsThatAreIncomplete -= 1; + if (numEndpointsThatAreIncomplete == 0) { + return; + } + } + } + } + } catch (Exception e) { + queue.cancel(e); + throw e; + } finally { + close(); + } + } + + /** Enables this receiver to be used again for another bundle. */ + public void reset() { + numEndpointsThatAreIncomplete = totalNumEndpoints; + for (EndpointStatus<?> value : transformIdToDataEndpoint.values()) { + value.isDone = false; + } + for (Map<String, EndpointStatus<TimerEndpoint<?>>> value : + transformIdToTimerFamilyIdToTimerEndpoint.values()) { + for (EndpointStatus<?> status : value.values()) { + status.isDone = false; + } + } + queue.reset(); + } +} diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/DataEndpoint.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/DataEndpoint.java new file mode 100644 index 0000000..e789cb1 --- /dev/null +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/DataEndpoint.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.fn.data; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.coders.Coder; + +@AutoValue +public abstract class DataEndpoint<T> { + public static <T> DataEndpoint<T> create( + String transformId, Coder<T> coder, FnDataReceiver<T> receiver) { + return new AutoValue_DataEndpoint<>(transformId, coder, receiver); + } + + public abstract String getTransformId(); + + public abstract Coder<T> getCoder(); + + public abstract FnDataReceiver<T> getReceiver(); +} diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/TimerEndpoint.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/TimerEndpoint.java new file mode 100644 index 0000000..f32503a --- /dev/null +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/TimerEndpoint.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.fn.data; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.coders.Coder; + +@AutoValue +public abstract class TimerEndpoint<T> { + public static <T> TimerEndpoint<T> create( + String transformId, String timerFamilyId, Coder<T> coder, FnDataReceiver<T> receiver) { + return new AutoValue_TimerEndpoint<>(transformId, timerFamilyId, coder, receiver); + } + + public abstract String getTransformId(); + + public abstract String getTimerFamilyId(); + + public abstract Coder<T> getCoder(); + + public abstract FnDataReceiver<T> getReceiver(); +} diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2Test.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2Test.java new file mode 100644 index 0000000..c517995 --- /dev/null +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2Test.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.fn.data; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.core.StringContains.containsString; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.sdk.fn.stream.OutboundObserverFactory; +import org.apache.beam.sdk.fn.test.TestExecutors; +import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService; +import org.apache.beam.sdk.fn.test.TestStreams; +import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.junit.Rule; +import org.junit.Test; + +/** Tests for {@link BeamFnDataGrpcMultiplexer2}. 
*/ +public class BeamFnDataGrpcMultiplexer2Test { + + private static final Endpoints.ApiServiceDescriptor DESCRIPTOR = + Endpoints.ApiServiceDescriptor.newBuilder().setUrl("test").build(); + private static final String DATA_INSTRUCTION_ID = "dataInstructionId"; + private static final String TIMER_INSTRUCTION_ID = "timerInstructionId"; + private static final BeamFnApi.Elements ELEMENTS = + BeamFnApi.Elements.newBuilder() + .addData( + BeamFnApi.Elements.Data.newBuilder() + .setInstructionId(DATA_INSTRUCTION_ID) + .setTransformId("dataTransformId") + .setData(ByteString.copyFrom(new byte[1]))) + .addTimers( + BeamFnApi.Elements.Timers.newBuilder() + .setInstructionId(TIMER_INSTRUCTION_ID) + .setTransformId("timerTransformId") + .setTimerFamilyId("timerFamilyId") + .setTimers(ByteString.copyFrom(new byte[2]))) + .build(); + private static final BeamFnApi.Elements TERMINAL_ELEMENTS = + BeamFnApi.Elements.newBuilder() + .addData( + BeamFnApi.Elements.Data.newBuilder() + .setInstructionId(DATA_INSTRUCTION_ID) + .setTransformId("dataTransformId") + .setIsLast(true)) + .addTimers( + BeamFnApi.Elements.Timers.newBuilder() + .setInstructionId(TIMER_INSTRUCTION_ID) + .setTransformId("timerTransformId") + .setTimerFamilyId("timerFamilyId") + .setIsLast(true)) + .build(); + + @Rule + public final TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool); + + @Test + public void testOutboundObserver() { + Collection<BeamFnApi.Elements> values = new ArrayList<>(); + BeamFnDataGrpcMultiplexer2 multiplexer = + new BeamFnDataGrpcMultiplexer2( + DESCRIPTOR, + OutboundObserverFactory.clientDirect(), + inboundObserver -> TestStreams.withOnNext(values::add).build()); + multiplexer.getOutboundObserver().onNext(ELEMENTS); + assertThat(values, contains(ELEMENTS)); + } + + @Test + public void testInboundObserverBlocksTillConsumerConnects() throws Exception { + Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>(); + Collection<BeamFnApi.Elements> 
dataInboundValues = new ArrayList<>(); + Collection<BeamFnApi.Elements> timerInboundValues = new ArrayList<>(); + BeamFnDataGrpcMultiplexer2 multiplexer = + new BeamFnDataGrpcMultiplexer2( + DESCRIPTOR, + OutboundObserverFactory.clientDirect(), + inboundObserver -> TestStreams.withOnNext(outboundValues::add).build()); + Future<?> registerFuture = + executor.submit( + () -> { + multiplexer.registerConsumer( + DATA_INSTRUCTION_ID, + new CloseableFnDataReceiver<BeamFnApi.Elements>() { + @Override + public void flush() throws Exception { + fail("Unexpected call"); + } + + @Override + public void close() throws Exception { + fail("Unexpected call"); + } + + @Override + public void accept(BeamFnApi.Elements input) throws Exception { + dataInboundValues.add(input); + } + }); + multiplexer.registerConsumer( + TIMER_INSTRUCTION_ID, + new CloseableFnDataReceiver<BeamFnApi.Elements>() { + @Override + public void flush() throws Exception { + fail("Unexpected call"); + } + + @Override + public void close() throws Exception { + fail("Unexpected call"); + } + + @Override + public void accept(BeamFnApi.Elements input) throws Exception { + timerInboundValues.add(input); + } + }); + }); + + multiplexer.getInboundObserver().onNext(ELEMENTS); + assertTrue(multiplexer.hasConsumer(DATA_INSTRUCTION_ID)); + assertTrue(multiplexer.hasConsumer(TIMER_INSTRUCTION_ID)); + + // Verify that on terminal elements we still wait to be unregistered. 
+ multiplexer.getInboundObserver().onNext(TERMINAL_ELEMENTS); + assertTrue(multiplexer.hasConsumer(DATA_INSTRUCTION_ID)); + assertTrue(multiplexer.hasConsumer(TIMER_INSTRUCTION_ID)); + + registerFuture.get(); + multiplexer.unregisterConsumer(DATA_INSTRUCTION_ID); + multiplexer.unregisterConsumer(TIMER_INSTRUCTION_ID); + assertFalse(multiplexer.hasConsumer(DATA_INSTRUCTION_ID)); + assertFalse(multiplexer.hasConsumer(TIMER_INSTRUCTION_ID)); + + // Assert that normal and terminal Elements are passed to the consumer + assertThat( + dataInboundValues, + contains( + ELEMENTS.toBuilder().clearTimers().build(), + TERMINAL_ELEMENTS.toBuilder().clearTimers().build())); + assertThat( + timerInboundValues, + contains( + ELEMENTS.toBuilder().clearData().build(), + TERMINAL_ELEMENTS.toBuilder().clearData().build())); + } + + @Test + public void testElementsNeedsPartitioning() throws Exception { + Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>(); + Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>(); + Collection<BeamFnApi.Elements> timerInboundValues = new ArrayList<>(); + BeamFnDataGrpcMultiplexer2 multiplexer = + new BeamFnDataGrpcMultiplexer2( + DESCRIPTOR, + OutboundObserverFactory.clientDirect(), + inboundObserver -> TestStreams.withOnNext(outboundValues::add).build()); + multiplexer.registerConsumer( + DATA_INSTRUCTION_ID, + new CloseableFnDataReceiver<BeamFnApi.Elements>() { + @Override + public void flush() throws Exception { + fail("Unexpected call"); + } + + @Override + public void close() throws Exception { + fail("Unexpected call"); + } + + @Override + public void accept(BeamFnApi.Elements input) throws Exception { + dataInboundValues.add(input); + } + }); + multiplexer.registerConsumer( + TIMER_INSTRUCTION_ID, + new CloseableFnDataReceiver<BeamFnApi.Elements>() { + @Override + public void flush() throws Exception { + fail("Unexpected call"); + } + + @Override + public void close() throws Exception { + fail("Unexpected call"); + } + 
+ @Override + public void accept(BeamFnApi.Elements input) throws Exception { + timerInboundValues.add(input); + } + }); + + multiplexer.getInboundObserver().onNext(ELEMENTS); + multiplexer.getInboundObserver().onNext(TERMINAL_ELEMENTS); + + // Assert that elements are partitioned based upon the instruction id. + assertThat( + dataInboundValues, + contains( + ELEMENTS.toBuilder().clearTimers().build(), + TERMINAL_ELEMENTS.toBuilder().clearTimers().build())); + assertThat( + timerInboundValues, + contains( + ELEMENTS.toBuilder().clearData().build(), + TERMINAL_ELEMENTS.toBuilder().clearData().build())); + } + + @Test + public void testElementsWithOnlySingleInstructionIdUsingHotPath() throws Exception { + Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>(); + Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>(); + BeamFnDataGrpcMultiplexer2 multiplexer = + new BeamFnDataGrpcMultiplexer2( + DESCRIPTOR, + OutboundObserverFactory.clientDirect(), + inboundObserver -> TestStreams.withOnNext(outboundValues::add).build()); + multiplexer.registerConsumer( + DATA_INSTRUCTION_ID, + new CloseableFnDataReceiver<BeamFnApi.Elements>() { + @Override + public void flush() throws Exception { + fail("Unexpected call"); + } + + @Override + public void close() throws Exception { + fail("Unexpected call"); + } + + @Override + public void accept(BeamFnApi.Elements input) throws Exception { + dataInboundValues.add(input); + } + }); + + BeamFnApi.Elements value = ELEMENTS.toBuilder().clearTimers().build(); + + multiplexer.getInboundObserver().onNext(value); + + // Assert that we passed the same instance through. 
+ assertSame(Iterables.getOnlyElement(dataInboundValues), value); + } + + @Test + public void testFailedProcessingCausesAdditionalInboundDataToBeIgnored() throws Exception { + Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>(); + Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>(); + BeamFnDataGrpcMultiplexer2 multiplexer = + new BeamFnDataGrpcMultiplexer2( + DESCRIPTOR, + OutboundObserverFactory.clientDirect(), + inboundObserver -> TestStreams.withOnNext(outboundValues::add).build()); + multiplexer.registerConsumer( + DATA_INSTRUCTION_ID, + new CloseableFnDataReceiver<BeamFnApi.Elements>() { + @Override + public void flush() throws Exception { + fail("Unexpected call"); + } + + @Override + public void close() throws Exception { + fail("Unexpected call"); + } + + @Override + public void accept(BeamFnApi.Elements input) throws Exception { + if (dataInboundValues.size() == 1) { + throw new Exception("processing failed"); + } + dataInboundValues.add(input); + } + }); + + BeamFnApi.Elements.Data.Builder data = + BeamFnApi.Elements.Data.newBuilder().setInstructionId(DATA_INSTRUCTION_ID); + + multiplexer + .getInboundObserver() + .onNext(BeamFnApi.Elements.newBuilder().addData(data.setTransformId("A").build()).build()); + multiplexer + .getInboundObserver() + .onNext(BeamFnApi.Elements.newBuilder().addData(data.setTransformId("B").build()).build()); + multiplexer + .getInboundObserver() + .onNext(BeamFnApi.Elements.newBuilder().addData(data.setTransformId("C").build()).build()); + + // Assert that we ignored the other two elements + assertThat( + dataInboundValues, + contains( + BeamFnApi.Elements.newBuilder().addData(data.setTransformId("A").build()).build())); + } + + @Test + public void testClose() throws Exception { + Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>(); + Collection<Throwable> errorWasReturned = new ArrayList<>(); + AtomicBoolean wasClosed = new AtomicBoolean(); + final BeamFnDataGrpcMultiplexer2 
multiplexer = + new BeamFnDataGrpcMultiplexer2( + DESCRIPTOR, + OutboundObserverFactory.clientDirect(), + inboundObserver -> + TestStreams.withOnNext(outboundValues::add) + .withOnError(errorWasReturned::add) + .build()); + multiplexer.registerConsumer( + DATA_INSTRUCTION_ID, + new CloseableFnDataReceiver<BeamFnApi.Elements>() { + @Override + public void flush() throws Exception { + fail("Unexpected call"); + } + + @Override + public void close() throws Exception { + wasClosed.set(true); + } + + @Override + public void accept(BeamFnApi.Elements input) throws Exception { + fail("Unexpected call"); + } + }); + + multiplexer.close(); + + assertTrue(wasClosed.get()); + assertThat( + Iterables.getOnlyElement(errorWasReturned).getMessage(), + containsString("Multiplexer hanging up")); + } +} diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java new file mode 100644 index 0000000..8b2b679 --- /dev/null +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.test.TestExecutors;
+import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link BeamFnDataInboundObserver2}.
+ *
+ * <p>Each test builds an observer with {@code forConsumers}, feeds it {@link BeamFnApi.Elements}
+ * from a task submitted to {@link #executor}, and calls {@code awaitCompletion()} on the test
+ * thread.
+ *
+ * <p>NOTE(review): the leading {@code String} passed to {@code assertThrows} in these tests is
+ * only the message JUnit reports when nothing is thrown. It does not check the message of the
+ * exception that was thrown, so only the exception type is actually verified.
+ */
+@RunWith(JUnit4.class)
+public class BeamFnDataInboundObserver2Test {
+  /** Windowed-value coder (global window) used to encode payloads and given to every endpoint. */
+  private static final Coder<WindowedValue<String>> CODER =
+      WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
+  private static final String TRANSFORM_ID = "transformId";
+  private static final String TIMER_FAMILY_ID = "timerFamilyId";
+
+  // Runs the producer side of each test on a thread other than the test thread.
+  @Rule
+  public final TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);
+
+  @Test
+  public void testConsumptionOfValuesHappensOnAwaitCompletionCallersThread() throws Exception {
+    // Captured on the test thread, which is also the thread that calls awaitCompletion() below;
+    // both consumers assert they are invoked on it.
+    Thread thread = Thread.currentThread();
+    Collection<WindowedValue<String>> values = new ArrayList<>();
+    Collection<WindowedValue<String>> timers = new ArrayList<>();
+    BeamFnDataInboundObserver2 observer =
+        BeamFnDataInboundObserver2.forConsumers(
+            Arrays.asList(
+                DataEndpoint.create(
+                    TRANSFORM_ID,
+                    CODER,
+                    (value) -> {
+                      assertSame(thread, Thread.currentThread());
+                      values.add(value);
+                    })),
+            Arrays.asList(
+                TimerEndpoint.create(
+                    TRANSFORM_ID,
+                    TIMER_FAMILY_ID,
+                    CODER,
+                    (value) -> {
+                      assertSame(thread, Thread.currentThread());
+                      timers.add(value);
+                    })));
+
+    Future<?> future =
+        executor.submit(
+            () -> {
+              // Send one data block holding several encoded values, then the data "is last"
+              // marker, then two separate timer blocks and the timer "is last" marker.
+              observer.accept(dataWith("ABC", "DEF", "GHI"));
+              observer.accept(lastData());
+              observer.accept(timerWith("UVW"));
+              observer.accept(timerWith("XYZ"));
+              observer.accept(lastTimer());
+              return null;
+            });
+
+    // NOTE(review): presumably returns once every endpoint has seen its "is last" marker;
+    // confirm against BeamFnDataInboundObserver2.
+    observer.awaitCompletion();
+    assertThat(
+        values,
+        contains(
+            valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF"), valueInGlobalWindow("GHI")));
+    assertThat(timers, contains(valueInGlobalWindow("UVW"), valueInGlobalWindow("XYZ")));
+    // Surfaces any assertion failure or exception raised inside the producer task.
+    future.get();
+  }
+
+  @Test
+  public void testAwaitCompletionFailureVisibleToAwaitCompletionCallerAndProducer()
+      throws Exception {
+    // The only data consumer always throws. Both the producer (accept) and the awaitCompletion()
+    // caller are expected to observe a failure.
+    BeamFnDataInboundObserver2 observer =
+        BeamFnDataInboundObserver2.forConsumers(
+            Arrays.asList(
+                DataEndpoint.create(
+                    TRANSFORM_ID,
+                    CODER,
+                    (value) -> {
+                      throw new Exception("test consumer failed");
+                    })),
+            Collections.emptyList());
+
+    Future<?> future =
+        executor.submit(
+            () -> {
+              observer.accept(dataWith("ABC"));
+              assertThrows(
+                  "test consumer failed",
+                  Exception.class,
+                  () -> {
+                    while (true) {
+                      // Keep sending: the queue buffers messages, so the consumer side may not
+                      // have noticed the failed state yet when the first accept() returns.
+                      // The loop ends only when accept() throws.
+                      observer.accept(dataWith("ABC"));
+                    }
+                  });
+              return null;
+            });
+
+    assertThrows("test consumer failed", Exception.class, () -> observer.awaitCompletion());
+    future.get();
+  }
+
+  @Test
+  public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exception {
+    // close() is called from a third task. Both the producer and the awaitCompletion() caller are
+    // expected to fail with an IllegalStateException.
+    BeamFnDataInboundObserver2 observer =
+        BeamFnDataInboundObserver2.forConsumers(
+            Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, (value) -> {})),
+            Collections.emptyList());
+
+    Future<?> future =
+        executor.submit(
+            () -> {
+              observer.accept(dataWith("ABC"));
+              assertThrows(
+                  "Inbound observer closed",
+                  IllegalStateException.class,
+                  () -> {
+                    while (true) {
+                      // Keep sending: the queue buffers messages, so the producer may not see
+                      // the closed state until a later accept() call.
+                      // The loop ends only when accept() throws.
+                      observer.accept(dataWith("ABC"));
+                    }
+                  });
+              return null;
+            });
+    Future<?> future2 =
+        executor.submit(
+            () -> {
+              observer.close();
+              return null;
+            });
+
+    assertThrows(
+        "Inbound observer closed", IllegalStateException.class, () -> observer.awaitCompletion());
+    future.get();
+    future2.get();
+  }
+
+  @Test
+  public void testBadProducerDataFailureVisibleToAwaitCompletionCallerAndProducer()
+      throws Exception {
+    // A timer block is sent although only a data endpoint is registered. Both the producer and
+    // the awaitCompletion() caller are expected to fail with an IllegalStateException.
+    BeamFnDataInboundObserver2 observer =
+        BeamFnDataInboundObserver2.forConsumers(
+            Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, (value) -> {})),
+            Collections.emptyList());
+    Future<?> future =
+        executor.submit(
+            () -> {
+              observer.accept(timerWith("DEF"));
+              assertThrows(
+                  "Unable to find inbound timer receiver for instruction",
+                  IllegalStateException.class,
+                  () -> {
+                    // Keep sending: the queue buffers messages, so the consumer side may not
+                    // have rejected the unexpected timer block yet when accept() first returns.
+                    // The loop ends only when accept() throws.
+                    while (true) {
+                      observer.accept(dataWith("ABC"));
+                    }
+                  });
+              return null;
+            });
+
+    assertThrows(
+        "Unable to find inbound timer receiver for instruction",
+        IllegalStateException.class,
+        () -> observer.awaitCompletion());
+    future.get();
+  }
+
+  /**
+   * Returns an {@link BeamFnApi.Elements} with a single data block for {@link #TRANSFORM_ID} whose
+   * payload is every value encoded in order with {@link #CODER}. No instruction id is set.
+   */
+  private BeamFnApi.Elements dataWith(String... values) throws Exception {
+    ByteString.Output output = ByteString.newOutput();
+    for (String value : values) {
+      CODER.encode(valueInGlobalWindow(value), output);
+    }
+    return BeamFnApi.Elements.newBuilder()
+        .addData(
+            BeamFnApi.Elements.Data.newBuilder()
+                .setTransformId(TRANSFORM_ID)
+                .setData(output.toByteString()))
+        .build();
+  }
+
+  /** Returns an {@link BeamFnApi.Elements} with an empty data block marked {@code is_last}. */
+  private BeamFnApi.Elements lastData() throws Exception {
+    return BeamFnApi.Elements.newBuilder()
+        .addData(BeamFnApi.Elements.Data.newBuilder().setTransformId(TRANSFORM_ID).setIsLast(true))
+        .build();
+  }
+
+  /**
+   * Returns an {@link BeamFnApi.Elements} with a single timers block for {@link #TRANSFORM_ID} and
+   * {@link #TIMER_FAMILY_ID} whose payload is every value encoded in order with {@link #CODER}.
+   */
+  private BeamFnApi.Elements timerWith(String... values) throws Exception {
+    ByteString.Output output = ByteString.newOutput();
+    for (String value : values) {
+      CODER.encode(valueInGlobalWindow(value), output);
+    }
+    return BeamFnApi.Elements.newBuilder()
+        .addTimers(
+            BeamFnApi.Elements.Timers.newBuilder()
+                .setTransformId(TRANSFORM_ID)
+                .setTimerFamilyId(TIMER_FAMILY_ID)
+                .setTimers(output.toByteString()))
+        .build();
+  }
+
+  /** Returns an {@link BeamFnApi.Elements} with an empty timers block marked {@code is_last}. */
+  private BeamFnApi.Elements lastTimer() throws Exception {
+    return BeamFnApi.Elements.newBuilder()
+        .addTimers(
+            BeamFnApi.Elements.Timers.newBuilder()
+                .setTransformId(TRANSFORM_ID)
+                .setTimerFamilyId(TIMER_FAMILY_ID)
+                .setIsLast(true))
+        .build();
+  }
+}