This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ccf27ff845836d74bcb94a9b5baf4bc171ea2ff3 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Thu Mar 7 17:28:33 2019 +0100 [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall Fix the race condition between executing EmbeddedLeaderService#GrantLeadershipCall and a concurrent shutdown of the leader service by ensuring that GrantLeadershipCall does not access mutable state outside of a lock. This closes #7935. --- .../nonha/embedded/EmbeddedLeaderService.java | 46 +++++---- .../nonha/embedded/EmbeddedHaServicesTest.java | 44 +-------- .../nonha/embedded/EmbeddedLeaderServiceTest.java | 103 +++++++++++++++++++++ .../nonha/embedded/TestingLeaderContender.java | 87 +++++++++++++++++ 4 files changed, 221 insertions(+), 59 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java index 97b80dc..e3e98e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java @@ -102,6 +102,13 @@ public class EmbeddedLeaderService { } } + @VisibleForTesting + public boolean isShutdown() { + synchronized (lock) { + return shutdown; + } + } + private void fatalError(Throwable error) { LOG.error("Embedded leader election service encountered a fatal error. 
Shutting down service.", error); @@ -171,7 +178,11 @@ public class EmbeddedLeaderService { service.contender = contender; service.running = true; - updateLeader(); + updateLeader().whenComplete((aVoid, throwable) -> { + if (throwable != null) { + fatalError(throwable); + } + }); } catch (Throwable t) { fatalError(t); @@ -210,7 +221,11 @@ public class EmbeddedLeaderService { currentLeaderSessionId = null; } - updateLeader(); + updateLeader().whenComplete((aVoid, throwable) -> { + if (throwable != null) { + fatalError(throwable); + } + }); } catch (Throwable t) { fatalError(t); @@ -280,11 +295,12 @@ public class EmbeddedLeaderService { currentLeaderSessionId = leaderSessionId; currentLeaderProposed = leaderService; + currentLeaderProposed.isLeader = true; LOG.info("Proposing leadership to contender {} @ {}", leaderService.contender, leaderService.contender.getAddress()); - return execute(new GrantLeadershipCall(leaderService, leaderSessionId, LOG)); + return execute(new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG)); } } else { return CompletableFuture.completedFuture(null); @@ -373,7 +389,8 @@ public class EmbeddedLeaderService { } LOG.info("Revoking leadership of {}.", leaderService.contender); - CompletableFuture<Void> revokeLeadershipCallFuture = execute(new RevokeLeadershipCall(leaderService)); + leaderService.isLeader = false; + CompletableFuture<Void> revokeLeadershipCallFuture = execute(new RevokeLeadershipCall(leaderService.contender)); CompletableFuture<Void> notifyAllListenersFuture = notifyAllListeners(null, null); @@ -506,33 +523,28 @@ public class EmbeddedLeaderService { private static class GrantLeadershipCall implements Runnable { - private final EmbeddedLeaderElectionService leaderElectionService; + private final LeaderContender contender; private final UUID leaderSessionId; private final Logger logger; GrantLeadershipCall( - EmbeddedLeaderElectionService leaderElectionService, + LeaderContender contender, UUID leaderSessionId, 
Logger logger) { - this.leaderElectionService = checkNotNull(leaderElectionService); + this.contender = checkNotNull(contender); this.leaderSessionId = checkNotNull(leaderSessionId); this.logger = checkNotNull(logger); } @Override public void run() { - leaderElectionService.isLeader = true; - - final LeaderContender contender = leaderElectionService.contender; - try { contender.grantLeadership(leaderSessionId); } catch (Throwable t) { logger.warn("Error granting leadership to contender", t); contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t)); - leaderElectionService.isLeader = false; } } } @@ -540,17 +552,15 @@ public class EmbeddedLeaderService { private static class RevokeLeadershipCall implements Runnable { @Nonnull - private final EmbeddedLeaderElectionService leaderElectionService; + private final LeaderContender contender; - RevokeLeadershipCall(@Nonnull EmbeddedLeaderElectionService leaderElectionService) { - this.leaderElectionService = leaderElectionService; + RevokeLeadershipCall(@Nonnull LeaderContender contender) { + this.contender = contender; } @Override public void run() { - leaderElectionService.isLeader = false; - - leaderElectionService.contender.revokeLeadership(); + contender.revokeLeadership(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java index 2455bf0..7f06d60 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; import org.junit.After; @@ -33,9 +32,6 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicReference; import static junit.framework.TestCase.assertTrue; import static org.hamcrest.Matchers.is; @@ -179,12 +175,11 @@ public class EmbeddedHaServicesTest extends TestLogger { @Test public void testConcurrentLeadershipOperations() throws Exception { final LeaderElectionService dispatcherLeaderElectionService = embeddedHaServices.getDispatcherLeaderElectionService(); - final ArrayBlockingQueue<UUID> offeredSessionIds = new ArrayBlockingQueue<>(2); - final TestingLeaderContender leaderContender = new TestingLeaderContender(offeredSessionIds); + final TestingLeaderContender leaderContender = new TestingLeaderContender(); dispatcherLeaderElectionService.start(leaderContender); - final UUID oldLeaderSessionId = offeredSessionIds.take(); + final UUID oldLeaderSessionId = leaderContender.getLeaderSessionFuture().get(); assertThat(dispatcherLeaderElectionService.hasLeadership(oldLeaderSessionId), is(true)); @@ -192,7 +187,7 @@ public class EmbeddedHaServicesTest extends TestLogger { assertThat(dispatcherLeaderElectionService.hasLeadership(oldLeaderSessionId), is(false)); embeddedHaServices.getDispatcherLeaderService().grantLeadership(); - final UUID newLeaderSessionId = offeredSessionIds.take(); + final UUID newLeaderSessionId = leaderContender.getLeaderSessionFuture().get(); assertThat(dispatcherLeaderElectionService.hasLeadership(newLeaderSessionId), is(true)); @@ -204,37 +199,4 @@ public class EmbeddedHaServicesTest extends TestLogger { leaderContender.tryRethrowException(); } - private 
static final class TestingLeaderContender implements LeaderContender { - - private final BlockingQueue<UUID> offeredSessionIds; - - private final AtomicReference<Exception> occurredException; - - private TestingLeaderContender(BlockingQueue<UUID> offeredSessionIds) { - this.offeredSessionIds = offeredSessionIds; - occurredException = new AtomicReference<>(null); - } - - @Override - public void grantLeadership(UUID leaderSessionID) { - offeredSessionIds.offer(leaderSessionID); - } - - @Override - public void revokeLeadership() {} - - @Override - public String getAddress() { - return "foobar"; - } - - @Override - public void handleError(Exception exception) { - occurredException.compareAndSet(null, exception); - } - - public void tryRethrowException() throws Exception { - ExceptionUtils.tryRethrowException(occurredException.get()); - } - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java new file mode 100644 index 0000000..6ecc967 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability.nonha.embedded; + +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.Matchers.is; + +/** + * Tests for the {@link EmbeddedLeaderService}. + */ +public class EmbeddedLeaderServiceTest extends TestLogger { + + /** + * Tests that the {@link EmbeddedLeaderService} can handle a concurrent grant + * leadership call and a shutdown. 
+ */ + @Test + public void testConcurrentGrantLeadershipAndShutdown() throws Exception { + final EmbeddedLeaderService embeddedLeaderService = new EmbeddedLeaderService(TestingUtils.defaultExecutor()); + + try { + final LeaderElectionService leaderElectionService = embeddedLeaderService.createLeaderElectionService(); + + final TestingLeaderContender contender = new TestingLeaderContender(); + + leaderElectionService.start(contender); + leaderElectionService.stop(); + + try { + // check that no exception occurred + contender.getLeaderSessionFuture().get(10L, TimeUnit.MILLISECONDS); + } catch (TimeoutException ignored) { + // we haven't participated in the leader election + } + + // the election service should still be running + Assert.assertThat(embeddedLeaderService.isShutdown(), is(false)); + } finally { + embeddedLeaderService.shutdown(); + } + } + + /** + * Tests that the {@link EmbeddedLeaderService} can handle a concurrent revoke + * leadership call and a shutdown. + */ + @Test + public void testConcurrentRevokeLeadershipAndShutdown() throws Exception { + final EmbeddedLeaderService embeddedLeaderService = new EmbeddedLeaderService(TestingUtils.defaultExecutor()); + + try { + final LeaderElectionService leaderElectionService = embeddedLeaderService.createLeaderElectionService(); + + final TestingLeaderContender contender = new TestingLeaderContender(); + + leaderElectionService.start(contender); + + // wait for the leadership + contender.getLeaderSessionFuture().get(); + + final CompletableFuture<Void> revokeLeadershipFuture = embeddedLeaderService.revokeLeadership(); + leaderElectionService.stop(); + + try { + // check that no exception occurred + revokeLeadershipFuture.get(10L, TimeUnit.MILLISECONDS); + } catch (TimeoutException ignored) { + // the leader election service has been stopped before revoking could be executed + } + + // the election service should still be running + Assert.assertThat(embeddedLeaderService.isShutdown(), is(false)); + } finally { 
+ embeddedLeaderService.shutdown(); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/TestingLeaderContender.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/TestingLeaderContender.java new file mode 100644 index 0000000..0b28671 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/TestingLeaderContender.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability.nonha.embedded; + +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.leaderelection.LeaderContender; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +/** + * {@link LeaderContender} implementation for testing purposes. 
+ */ +final class TestingLeaderContender implements LeaderContender { + + private final Object lock = new Object(); + + private CompletableFuture<UUID> leaderSessionFuture; + + TestingLeaderContender() { + leaderSessionFuture = new CompletableFuture<>(); + } + + @Override + public void grantLeadership(UUID leaderSessionID) { + synchronized (lock) { + if (!leaderSessionFuture.isCompletedExceptionally()) { + if (!leaderSessionFuture.complete(leaderSessionID)) { + leaderSessionFuture = CompletableFuture.completedFuture(leaderSessionID); + } + } + } + } + + @Override + public void revokeLeadership() { + synchronized (lock) { + if (leaderSessionFuture.isDone() && !leaderSessionFuture.isCompletedExceptionally()) { + leaderSessionFuture = new CompletableFuture<>(); + } + } + } + + @Override + public String getAddress() { + return "foobar"; + } + + @Override + public void handleError(Exception exception) { + synchronized (lock) { + if (!(leaderSessionFuture.isCompletedExceptionally() || leaderSessionFuture.completeExceptionally(exception))) { + leaderSessionFuture = FutureUtils.completedExceptionally(exception); + } + } + } + + public void tryRethrowException() { + synchronized (lock) { + if (leaderSessionFuture.isCompletedExceptionally()) { + leaderSessionFuture.getNow(null); + } + } + } + + CompletableFuture<UUID> getLeaderSessionFuture() { + synchronized (lock) { + return leaderSessionFuture; + } + } +}