This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7fd696764ad6642ced76e96e4228547954a4eb60
Author: Matthias Pohl <matth...@ververica.com>
AuthorDate: Fri Mar 11 18:18:52 2022 +0100

    [hotfix][runtime][test] Removes obsolete CompletableFuture usage
    
    The test never relies on the asynchronous nature
    of the CompletableFuture because the listener always
    enqueues an already completed CompletableFuture.
---
 ...eeperLeaderRetrievalConnectionHandlingTest.java | 83 +++++++++-------------
 1 file changed, 35 insertions(+), 48 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalConnectionHandlingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalConnectionHandlingTest.java
index f385df4..f705103 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalConnectionHandlingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalConnectionHandlingTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.FunctionWithException;
@@ -43,10 +42,10 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.time.Duration;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -92,23 +91,19 @@ class ZooKeeperLeaderRetrievalConnectionHandlingTest {
                                         
fatalErrorHandlerResource.getTestingFatalErrorHandler()),
                 (leaderRetrievalDriver, queueLeaderElectionListener) -> {
                     // do the testing
-                    final CompletableFuture<String> firstAddress =
-                            
queueLeaderElectionListener.next(Duration.ofMillis(50));
-                    assertThat(firstAddress)
+                    
assertThat(queueLeaderElectionListener.next(Duration.ofMillis(50)))
                             .as("No results are expected, yet, since no leader 
was elected.")
-                            .isNull();
+                            .isNotPresent();
 
                     getZooKeeper().restart();
 
                     // QueueLeaderElectionListener will be notified with an 
empty leader when ZK
                     // connection is suspended
-                    final CompletableFuture<String> secondAddress =
-                            queueLeaderElectionListener.next();
+                    final String secondAddress =
+                            
queueLeaderElectionListener.next().getLeaderAddress();
                     assertThat(secondAddress)
-                            .as("The next result must not be missing.")
-                            .isNotNull()
                             .as("The next result is expected to be null.")
-                            .isCompletedWithValue(null);
+                            .isNull();
                 });
     }
 
@@ -131,20 +126,18 @@ class ZooKeeperLeaderRetrievalConnectionHandlingTest {
                             UUID.randomUUID());
 
                     // do the testing
-                    CompletableFuture<String> firstAddress = 
queueLeaderElectionListener.next();
+                    String firstAddress = 
queueLeaderElectionListener.next().getLeaderAddress();
                     assertThat(firstAddress)
                             .as(
                                     "The first result is expected to be the 
initially set leader address.")
-                            .isCompletedWithValue(leaderAddress);
+                            .isEqualTo(leaderAddress);
 
                     getZooKeeper().restart();
 
-                    CompletableFuture<String> secondAddress = 
queueLeaderElectionListener.next();
+                    String secondAddress = 
queueLeaderElectionListener.next().getLeaderAddress();
                     assertThat(secondAddress)
-                            .as("The next result must not be missing.")
-                            .isNotNull()
                             .as("The next result is expected to be null.")
-                            .isCompletedWithValue(null);
+                            .isNull();
                 });
     }
 
@@ -169,16 +162,17 @@ class ZooKeeperLeaderRetrievalConnectionHandlingTest {
                             UUID.randomUUID());
 
                     // do the testing
-                    CompletableFuture<String> firstAddress = 
queueLeaderElectionListener.next();
+                    String firstAddress = 
queueLeaderElectionListener.next().getLeaderAddress();
                     assertThat(firstAddress)
                             .as(
                                     "The first result is expected to be the 
initially set leader address.")
-                            .isCompletedWithValue(leaderAddress);
+                            .isEqualTo(leaderAddress);
 
                     getZooKeeper().close();
 
                     // make sure that no new leader information is published
-                    
assertThat(queueLeaderElectionListener.next(Duration.ofMillis(100L))).isNull();
+                    
assertThat(queueLeaderElectionListener.next(Duration.ofMillis(100L)))
+                            .isNotPresent();
                 });
     }
 
@@ -206,18 +200,14 @@ class ZooKeeperLeaderRetrievalConnectionHandlingTest {
 
                     getZooKeeper().stop();
 
-                    final CompletableFuture<String> connectionSuspension =
-                            queueLeaderElectionListener.next();
-
-                    // wait until the ZK connection is suspended
-                    connectionSuspension.join();
+                    queueLeaderElectionListener.next();
 
                     getZooKeeper().restart();
 
                     // new old leader information should be announced
-                    final CompletableFuture<String> connectionReconnect =
+                    final LeaderInformation connectionReconnect =
                             queueLeaderElectionListener.next();
-                    
assertThat(connectionReconnect).isCompletedWithValue(leaderAddress);
+                    
assertThat(connectionReconnect.getLeaderAddress()).isEqualTo(leaderAddress);
                 });
     }
 
@@ -269,11 +259,7 @@ class ZooKeeperLeaderRetrievalConnectionHandlingTest {
 
                     getZooKeeper().stop();
 
-                    final CompletableFuture<String> connectionSuspension =
-                            queueLeaderElectionListener.next();
-
-                    // wait until the ZK connection is suspended
-                    connectionSuspension.join();
+                    queueLeaderElectionListener.next();
 
                     getZooKeeper().restart();
 
@@ -287,10 +273,12 @@ class ZooKeeperLeaderRetrievalConnectionHandlingTest {
                     // check that we find the new leader information eventually
                     CommonTestUtils.waitUntilCondition(
                             () -> {
-                                final String afterConnectionReconnect =
-                                        
queueLeaderElectionListener.next().get();
-                                return afterConnectionReconnect != null
-                                        && 
afterConnectionReconnect.equals(newLeaderAddress);
+                                final LeaderInformation 
afterConnectionReconnect =
+                                        queueLeaderElectionListener.next();
+                                return 
afterConnectionReconnect.getLeaderAddress() != null
+                                        && afterConnectionReconnect
+                                                .getLeaderAddress()
+                                                .equals(newLeaderAddress);
                             },
                             Deadline.fromNow(Duration.ofSeconds(30L)));
                 });
@@ -322,7 +310,7 @@ class ZooKeeperLeaderRetrievalConnectionHandlingTest {
 
     private static class QueueLeaderElectionListener implements 
LeaderRetrievalEventHandler {
 
-        private final BlockingQueue<CompletableFuture<String>> queue;
+        private final BlockingQueue<LeaderInformation> queue;
 
         public QueueLeaderElectionListener(int expectedCalls) {
             this.queue = new ArrayBlockingQueue<>(expectedCalls);
@@ -330,26 +318,25 @@ class ZooKeeperLeaderRetrievalConnectionHandlingTest {
 
         @Override
         public void notifyLeaderAddress(LeaderInformation leaderInformation) {
-            final String leaderAddress = leaderInformation.getLeaderAddress();
             try {
-                queue.put(CompletableFuture.completedFuture(leaderAddress));
+                queue.put(leaderInformation);
             } catch (InterruptedException e) {
                 throw new IllegalStateException(e);
             }
         }
 
-        public CompletableFuture<String> next() {
-            return Preconditions.checkNotNull(next(null));
+        public LeaderInformation next() {
+            try {
+                return queue.take();
+            } catch (InterruptedException e) {
+                throw new IllegalStateException(e);
+            }
         }
 
-        @Nullable
-        public CompletableFuture<String> next(@Nullable Duration timeout) {
+        public Optional<LeaderInformation> next(Duration timeout) {
             try {
-                if (timeout == null) {
-                    return queue.take();
-                } else {
-                    return this.queue.poll(timeout.toMillis(), 
TimeUnit.MILLISECONDS);
-                }
+                return Optional.ofNullable(
+                        this.queue.poll(timeout.toMillis(), 
TimeUnit.MILLISECONDS));
             } catch (InterruptedException e) {
                 throw new IllegalStateException(e);
             }

Reply via email to