This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4910076bafc [FLINK-29250][rpc] Drop RpcService#getTerminationFuture
4910076bafc is described below

commit 4910076bafc7d5d6091e3dae505d0eabbff0b72d
Author: Chesnay Schepler <[email protected]>
AuthorDate: Fri Sep 16 17:33:14 2022 +0200

    [FLINK-29250][rpc] Drop RpcService#getTerminationFuture
---
 .../flink/runtime/rpc/akka/AkkaRpcService.java     |  5 --
 .../flink/runtime/rpc/akka/AkkaRpcServiceTest.java | 67 +++++++++++-----------
 .../org/apache/flink/runtime/rpc/RpcService.java   |  7 ---
 .../flink/runtime/rpc/TestingRpcService.java       |  5 --
 .../OperatorEventSendingCheckpointITCase.java      |  5 --
 5 files changed, 35 insertions(+), 54 deletions(-)

diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 40bb5bbbb09..c00db1dad3d 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -450,11 +450,6 @@ public class AkkaRpcService implements RpcService {
         return rpcEndpoint.getTerminationFuture();
     }
 
-    @Override
-    public CompletableFuture<Void> getTerminationFuture() {
-        return terminationFuture;
-    }
-
     @Override
     public ScheduledExecutor getScheduledExecutor() {
         return internalScheduledExecutor;
diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 1db75ee82bb..2998f1af5b3 100644
--- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -39,6 +39,7 @@ import javax.annotation.Nonnull;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -128,20 +129,6 @@ class AkkaRpcServiceTest {
                 .isEqualTo(AkkaUtils.getAddress(actorSystem).port().get());
     }
 
-    /** Tests that we can wait for the termination of the rpc service. */
-    @Test
-    void testTerminationFuture() throws Exception {
-        final AkkaRpcService rpcService = startAkkaRpcService();
-
-        CompletableFuture<Void> terminationFuture = rpcService.getTerminationFuture();
-
-        assertThat(terminationFuture).isNotDone();
-
-        rpcService.stopService();
-
-        terminationFuture.get();
-    }
-
     /**
      * Tests a simple scheduled runnable being executed by the RPC services scheduled executor
      * service.
@@ -279,16 +266,15 @@ class AkkaRpcServiceTest {
         try {
             final int numberActors = 5;
 
-            CompletableFuture<Void> terminationFuture = akkaRpcService.getTerminationFuture();
-
-            final Collection<CompletableFuture<Void>> onStopFutures =
+            final RpcServiceShutdownTestHelper rpcServiceShutdownTestHelper =
                     startStopNCountingAsynchronousOnStopEndpoints(akkaRpcService, numberActors);
 
-            for (CompletableFuture<Void> onStopFuture : onStopFutures) {
+            for (CompletableFuture<Void> onStopFuture :
+                    rpcServiceShutdownTestHelper.getStopFutures()) {
                 onStopFuture.complete(null);
             }
 
-            terminationFuture.get();
+            rpcServiceShutdownTestHelper.waitForRpcServiceTermination();
             assertThat(akkaRpcService.getActorSystem().whenTerminated().isCompleted()).isTrue();
         } finally {
             RpcUtils.terminateRpcService(akkaRpcService);
@@ -305,12 +291,11 @@ class AkkaRpcServiceTest {
 
         final int numberActors = 5;
 
-        CompletableFuture<Void> terminationFuture = akkaRpcService.getTerminationFuture();
-
-        final Collection<CompletableFuture<Void>> onStopFutures =
+        final RpcServiceShutdownTestHelper rpcServiceShutdownTestHelper =
                 startStopNCountingAsynchronousOnStopEndpoints(akkaRpcService, numberActors);
 
-        Iterator<CompletableFuture<Void>> iterator = onStopFutures.iterator();
+        final Iterator<CompletableFuture<Void>> iterator =
+                rpcServiceShutdownTestHelper.getStopFutures().iterator();
 
         for (int i = 0; i < numberActors - 1; i++) {
             iterator.next().complete(null);
@@ -318,11 +303,7 @@ class AkkaRpcServiceTest {
 
         iterator.next().completeExceptionally(new OnStopException("onStop exception occurred."));
 
-        for (CompletableFuture<Void> onStopFuture : onStopFutures) {
-            onStopFuture.complete(null);
-        }
-
-        assertThatThrownBy(() -> terminationFuture.get())
+        assertThatThrownBy(rpcServiceShutdownTestHelper::waitForRpcServiceTermination)
                 .satisfies(FlinkAssertions.anyCauseMatches(OnStopException.class));
 
         assertThat(akkaRpcService.getActorSystem().whenTerminated().isCompleted()).isTrue();
@@ -349,8 +330,8 @@ class AkkaRpcServiceTest {
         }
     }
 
-    private Collection<CompletableFuture<Void>> startStopNCountingAsynchronousOnStopEndpoints(
-            AkkaRpcService akkaRpcService, int numberActors) throws Exception {
+    private static RpcServiceShutdownTestHelper startStopNCountingAsynchronousOnStopEndpoints(
+            AkkaRpcService akkaRpcService, int numberActors) throws InterruptedException {
         final Collection<CompletableFuture<Void>> onStopFutures = new ArrayList<>(numberActors);
 
         final CountDownLatch countDownLatch = new CountDownLatch(numberActors);
@@ -366,12 +347,34 @@ class AkkaRpcServiceTest {
 
         CompletableFuture<Void> terminationFuture = akkaRpcService.stopService();
 
+        countDownLatch.await();
+
         assertThat(terminationFuture).isNotDone();
         assertThat(akkaRpcService.getActorSystem().whenTerminated().isCompleted()).isFalse();
 
-        countDownLatch.await();
+        return new RpcServiceShutdownTestHelper(
+                Collections.unmodifiableCollection(onStopFutures), terminationFuture);
+    }
+
+    private static class RpcServiceShutdownTestHelper {
+
+        private final Collection<CompletableFuture<Void>> stopFutures;
+        private final CompletableFuture<Void> terminationFuture;
 
-        return onStopFutures;
+        public RpcServiceShutdownTestHelper(
+                Collection<CompletableFuture<Void>> stopFutures,
+                CompletableFuture<Void> terminationFuture) {
+            this.stopFutures = stopFutures;
+            this.terminationFuture = terminationFuture;
+        }
+
+        public Collection<CompletableFuture<Void>> getStopFutures() {
+            return stopFutures;
+        }
+
+        public void waitForRpcServiceTermination() throws ExecutionException, InterruptedException {
+            terminationFuture.get();
+        }
     }
 
     @Nonnull
diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 4edf3f70e10..1eaa01c5906 100644
--- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -99,13 +99,6 @@ public interface RpcService {
      */
     CompletableFuture<Void> stopService();
 
-    /**
-     * Returns a future indicating when the RPC service has been shut down.
-     *
-     * @return Termination future
-     */
-    CompletableFuture<Void> getTerminationFuture();
-
     /**
      * Gets a scheduled executor from the RPC service. This executor can be used to schedule tasks
      * to be executed in the future.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index b2c34c1b958..f73bdcc4674 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -199,11 +199,6 @@ public class TestingRpcService implements RpcService {
         backingRpcService.stopServer(selfGateway);
     }
 
-    @Override
-    public CompletableFuture<Void> getTerminationFuture() {
-        return backingRpcService.getTerminationFuture();
-    }
-
     @Override
     public ScheduledExecutor getScheduledExecutor() {
         return backingRpcService.getScheduledExecutor();
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
index 9757de12a2d..f54fd30ff5a 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
@@ -504,11 +504,6 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger {
             return rpcService.stopService();
         }
 
-        @Override
-        public CompletableFuture<Void> getTerminationFuture() {
-            return rpcService.getTerminationFuture();
-        }
-
         @Override
         public ScheduledExecutor getScheduledExecutor() {
             return rpcService.getScheduledExecutor();

Reply via email to