This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7fd696764ad6642ced76e96e4228547954a4eb60 Author: Matthias Pohl <matth...@ververica.com> AuthorDate: Fri Mar 11 18:18:52 2022 +0100 [hotfix][runtime][test] Removes obsolete CompletableFuture usage The test never utilizes the CompletableFuture because it always sets an already completed CompletableFuture. --- ...eeperLeaderRetrievalConnectionHandlingTest.java | 83 +++++++++------------- 1 file changed, 35 insertions(+), 48 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalConnectionHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalConnectionHandlingTest.java index f385df4..f705103 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalConnectionHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalConnectionHandlingTest.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.runtime.zookeeper.ZooKeeperExtension; -import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.util.function.BiConsumerWithException; import org.apache.flink.util.function.FunctionWithException; @@ -43,10 +42,10 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.time.Duration; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; @@ -92,23 +91,19 @@ class ZooKeeperLeaderRetrievalConnectionHandlingTest { fatalErrorHandlerResource.getTestingFatalErrorHandler()), (leaderRetrievalDriver, queueLeaderElectionListener) -> { // do the testing - final CompletableFuture<String> firstAddress = - queueLeaderElectionListener.next(Duration.ofMillis(50)); - assertThat(firstAddress) + assertThat(queueLeaderElectionListener.next(Duration.ofMillis(50))) .as("No results are expected, yet, since no leader was elected.") - .isNull(); + .isNotPresent(); getZooKeeper().restart(); // QueueLeaderElectionListener will be notified with an empty leader when ZK // connection is suspended - final CompletableFuture<String> secondAddress = - queueLeaderElectionListener.next(); + final String secondAddress = + queueLeaderElectionListener.next().getLeaderAddress(); assertThat(secondAddress) - .as("The next result must not be missing.") - .isNotNull() .as("The next result is expected to be null.") - .isCompletedWithValue(null); + .isNull(); }); } @@ -131,20 +126,18 @@ class ZooKeeperLeaderRetrievalConnectionHandlingTest { UUID.randomUUID()); // do the testing - CompletableFuture<String> firstAddress = queueLeaderElectionListener.next(); + String firstAddress = queueLeaderElectionListener.next().getLeaderAddress(); assertThat(firstAddress) .as( "The first result is expected to be the initially set leader address.") - .isCompletedWithValue(leaderAddress); + .isEqualTo(leaderAddress); getZooKeeper().restart(); - CompletableFuture<String> secondAddress = queueLeaderElectionListener.next(); + String secondAddress = queueLeaderElectionListener.next().getLeaderAddress(); assertThat(secondAddress) - .as("The next result must not be missing.") - .isNotNull() .as("The next result is expected to be null.") - .isCompletedWithValue(null); + .isNull(); }); } @@ -169,16 +162,17 @@ class ZooKeeperLeaderRetrievalConnectionHandlingTest { UUID.randomUUID()); // do the testing - CompletableFuture<String> firstAddress = queueLeaderElectionListener.next(); + String firstAddress = queueLeaderElectionListener.next().getLeaderAddress(); assertThat(firstAddress) .as( "The first result is expected to be the initially set leader address.") - .isCompletedWithValue(leaderAddress); + .isEqualTo(leaderAddress); getZooKeeper().close(); // make sure that no new leader information is published - assertThat(queueLeaderElectionListener.next(Duration.ofMillis(100L))).isNull(); + assertThat(queueLeaderElectionListener.next(Duration.ofMillis(100L))) + .isNotPresent(); }); } @@ -206,18 +200,14 @@ class ZooKeeperLeaderRetrievalConnectionHandlingTest { getZooKeeper().stop(); - final CompletableFuture<String> connectionSuspension = - queueLeaderElectionListener.next(); - - // wait until the ZK connection is suspended - connectionSuspension.join(); + queueLeaderElectionListener.next(); getZooKeeper().restart(); // new old leader information should be announced - final CompletableFuture<String> connectionReconnect = + final LeaderInformation connectionReconnect = queueLeaderElectionListener.next(); - assertThat(connectionReconnect).isCompletedWithValue(leaderAddress); + assertThat(connectionReconnect.getLeaderAddress()).isEqualTo(leaderAddress); }); } @@ -269,11 +259,7 @@ class ZooKeeperLeaderRetrievalConnectionHandlingTest { getZooKeeper().stop(); - final CompletableFuture<String> connectionSuspension = - queueLeaderElectionListener.next(); - - // wait until the ZK connection is suspended - connectionSuspension.join(); + queueLeaderElectionListener.next(); getZooKeeper().restart(); @@ -287,10 +273,12 @@ class ZooKeeperLeaderRetrievalConnectionHandlingTest { // check that we find the new leader information eventually CommonTestUtils.waitUntilCondition( () -> { - final String afterConnectionReconnect = - queueLeaderElectionListener.next().get(); - return afterConnectionReconnect != null - && afterConnectionReconnect.equals(newLeaderAddress); + final LeaderInformation afterConnectionReconnect = + queueLeaderElectionListener.next(); + return afterConnectionReconnect.getLeaderAddress() != null + && afterConnectionReconnect + .getLeaderAddress() + .equals(newLeaderAddress); }, Deadline.fromNow(Duration.ofSeconds(30L))); }); @@ -322,7 +310,7 @@ class ZooKeeperLeaderRetrievalConnectionHandlingTest { private static class QueueLeaderElectionListener implements LeaderRetrievalEventHandler { - private final BlockingQueue<CompletableFuture<String>> queue; + private final BlockingQueue<LeaderInformation> queue; public QueueLeaderElectionListener(int expectedCalls) { this.queue = new ArrayBlockingQueue<>(expectedCalls); @@ -330,26 +318,25 @@ class ZooKeeperLeaderRetrievalConnectionHandlingTest { @Override public void notifyLeaderAddress(LeaderInformation leaderInformation) { - final String leaderAddress = leaderInformation.getLeaderAddress(); try { - queue.put(CompletableFuture.completedFuture(leaderAddress)); + queue.put(leaderInformation); } catch (InterruptedException e) { throw new IllegalStateException(e); } } - public CompletableFuture<String> next() { - return Preconditions.checkNotNull(next(null)); + public LeaderInformation next() { + try { + return queue.take(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } } - @Nullable - public CompletableFuture<String> next(@Nullable Duration timeout) { + public Optional<LeaderInformation> next(Duration timeout) { try { - if (timeout == null) { - return queue.take(); - } else { - return this.queue.poll(timeout.toMillis(), TimeUnit.MILLISECONDS); - } + return Optional.ofNullable( + this.queue.poll(timeout.toMillis(), TimeUnit.MILLISECONDS)); } catch (InterruptedException e) { throw new IllegalStateException(e); }