This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 75fa82cc2e6 Allow MoreFutures.allAsList/allAsListWithExceptions to have the passed in list to be mutated (#23811) 75fa82cc2e6 is described below commit 75fa82cc2e68047246e3b9f9843d4f2a6547455b Author: Luke Cwik <lc...@google.com> AuthorDate: Mon Oct 24 16:58:08 2022 -0700 Allow MoreFutures.allAsList/allAsListWithExceptions to have the passed in list to be mutated (#23811) This resolves ConcurrentModificationExceptions seen within WriteFiles and other places that use MoreFutures.allAsList* methods. Fixes #23809 --- .../java/org/apache/beam/sdk/util/MoreFutures.java | 25 ++++---- .../org/apache/beam/sdk/util/MoreFuturesTest.java | 74 ++++++++++++++++++++++ 2 files changed, 85 insertions(+), 14 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java index 6f053752d3f..57074a7d6db 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.util; import com.google.auto.value.AutoValue; import edu.umd.cs.findbugs.annotations.SuppressWarnings; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -161,17 +162,13 @@ public class MoreFutures { /** Like {@link CompletableFuture#allOf} but returning the result of constituent futures. */ public static <T> CompletionStage<List<T>> allAsList( Collection<? extends CompletionStage<? extends T>> futures) { - // CompletableFuture.allOf completes exceptionally if any of the futures do. // We have to gather the results separately. - CompletionStage<Void> blockAndDiscard = - CompletableFuture.allOf(futuresToCompletableFutures(futures)); + CompletableFuture<? extends T>[] f = futuresToCompletableFutures(futures); + CompletionStage<Void> blockAndDiscard = CompletableFuture.allOf(f); return blockAndDiscard.thenApply( - nothing -> - futures.stream() - .map(future -> future.toCompletableFuture().join()) - .collect(Collectors.toList())); + nothing -> Arrays.stream(f).map(CompletableFuture::join).collect(Collectors.toList())); } /** @@ -207,25 +204,25 @@ public class MoreFutures { } } - /** Like {@link #allAsList} but return a list . */ + /** + * Like {@link #allAsList} but return a list of {@link ExceptionOrResult} of constituent futures. + */ public static <T> CompletionStage<List<ExceptionOrResult<T>>> allAsListWithExceptions( Collection<? extends CompletionStage<? extends T>> futures) { - // CompletableFuture.allOf completes exceptionally if any of the futures do. // We have to gather the results separately. - CompletionStage<Void> blockAndDiscard = - CompletableFuture.allOf(futuresToCompletableFutures(futures)) - .whenComplete((ignoredValues, arbitraryException) -> {}); + CompletableFuture<? extends T>[] f = futuresToCompletableFutures(futures); + CompletionStage<Void> blockAndDiscard = CompletableFuture.allOf(f); return blockAndDiscard.thenApply( nothing -> - futures.stream() + Arrays.stream(f) .map( future -> { // The limited scope of the exceptions wrapped allows CancellationException // to still be thrown. try { - return ExceptionOrResult.<T>result(future.toCompletableFuture().join()); + return ExceptionOrResult.<T>result(future.join()); } catch (CompletionException exc) { return ExceptionOrResult.<T>exception(exc); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MoreFuturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MoreFuturesTest.java index 4b6790d22c3..b8a10793501 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MoreFuturesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MoreFuturesTest.java @@ -20,10 +20,16 @@ package org.apache.beam.sdk.util; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.isA; +import static org.junit.Assert.assertEquals; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.sdk.util.MoreFutures.ExceptionOrResult; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -84,4 +90,72 @@ public class MoreFuturesTest { thrown.expectMessage(testMessage); MoreFutures.get(sideEffectFuture); } + + @Test + public void testAllAsListRespectsOriginalList() throws Exception { + CountDownLatch waitTillThreadRunning = new CountDownLatch(1); + CountDownLatch waitTillClearHasHappened = new CountDownLatch(1); + List<CompletionStage<Void>> stages = new ArrayList<>(); + stages.add(MoreFutures.runAsync(waitTillThreadRunning::countDown)); + stages.add(MoreFutures.runAsync(waitTillClearHasHappened::await)); + + CompletionStage<List<Void>> results = MoreFutures.allAsList(stages); + waitTillThreadRunning.await(); + stages.clear(); + waitTillClearHasHappened.countDown(); + assertEquals(MoreFutures.get(results), Arrays.asList(null, null)); + } + + @Test + public void testAllAsListNoExceptionDueToMutation() throws Exception { + // This loop runs many times trying to exercise a race condition that existed where mutation + // of the passed in completion stages lead to various exceptions (such as a + // ConcurrentModificationException). See https://github.com/apache/beam/issues/23809 + for (int i = 0; i < 10000; ++i) { + CountDownLatch waitTillThreadRunning = new CountDownLatch(1); + List<CompletionStage<Void>> stages = new ArrayList<>(); + stages.add(MoreFutures.runAsync(waitTillThreadRunning::countDown)); + + CompletionStage<List<Void>> results = MoreFutures.allAsList(stages); + waitTillThreadRunning.await(); + stages.clear(); + MoreFutures.get(results); + } + } + + @Test + public void testAllAsListWithExceptionsRespectsOriginalList() throws Exception { + CountDownLatch waitTillThreadRunning = new CountDownLatch(1); + CountDownLatch waitTillClearHasHappened = new CountDownLatch(1); + List<CompletionStage<Void>> stages = new ArrayList<>(); + stages.add(MoreFutures.runAsync(waitTillThreadRunning::countDown)); + stages.add(MoreFutures.runAsync(waitTillClearHasHappened::await)); + + CompletionStage<List<ExceptionOrResult<Void>>> results = + MoreFutures.allAsListWithExceptions(stages); + waitTillThreadRunning.await(); + stages.clear(); + waitTillClearHasHappened.countDown(); + assertEquals( + MoreFutures.get(results), + Arrays.asList(ExceptionOrResult.result(null), ExceptionOrResult.result(null))); + } + + @Test + public void testAllAsListWithExceptionsNoExceptionDueToMutation() throws Exception { + // This loop runs many times trying to exercise a race condition that existed where mutation + // of the passed in completion stages lead to various exceptions (such as a + // ConcurrentModificationException). See https://github.com/apache/beam/issues/23809 + for (int i = 0; i < 10000; ++i) { + CountDownLatch waitTillThreadRunning = new CountDownLatch(1); + List<CompletionStage<Void>> stages = new ArrayList<>(); + stages.add(MoreFutures.runAsync(waitTillThreadRunning::countDown)); + + CompletionStage<List<ExceptionOrResult<Void>>> results = + MoreFutures.allAsListWithExceptions(stages); + waitTillThreadRunning.await(); + stages.clear(); + MoreFutures.get(results); + } + } }