This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit a270d857870a1c8dae3eacddbab228bc3e25f260
Author: Kenneth Knowles <k...@apache.org>
AuthorDate: Mon Feb 12 12:21:18 2018 -0800

    [BEAM-3697] Fix MoreFutures errorprone
---
 .../java/org/apache/beam/sdk/util/MoreFutures.java | 35 +++++-----
 .../org/apache/beam/sdk/util/MoreFuturesTest.java  | 81 ++++++++++++++++++++++
 2 files changed, 100 insertions(+), 16 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
index 7b49503..8275fad 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
@@ -94,7 +94,8 @@ public class MoreFutures {
   public static <T> CompletionStage<T> supplyAsync(
       ThrowingSupplier<T> supplier, ExecutorService executorService) {
     CompletableFuture<T> result = new CompletableFuture<>();
-    CompletableFuture.runAsync(
+
+    CompletionStage<Void> wrapper = CompletableFuture.runAsync(
         () -> {
           try {
             result.complete(supplier.get());
@@ -106,7 +107,7 @@ public class MoreFutures {
           }
         },
         executorService);
-    return result;
+    return wrapper.thenCompose(nothing -> result);
   }
 
   /**
@@ -125,20 +126,22 @@ public class MoreFutures {
   public static CompletionStage<Void> runAsync(
       ThrowingRunnable runnable, ExecutorService executorService) {
     CompletableFuture<Void> result = new CompletableFuture<>();
-    CompletableFuture.runAsync(
-        () -> {
-          try {
-            runnable.run();
-            result.complete(null);
-          } catch (InterruptedException e) {
-            result.completeExceptionally(e);
-            Thread.currentThread().interrupt();
-          } catch (Throwable t) {
-            result.completeExceptionally(t);
-          }
-        },
-        executorService);
-    return result;
+
+    CompletionStage<Void> wrapper =
+        CompletableFuture.runAsync(
+            () -> {
+              try {
+                runnable.run();
+                result.complete(null);
+              } catch (InterruptedException e) {
+                result.completeExceptionally(e);
+                Thread.currentThread().interrupt();
+              } catch (Throwable t) {
+                result.completeExceptionally(t);
+              }
+            },
+            executorService);
+    return wrapper.thenCompose(nothing -> result);
   }
 
   /**
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MoreFuturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MoreFuturesTest.java
new file mode 100644
index 0000000..22ab4c0
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MoreFuturesTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link MoreFutures}. */
+@RunWith(JUnit4.class)
+public class MoreFuturesTest {
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void supplyAsyncSuccess() throws Exception {
+    CompletionStage<Integer> future = MoreFutures.supplyAsync(() -> 42);
+    assertThat(MoreFutures.get(future), equalTo(42));
+  }
+
+  @Test
+  public void supplyAsyncFailure() throws Exception {
+    final String testMessage = "this is just a test";
+    CompletionStage<Long> future = MoreFutures.supplyAsync(() -> {
+      throw new IllegalStateException(testMessage);
+    });
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+    thrown.expectMessage(testMessage);
+    MoreFutures.get(future);
+  }
+
+  @Test
+  public void runAsyncSuccess() throws Exception {
+    AtomicInteger result = new AtomicInteger(0);
+    CompletionStage<Void> sideEffectFuture = MoreFutures.runAsync(() -> {
+      result.set(42);
+    });
+
+    MoreFutures.get(sideEffectFuture);
+    assertThat(result.get(), equalTo(42));
+  }
+
+  @Test
+  public void runAsyncFailure() throws Exception {
+    final String testMessage = "this is just a test";
+    CompletionStage<Void> sideEffectFuture = MoreFutures.runAsync(() -> {
+      throw new IllegalStateException(testMessage);
+    });
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+    thrown.expectMessage(testMessage);
+    MoreFutures.get(sideEffectFuture);
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
ieme...@apache.org.

Reply via email to