This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 7dac3f5ef40 [#21250] Trivial removal of loop over something that always has one element (#24014) 7dac3f5ef40 is described below commit 7dac3f5ef40b5d24b24d9ce5bec4717284260b85 Author: Luke Cwik <lc...@google.com> AuthorDate: Tue Nov 8 09:42:29 2022 -0800 [#21250] Trivial removal of loop over something that always has one element (#24014) Multiplexing was put into the PCollectionConsumerRegistry a long time ago and this seems to have been missed during that migration. --- .../apache/beam/fn/harness/FnApiDoFnRunner.java | 118 ++++++++++----------- 1 file changed, 54 insertions(+), 64 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 1e60348eee0..2b449e0200b 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -119,10 +119,8 @@ import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.util.Durations; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableListMultimap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ListMultimap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table; @@ -200,8 +198,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator Coder<Timer<Object>> coder = entry.getValue().getValue(); if (!localName.equals("") && localName.equals(runner.parDoPayload.getOnWindowExpirationTimerFamilySpec())) { - context.addIncomingTimerEndpoint( - localName, coder, timer -> runner.processOnWindowExpiration(timer)); + context.addIncomingTimerEndpoint(localName, coder, runner::processOnWindowExpiration); } else { context.addIncomingTimerEndpoint( localName, coder, timer -> runner.processTimer(localName, timeDomain, timer)); @@ -230,10 +227,10 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator private final Map<TupleTag<?>, Coder<?>> outputCoders; private final Map<String, KV<TimeDomain, Coder<Timer<Object>>>> timerFamilyInfos; private final ParDoPayload parDoPayload; - private final ListMultimap<String, FnDataReceiver<WindowedValue<?>>> localNameToConsumer; + private final Map<String, FnDataReceiver<WindowedValue<?>>> localNameToConsumer; private final BundleSplitListener splitListener; private final BundleFinalizer bundleFinalizer; - private final Collection<FnDataReceiver<WindowedValue<OutputT>>> mainOutputConsumers; + private final FnDataReceiver<WindowedValue<OutputT>> mainOutputConsumer; private final String mainInputId; private final FnApiStateAccessor<?> stateAccessor; @@ -478,10 +475,10 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator throw new IllegalArgumentException("Malformed ParDoPayload", exn); } - ImmutableListMultimap.Builder<String, FnDataReceiver<WindowedValue<?>>> - localNameToConsumerBuilder = ImmutableListMultimap.builder(); + ImmutableMap.Builder<String, FnDataReceiver<WindowedValue<?>>> localNameToConsumerBuilder = + ImmutableMap.builder(); for (Map.Entry<String, String> entry : pTransform.getOutputsMap().entrySet()) { - localNameToConsumerBuilder.putAll( + localNameToConsumerBuilder.put( entry.getKey(), getPCollectionConsumer.apply(entry.getValue())); } localNameToConsumer = localNameToConsumerBuilder.build(); @@ -491,9 +488,9 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator this.onTimerContext = new OnTimerContext(); this.onWindowExpirationContext = new OnWindowExpirationContext<>(); - this.mainOutputConsumers = - (Collection<FnDataReceiver<WindowedValue<OutputT>>>) - (Collection) localNameToConsumer.get(mainOutputTag.getId()); + this.mainOutputConsumer = + (FnDataReceiver<WindowedValue<OutputT>>) + (FnDataReceiver) localNameToConsumer.get(mainOutputTag.getId()); this.doFnSchemaInformation = ParDoTranslation.getSchemaInformation(parDoPayload); this.sideInputMapping = ParDoTranslation.getSideInputMapping(parDoPayload); this.doFnInvoker = DoFnInvokers.tryInvokeSetupFor(doFn, pipelineOptions); @@ -569,12 +566,10 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator || (doFnSignature.getSize() != null && doFnSignature.getSize().observesWindow()) || !sideInputMapping.isEmpty()) { // Only forward split/progress when the only consumer is splittable. - if (mainOutputConsumers.size() == 1 - && Iterables.getOnlyElement(mainOutputConsumers) instanceof HandlesSplits) { + if (mainOutputConsumer instanceof HandlesSplits) { mainInputConsumer = new SplittableFnDataReceiver() { - private final HandlesSplits splitDelegate = - (HandlesSplits) Iterables.getOnlyElement(mainOutputConsumers); + private final HandlesSplits splitDelegate = (HandlesSplits) mainOutputConsumer; @Override public void accept(WindowedValue input) throws Exception { @@ -609,12 +604,10 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN); } else { // Only forward split/progress when the only consumer is splittable. - if (mainOutputConsumers.size() == 1 - && Iterables.getOnlyElement(mainOutputConsumers) instanceof HandlesSplits) { + if (mainOutputConsumer instanceof HandlesSplits) { mainInputConsumer = new SplittableFnDataReceiver() { - private final HandlesSplits splitDelegate = - (HandlesSplits) Iterables.getOnlyElement(mainOutputConsumers); + private final HandlesSplits splitDelegate = (HandlesSplits) mainOutputConsumer; @Override public void accept(WindowedValue input) throws Exception { @@ -830,7 +823,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator try { currentRestriction = doFnInvoker.invokeGetInitialRestriction(processContext); outputTo( - mainOutputConsumers, + mainOutputConsumer, (WindowedValue) elem.withValue( KV.of( @@ -855,7 +848,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator currentWindow = windowIterator.next(); currentRestriction = doFnInvoker.invokeGetInitialRestriction(processContext); outputTo( - mainOutputConsumers, + mainOutputConsumer, (WindowedValue) WindowedValue.of( KV.of( @@ -1787,16 +1780,13 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator } /** Outputs the given element to the specified set of consumers wrapping any exceptions. */ - private <T> void outputTo( - Collection<FnDataReceiver<WindowedValue<T>>> consumers, WindowedValue<T> output) { + private <T> void outputTo(FnDataReceiver<WindowedValue<T>> consumer, WindowedValue<T> output) { if (currentWatermarkEstimator instanceof TimestampObservingWatermarkEstimator) { ((TimestampObservingWatermarkEstimator) currentWatermarkEstimator) .observeTimestamp(output.getTimestamp()); } try { - for (FnDataReceiver<WindowedValue<T>> consumer : consumers) { - consumer.accept(output); - } + consumer.accept(output); } catch (Throwable t) { throw UserCodeException.wrap(t); } @@ -2136,17 +2126,17 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator @Override public void output(OutputT output, Instant timestamp, BoundedWindow window) { outputTo( - mainOutputConsumers, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING)); + mainOutputConsumer, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING)); } @Override public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) { - Collection<FnDataReceiver<WindowedValue<T>>> consumers = - (Collection) localNameToConsumer.get(tag.getId()); - if (consumers == null) { + FnDataReceiver<WindowedValue<T>> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); } - outputTo(consumers, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING)); + outputTo(consumer, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING)); } } @@ -2248,20 +2238,20 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator public void outputWithTimestamp(OutputT output, Instant timestamp) { // TODO: Check that timestamp is valid once all runners can provide proper timestamps. outputTo( - mainOutputConsumers, + mainOutputConsumer, WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane())); } @Override public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { // TODO: Check that timestamp is valid once all runners can provide proper timestamps. - Collection<FnDataReceiver<WindowedValue<T>>> consumers = - (Collection) localNameToConsumer.get(tag.getId()); - if (consumers == null) { + FnDataReceiver<WindowedValue<T>> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); } outputTo( - consumers, WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane())); + consumer, WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane())); } } @@ -2299,7 +2289,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator }); outputTo( - mainOutputConsumers, + mainOutputConsumer, (WindowedValue<OutputT>) WindowedValue.of( KV.of( @@ -2346,7 +2336,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator }); outputTo( - mainOutputConsumers, + mainOutputConsumer, (WindowedValue<OutputT>) WindowedValue.of( KV.of( @@ -2365,7 +2355,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator public void outputWithTimestamp(OutputT output, Instant timestamp) { checkTimestamp(timestamp); outputTo( - mainOutputConsumers, + mainOutputConsumer, WindowedValue.of( output, timestamp, currentElement.getWindows(), currentElement.getPane())); } @@ -2373,13 +2363,13 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator @Override public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { checkTimestamp(timestamp); - Collection<FnDataReceiver<WindowedValue<T>>> consumers = - (Collection) localNameToConsumer.get(tag.getId()); - if (consumers == null) { + FnDataReceiver<WindowedValue<T>> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); } outputTo( - consumers, + consumer, WindowedValue.of( output, timestamp, currentElement.getWindows(), currentElement.getPane())); } @@ -2591,7 +2581,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator @Override public void output(OutputT output) { outputTo( - mainOutputConsumers, + mainOutputConsumer, WindowedValue.of( output, currentTimer.getHoldTimestamp(), currentWindow, currentTimer.getPane())); } @@ -2600,19 +2590,19 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator public void outputWithTimestamp(OutputT output, Instant timestamp) { checkOnWindowExpirationTimestamp(timestamp); outputTo( - mainOutputConsumers, + mainOutputConsumer, WindowedValue.of(output, timestamp, currentWindow, currentTimer.getPane())); } @Override public <T> void output(TupleTag<T> tag, T output) { - Collection<FnDataReceiver<WindowedValue<T>>> consumers = - (Collection) localNameToConsumer.get(tag.getId()); - if (consumers == null) { + FnDataReceiver<WindowedValue<T>> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); } outputTo( - consumers, + consumer, WindowedValue.of( output, currentTimer.getHoldTimestamp(), currentWindow, currentTimer.getPane())); } @@ -2620,13 +2610,13 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator @Override public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { checkOnWindowExpirationTimestamp(timestamp); - Collection<FnDataReceiver<WindowedValue<T>>> consumers = - (Collection) localNameToConsumer.get(tag.getId()); - if (consumers == null) { + FnDataReceiver<WindowedValue<T>> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); } outputTo( - consumers, WindowedValue.of(output, timestamp, currentWindow, currentTimer.getPane())); + consumer, WindowedValue.of(output, timestamp, currentWindow, currentTimer.getPane())); } @SuppressWarnings( @@ -2745,7 +2735,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator public void output(OutputT output) { checkTimerTimestamp(currentTimer.getHoldTimestamp()); outputTo( - mainOutputConsumers, + mainOutputConsumer, WindowedValue.of( output, currentTimer.getHoldTimestamp(), currentWindow, currentTimer.getPane())); } @@ -2754,20 +2744,20 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator public void outputWithTimestamp(OutputT output, Instant timestamp) { checkTimerTimestamp(timestamp); outputTo( - mainOutputConsumers, + mainOutputConsumer, WindowedValue.of(output, timestamp, currentWindow, currentTimer.getPane())); } @Override public <T> void output(TupleTag<T> tag, T output) { checkTimerTimestamp(currentTimer.getHoldTimestamp()); - Collection<FnDataReceiver<WindowedValue<T>>> consumers = - (Collection) localNameToConsumer.get(tag.getId()); - if (consumers == null) { + FnDataReceiver<WindowedValue<T>> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); } outputTo( - consumers, + consumer, WindowedValue.of( output, currentTimer.getHoldTimestamp(), currentWindow, currentTimer.getPane())); } @@ -2775,13 +2765,13 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator @Override public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { checkTimerTimestamp(timestamp); - Collection<FnDataReceiver<WindowedValue<T>>> consumers = - (Collection) localNameToConsumer.get(tag.getId()); - if (consumers == null) { + FnDataReceiver<WindowedValue<T>> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); } outputTo( - consumers, WindowedValue.of(output, timestamp, currentWindow, currentTimer.getPane())); + consumer, WindowedValue.of(output, timestamp, currentWindow, currentTimer.getPane())); } @Override