This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dac3f5ef40 [#21250] Trivial removal of loop over something that always has one element (#24014)
7dac3f5ef40 is described below

commit 7dac3f5ef40b5d24b24d9ce5bec4717284260b85
Author: Luke Cwik <lc...@google.com>
AuthorDate: Tue Nov 8 09:42:29 2022 -0800

    [#21250] Trivial removal of loop over something that always has one element (#24014)
    
    Multiplexing was put into the PCollectionConsumerRegistry a long time ago and this seems to have been missed during that migration.
---
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    | 118 ++++++++++-----------
 1 file changed, 54 insertions(+), 64 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 1e60348eee0..2b449e0200b 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -119,10 +119,8 @@ import 
org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.util.Durations;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableListMultimap;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ListMultimap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table;
@@ -200,8 +198,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
         Coder<Timer<Object>> coder = entry.getValue().getValue();
         if (!localName.equals("")
             && 
localName.equals(runner.parDoPayload.getOnWindowExpirationTimerFamilySpec())) {
-          context.addIncomingTimerEndpoint(
-              localName, coder, timer -> 
runner.processOnWindowExpiration(timer));
+          context.addIncomingTimerEndpoint(localName, coder, 
runner::processOnWindowExpiration);
         } else {
           context.addIncomingTimerEndpoint(
               localName, coder, timer -> runner.processTimer(localName, 
timeDomain, timer));
@@ -230,10 +227,10 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
   private final Map<TupleTag<?>, Coder<?>> outputCoders;
   private final Map<String, KV<TimeDomain, Coder<Timer<Object>>>> 
timerFamilyInfos;
   private final ParDoPayload parDoPayload;
-  private final ListMultimap<String, FnDataReceiver<WindowedValue<?>>> 
localNameToConsumer;
+  private final Map<String, FnDataReceiver<WindowedValue<?>>> 
localNameToConsumer;
   private final BundleSplitListener splitListener;
   private final BundleFinalizer bundleFinalizer;
-  private final Collection<FnDataReceiver<WindowedValue<OutputT>>> 
mainOutputConsumers;
+  private final FnDataReceiver<WindowedValue<OutputT>> mainOutputConsumer;
 
   private final String mainInputId;
   private final FnApiStateAccessor<?> stateAccessor;
@@ -478,10 +475,10 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
       throw new IllegalArgumentException("Malformed ParDoPayload", exn);
     }
 
-    ImmutableListMultimap.Builder<String, FnDataReceiver<WindowedValue<?>>>
-        localNameToConsumerBuilder = ImmutableListMultimap.builder();
+    ImmutableMap.Builder<String, FnDataReceiver<WindowedValue<?>>> 
localNameToConsumerBuilder =
+        ImmutableMap.builder();
     for (Map.Entry<String, String> entry : 
pTransform.getOutputsMap().entrySet()) {
-      localNameToConsumerBuilder.putAll(
+      localNameToConsumerBuilder.put(
           entry.getKey(), getPCollectionConsumer.apply(entry.getValue()));
     }
     localNameToConsumer = localNameToConsumerBuilder.build();
@@ -491,9 +488,9 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
     this.onTimerContext = new OnTimerContext();
     this.onWindowExpirationContext = new OnWindowExpirationContext<>();
 
-    this.mainOutputConsumers =
-        (Collection<FnDataReceiver<WindowedValue<OutputT>>>)
-            (Collection) localNameToConsumer.get(mainOutputTag.getId());
+    this.mainOutputConsumer =
+        (FnDataReceiver<WindowedValue<OutputT>>)
+            (FnDataReceiver) localNameToConsumer.get(mainOutputTag.getId());
     this.doFnSchemaInformation = 
ParDoTranslation.getSchemaInformation(parDoPayload);
     this.sideInputMapping = ParDoTranslation.getSideInputMapping(parDoPayload);
     this.doFnInvoker = DoFnInvokers.tryInvokeSetupFor(doFn, pipelineOptions);
@@ -569,12 +566,10 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
             || (doFnSignature.getSize() != null && 
doFnSignature.getSize().observesWindow())
             || !sideInputMapping.isEmpty()) {
           // Only forward split/progress when the only consumer is splittable.
-          if (mainOutputConsumers.size() == 1
-              && Iterables.getOnlyElement(mainOutputConsumers) instanceof 
HandlesSplits) {
+          if (mainOutputConsumer instanceof HandlesSplits) {
             mainInputConsumer =
                 new SplittableFnDataReceiver() {
-                  private final HandlesSplits splitDelegate =
-                      (HandlesSplits) 
Iterables.getOnlyElement(mainOutputConsumers);
+                  private final HandlesSplits splitDelegate = (HandlesSplits) 
mainOutputConsumer;
 
                   @Override
                   public void accept(WindowedValue input) throws Exception {
@@ -609,12 +604,10 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
                   
PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN);
         } else {
           // Only forward split/progress when the only consumer is splittable.
-          if (mainOutputConsumers.size() == 1
-              && Iterables.getOnlyElement(mainOutputConsumers) instanceof 
HandlesSplits) {
+          if (mainOutputConsumer instanceof HandlesSplits) {
             mainInputConsumer =
                 new SplittableFnDataReceiver() {
-                  private final HandlesSplits splitDelegate =
-                      (HandlesSplits) 
Iterables.getOnlyElement(mainOutputConsumers);
+                  private final HandlesSplits splitDelegate = (HandlesSplits) 
mainOutputConsumer;
 
                   @Override
                   public void accept(WindowedValue input) throws Exception {
@@ -830,7 +823,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
     try {
       currentRestriction = 
doFnInvoker.invokeGetInitialRestriction(processContext);
       outputTo(
-          mainOutputConsumers,
+          mainOutputConsumer,
           (WindowedValue)
               elem.withValue(
                   KV.of(
@@ -855,7 +848,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
         currentWindow = windowIterator.next();
         currentRestriction = 
doFnInvoker.invokeGetInitialRestriction(processContext);
         outputTo(
-            mainOutputConsumers,
+            mainOutputConsumer,
             (WindowedValue)
                 WindowedValue.of(
                     KV.of(
@@ -1787,16 +1780,13 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
   }
 
   /** Outputs the given element to the specified set of consumers wrapping any 
exceptions. */
-  private <T> void outputTo(
-      Collection<FnDataReceiver<WindowedValue<T>>> consumers, WindowedValue<T> 
output) {
+  private <T> void outputTo(FnDataReceiver<WindowedValue<T>> consumer, 
WindowedValue<T> output) {
     if (currentWatermarkEstimator instanceof 
TimestampObservingWatermarkEstimator) {
       ((TimestampObservingWatermarkEstimator) currentWatermarkEstimator)
           .observeTimestamp(output.getTimestamp());
     }
     try {
-      for (FnDataReceiver<WindowedValue<T>> consumer : consumers) {
-        consumer.accept(output);
-      }
+      consumer.accept(output);
     } catch (Throwable t) {
       throw UserCodeException.wrap(t);
     }
@@ -2136,17 +2126,17 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
       @Override
       public void output(OutputT output, Instant timestamp, BoundedWindow 
window) {
         outputTo(
-            mainOutputConsumers, WindowedValue.of(output, timestamp, window, 
PaneInfo.NO_FIRING));
+            mainOutputConsumer, WindowedValue.of(output, timestamp, window, 
PaneInfo.NO_FIRING));
       }
 
       @Override
       public <T> void output(TupleTag<T> tag, T output, Instant timestamp, 
BoundedWindow window) {
-        Collection<FnDataReceiver<WindowedValue<T>>> consumers =
-            (Collection) localNameToConsumer.get(tag.getId());
-        if (consumers == null) {
+        FnDataReceiver<WindowedValue<T>> consumer =
+            (FnDataReceiver) localNameToConsumer.get(tag.getId());
+        if (consumer == null) {
           throw new IllegalArgumentException(String.format("Unknown output tag 
%s", tag));
         }
-        outputTo(consumers, WindowedValue.of(output, timestamp, window, 
PaneInfo.NO_FIRING));
+        outputTo(consumer, WindowedValue.of(output, timestamp, window, 
PaneInfo.NO_FIRING));
       }
     }
 
@@ -2248,20 +2238,20 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
     public void outputWithTimestamp(OutputT output, Instant timestamp) {
       // TODO: Check that timestamp is valid once all runners can provide 
proper timestamps.
       outputTo(
-          mainOutputConsumers,
+          mainOutputConsumer,
           WindowedValue.of(output, timestamp, currentWindow, 
currentElement.getPane()));
     }
 
     @Override
     public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
       // TODO: Check that timestamp is valid once all runners can provide 
proper timestamps.
-      Collection<FnDataReceiver<WindowedValue<T>>> consumers =
-          (Collection) localNameToConsumer.get(tag.getId());
-      if (consumers == null) {
+      FnDataReceiver<WindowedValue<T>> consumer =
+          (FnDataReceiver) localNameToConsumer.get(tag.getId());
+      if (consumer == null) {
         throw new IllegalArgumentException(String.format("Unknown output tag 
%s", tag));
       }
       outputTo(
-          consumers, WindowedValue.of(output, timestamp, currentWindow, 
currentElement.getPane()));
+          consumer, WindowedValue.of(output, timestamp, currentWindow, 
currentElement.getPane()));
     }
   }
 
@@ -2299,7 +2289,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
               });
 
       outputTo(
-          mainOutputConsumers,
+          mainOutputConsumer,
           (WindowedValue<OutputT>)
               WindowedValue.of(
                   KV.of(
@@ -2346,7 +2336,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
               });
 
       outputTo(
-          mainOutputConsumers,
+          mainOutputConsumer,
           (WindowedValue<OutputT>)
               WindowedValue.of(
                   KV.of(
@@ -2365,7 +2355,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
     public void outputWithTimestamp(OutputT output, Instant timestamp) {
       checkTimestamp(timestamp);
       outputTo(
-          mainOutputConsumers,
+          mainOutputConsumer,
           WindowedValue.of(
               output, timestamp, currentElement.getWindows(), 
currentElement.getPane()));
     }
@@ -2373,13 +2363,13 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
     @Override
     public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
       checkTimestamp(timestamp);
-      Collection<FnDataReceiver<WindowedValue<T>>> consumers =
-          (Collection) localNameToConsumer.get(tag.getId());
-      if (consumers == null) {
+      FnDataReceiver<WindowedValue<T>> consumer =
+          (FnDataReceiver) localNameToConsumer.get(tag.getId());
+      if (consumer == null) {
         throw new IllegalArgumentException(String.format("Unknown output tag 
%s", tag));
       }
       outputTo(
-          consumers,
+          consumer,
           WindowedValue.of(
               output, timestamp, currentElement.getWindows(), 
currentElement.getPane()));
     }
@@ -2591,7 +2581,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
       @Override
       public void output(OutputT output) {
         outputTo(
-            mainOutputConsumers,
+            mainOutputConsumer,
             WindowedValue.of(
                 output, currentTimer.getHoldTimestamp(), currentWindow, 
currentTimer.getPane()));
       }
@@ -2600,19 +2590,19 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
       public void outputWithTimestamp(OutputT output, Instant timestamp) {
         checkOnWindowExpirationTimestamp(timestamp);
         outputTo(
-            mainOutputConsumers,
+            mainOutputConsumer,
             WindowedValue.of(output, timestamp, currentWindow, 
currentTimer.getPane()));
       }
 
       @Override
       public <T> void output(TupleTag<T> tag, T output) {
-        Collection<FnDataReceiver<WindowedValue<T>>> consumers =
-            (Collection) localNameToConsumer.get(tag.getId());
-        if (consumers == null) {
+        FnDataReceiver<WindowedValue<T>> consumer =
+            (FnDataReceiver) localNameToConsumer.get(tag.getId());
+        if (consumer == null) {
           throw new IllegalArgumentException(String.format("Unknown output tag 
%s", tag));
         }
         outputTo(
-            consumers,
+            consumer,
             WindowedValue.of(
                 output, currentTimer.getHoldTimestamp(), currentWindow, 
currentTimer.getPane()));
       }
@@ -2620,13 +2610,13 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
       @Override
       public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
         checkOnWindowExpirationTimestamp(timestamp);
-        Collection<FnDataReceiver<WindowedValue<T>>> consumers =
-            (Collection) localNameToConsumer.get(tag.getId());
-        if (consumers == null) {
+        FnDataReceiver<WindowedValue<T>> consumer =
+            (FnDataReceiver) localNameToConsumer.get(tag.getId());
+        if (consumer == null) {
           throw new IllegalArgumentException(String.format("Unknown output tag 
%s", tag));
         }
         outputTo(
-            consumers, WindowedValue.of(output, timestamp, currentWindow, 
currentTimer.getPane()));
+            consumer, WindowedValue.of(output, timestamp, currentWindow, 
currentTimer.getPane()));
       }
 
       @SuppressWarnings(
@@ -2745,7 +2735,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
       public void output(OutputT output) {
         checkTimerTimestamp(currentTimer.getHoldTimestamp());
         outputTo(
-            mainOutputConsumers,
+            mainOutputConsumer,
             WindowedValue.of(
                 output, currentTimer.getHoldTimestamp(), currentWindow, 
currentTimer.getPane()));
       }
@@ -2754,20 +2744,20 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
       public void outputWithTimestamp(OutputT output, Instant timestamp) {
         checkTimerTimestamp(timestamp);
         outputTo(
-            mainOutputConsumers,
+            mainOutputConsumer,
             WindowedValue.of(output, timestamp, currentWindow, 
currentTimer.getPane()));
       }
 
       @Override
       public <T> void output(TupleTag<T> tag, T output) {
         checkTimerTimestamp(currentTimer.getHoldTimestamp());
-        Collection<FnDataReceiver<WindowedValue<T>>> consumers =
-            (Collection) localNameToConsumer.get(tag.getId());
-        if (consumers == null) {
+        FnDataReceiver<WindowedValue<T>> consumer =
+            (FnDataReceiver) localNameToConsumer.get(tag.getId());
+        if (consumer == null) {
           throw new IllegalArgumentException(String.format("Unknown output tag 
%s", tag));
         }
         outputTo(
-            consumers,
+            consumer,
             WindowedValue.of(
                 output, currentTimer.getHoldTimestamp(), currentWindow, 
currentTimer.getPane()));
       }
@@ -2775,13 +2765,13 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
       @Override
       public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
         checkTimerTimestamp(timestamp);
-        Collection<FnDataReceiver<WindowedValue<T>>> consumers =
-            (Collection) localNameToConsumer.get(tag.getId());
-        if (consumers == null) {
+        FnDataReceiver<WindowedValue<T>> consumer =
+            (FnDataReceiver) localNameToConsumer.get(tag.getId());
+        if (consumer == null) {
           throw new IllegalArgumentException(String.format("Unknown output tag 
%s", tag));
         }
         outputTo(
-            consumers, WindowedValue.of(output, timestamp, currentWindow, 
currentTimer.getPane()));
+            consumer, WindowedValue.of(output, timestamp, currentWindow, 
currentTimer.getPane()));
       }
 
       @Override

Reply via email to