This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 75fa82cc2e6 Allow MoreFutures.allAsList/allAsListWithExceptions to 
have the passed in list to be mutated (#23811)
75fa82cc2e6 is described below

commit 75fa82cc2e68047246e3b9f9843d4f2a6547455b
Author: Luke Cwik <lc...@google.com>
AuthorDate: Mon Oct 24 16:58:08 2022 -0700

    Allow MoreFutures.allAsList/allAsListWithExceptions to have the passed in 
list to be mutated (#23811)
    
    This resolves ConcurrentModificationExceptions seen within WriteFiles and 
other places that use MoreFutures.allAsList* methods.
    
    Fixes #23809
---
 .../java/org/apache/beam/sdk/util/MoreFutures.java | 25 ++++----
 .../org/apache/beam/sdk/util/MoreFuturesTest.java  | 74 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 14 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
index 6f053752d3f..57074a7d6db 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.util;
 
 import com.google.auto.value.AutoValue;
 import edu.umd.cs.findbugs.annotations.SuppressWarnings;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -161,17 +162,13 @@ public class MoreFutures {
   /** Like {@link CompletableFuture#allOf} but returning the result of 
constituent futures. */
   public static <T> CompletionStage<List<T>> allAsList(
       Collection<? extends CompletionStage<? extends T>> futures) {
-
     // CompletableFuture.allOf completes exceptionally if any of the futures 
do.
     // We have to gather the results separately.
-    CompletionStage<Void> blockAndDiscard =
-        CompletableFuture.allOf(futuresToCompletableFutures(futures));
+    CompletableFuture<? extends T>[] f = futuresToCompletableFutures(futures);
+    CompletionStage<Void> blockAndDiscard = CompletableFuture.allOf(f);
 
     return blockAndDiscard.thenApply(
-        nothing ->
-            futures.stream()
-                .map(future -> future.toCompletableFuture().join())
-                .collect(Collectors.toList()));
+        nothing -> 
Arrays.stream(f).map(CompletableFuture::join).collect(Collectors.toList()));
   }
 
   /**
@@ -207,25 +204,25 @@ public class MoreFutures {
     }
   }
 
-  /** Like {@link #allAsList} but return a list . */
+  /**
+   * Like {@link #allAsList} but return a list of {@link ExceptionOrResult} of 
constituent futures.
+   */
   public static <T> CompletionStage<List<ExceptionOrResult<T>>> 
allAsListWithExceptions(
       Collection<? extends CompletionStage<? extends T>> futures) {
-
     // CompletableFuture.allOf completes exceptionally if any of the futures 
do.
     // We have to gather the results separately.
-    CompletionStage<Void> blockAndDiscard =
-        CompletableFuture.allOf(futuresToCompletableFutures(futures))
-            .whenComplete((ignoredValues, arbitraryException) -> {});
+    CompletableFuture<? extends T>[] f = futuresToCompletableFutures(futures);
+    CompletionStage<Void> blockAndDiscard = CompletableFuture.allOf(f);
 
     return blockAndDiscard.thenApply(
         nothing ->
-            futures.stream()
+            Arrays.stream(f)
                 .map(
                     future -> {
                       // The limited scope of the exceptions wrapped allows 
CancellationException
                       // to still be thrown.
                       try {
-                        return 
ExceptionOrResult.<T>result(future.toCompletableFuture().join());
+                        return ExceptionOrResult.<T>result(future.join());
                       } catch (CompletionException exc) {
                         return ExceptionOrResult.<T>exception(exc);
                       }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MoreFuturesTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MoreFuturesTest.java
index 4b6790d22c3..b8a10793501 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MoreFuturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MoreFuturesTest.java
@@ -20,10 +20,16 @@ package org.apache.beam.sdk.util;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertEquals;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.util.MoreFutures.ExceptionOrResult;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -84,4 +90,72 @@ public class MoreFuturesTest {
     thrown.expectMessage(testMessage);
     MoreFutures.get(sideEffectFuture);
   }
+
+  @Test
+  public void testAllAsListRespectsOriginalList() throws Exception {
+    CountDownLatch waitTillThreadRunning = new CountDownLatch(1);
+    CountDownLatch waitTillClearHasHappened = new CountDownLatch(1);
+    List<CompletionStage<Void>> stages = new ArrayList<>();
+    stages.add(MoreFutures.runAsync(waitTillThreadRunning::countDown));
+    stages.add(MoreFutures.runAsync(waitTillClearHasHappened::await));
+
+    CompletionStage<List<Void>> results = MoreFutures.allAsList(stages);
+    waitTillThreadRunning.await();
+    stages.clear();
+    waitTillClearHasHappened.countDown();
+    assertEquals(MoreFutures.get(results), Arrays.asList(null, null));
+  }
+
+  @Test
+  public void testAllAsListNoExceptionDueToMutation() throws Exception {
+    // This loop runs many times trying to exercise a race condition that 
existed where mutation
+    // of the passed in completion stages lead to various exceptions (such as a
+    // ConcurrentModificationException). See 
https://github.com/apache/beam/issues/23809
+    for (int i = 0; i < 10000; ++i) {
+      CountDownLatch waitTillThreadRunning = new CountDownLatch(1);
+      List<CompletionStage<Void>> stages = new ArrayList<>();
+      stages.add(MoreFutures.runAsync(waitTillThreadRunning::countDown));
+
+      CompletionStage<List<Void>> results = MoreFutures.allAsList(stages);
+      waitTillThreadRunning.await();
+      stages.clear();
+      MoreFutures.get(results);
+    }
+  }
+
+  @Test
+  public void testAllAsListWithExceptionsRespectsOriginalList() throws 
Exception {
+    CountDownLatch waitTillThreadRunning = new CountDownLatch(1);
+    CountDownLatch waitTillClearHasHappened = new CountDownLatch(1);
+    List<CompletionStage<Void>> stages = new ArrayList<>();
+    stages.add(MoreFutures.runAsync(waitTillThreadRunning::countDown));
+    stages.add(MoreFutures.runAsync(waitTillClearHasHappened::await));
+
+    CompletionStage<List<ExceptionOrResult<Void>>> results =
+        MoreFutures.allAsListWithExceptions(stages);
+    waitTillThreadRunning.await();
+    stages.clear();
+    waitTillClearHasHappened.countDown();
+    assertEquals(
+        MoreFutures.get(results),
+        Arrays.asList(ExceptionOrResult.result(null), 
ExceptionOrResult.result(null)));
+  }
+
+  @Test
+  public void testAllAsListWithExceptionsNoExceptionDueToMutation() throws 
Exception {
+    // This loop runs many times trying to exercise a race condition that 
existed where mutation
+    // of the passed in completion stages lead to various exceptions (such as a
+    // ConcurrentModificationException). See 
https://github.com/apache/beam/issues/23809
+    for (int i = 0; i < 10000; ++i) {
+      CountDownLatch waitTillThreadRunning = new CountDownLatch(1);
+      List<CompletionStage<Void>> stages = new ArrayList<>();
+      stages.add(MoreFutures.runAsync(waitTillThreadRunning::countDown));
+
+      CompletionStage<List<ExceptionOrResult<Void>>> results =
+          MoreFutures.allAsListWithExceptions(stages);
+      waitTillThreadRunning.await();
+      stages.clear();
+      MoreFutures.get(results);
+    }
+  }
 }

Reply via email to