This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ccf27ff845836d74bcb94a9b5baf4bc171ea2ff3
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Thu Mar 7 17:28:33 2019 +0100

    [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall
    
    Fix the race condition between executing EmbeddedLeaderService#GrantLeadershipCall
    and a concurrent shutdown of the leader service by ensuring that GrantLeadershipCall
    does not access mutable state outside of a lock.
    
    This closes #7935.
---
 .../nonha/embedded/EmbeddedLeaderService.java      |  46 +++++----
 .../nonha/embedded/EmbeddedHaServicesTest.java     |  44 +--------
 .../nonha/embedded/EmbeddedLeaderServiceTest.java  | 103 +++++++++++++++++++++
 .../nonha/embedded/TestingLeaderContender.java     |  87 +++++++++++++++++
 4 files changed, 221 insertions(+), 59 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
index 97b80dc..e3e98e1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
@@ -102,6 +102,13 @@ public class EmbeddedLeaderService {
                }
        }
 
+       @VisibleForTesting
+       public boolean isShutdown() {
+               synchronized (lock) {
+                       return shutdown;
+               }
+       }
+
        private void fatalError(Throwable error) {
                LOG.error("Embedded leader election service encountered a fatal 
error. Shutting down service.", error);
 
@@ -171,7 +178,11 @@ public class EmbeddedLeaderService {
                                service.contender = contender;
                                service.running = true;
 
-                               updateLeader();
+                               updateLeader().whenComplete((aVoid, throwable) 
-> {
+                                       if (throwable != null) {
+                                               fatalError(throwable);
+                                       }
+                               });
                        }
                        catch (Throwable t) {
                                fatalError(t);
@@ -210,7 +221,11 @@ public class EmbeddedLeaderService {
                                        currentLeaderSessionId = null;
                                }
 
-                               updateLeader();
+                               updateLeader().whenComplete((aVoid, throwable) 
-> {
+                                       if (throwable != null) {
+                                               fatalError(throwable);
+                                       }
+                               });
                        }
                        catch (Throwable t) {
                                fatalError(t);
@@ -280,11 +295,12 @@ public class EmbeddedLeaderService {
 
                                currentLeaderSessionId = leaderSessionId;
                                currentLeaderProposed = leaderService;
+                               currentLeaderProposed.isLeader = true;
 
                                LOG.info("Proposing leadership to contender {} 
@ {}",
                                                leaderService.contender, 
leaderService.contender.getAddress());
 
-                               return execute(new 
GrantLeadershipCall(leaderService, leaderSessionId, LOG));
+                               return execute(new 
GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));
                        }
                } else {
                        return CompletableFuture.completedFuture(null);
@@ -373,7 +389,8 @@ public class EmbeddedLeaderService {
                                }
 
                                LOG.info("Revoking leadership of {}.", 
leaderService.contender);
-                               CompletableFuture<Void> 
revokeLeadershipCallFuture = execute(new RevokeLeadershipCall(leaderService));
+                               leaderService.isLeader = false;
+                               CompletableFuture<Void> 
revokeLeadershipCallFuture = execute(new 
RevokeLeadershipCall(leaderService.contender));
 
                                CompletableFuture<Void> 
notifyAllListenersFuture = notifyAllListeners(null, null);
 
@@ -506,33 +523,28 @@ public class EmbeddedLeaderService {
 
        private static class GrantLeadershipCall implements Runnable {
 
-               private final EmbeddedLeaderElectionService 
leaderElectionService;
+               private final LeaderContender contender;
                private final UUID leaderSessionId;
                private final Logger logger;
 
                GrantLeadershipCall(
-                               EmbeddedLeaderElectionService 
leaderElectionService,
+                               LeaderContender contender,
                                UUID leaderSessionId,
                                Logger logger) {
 
-                       this.leaderElectionService = 
checkNotNull(leaderElectionService);
+                       this.contender = checkNotNull(contender);
                        this.leaderSessionId = checkNotNull(leaderSessionId);
                        this.logger = checkNotNull(logger);
                }
 
                @Override
                public void run() {
-                       leaderElectionService.isLeader = true;
-
-                       final LeaderContender contender = 
leaderElectionService.contender;
-
                        try {
                                contender.grantLeadership(leaderSessionId);
                        }
                        catch (Throwable t) {
                                logger.warn("Error granting leadership to 
contender", t);
                                contender.handleError(t instanceof Exception ? 
(Exception) t : new Exception(t));
-                               leaderElectionService.isLeader = false;
                        }
                }
        }
@@ -540,17 +552,15 @@ public class EmbeddedLeaderService {
        private static class RevokeLeadershipCall implements Runnable {
 
                @Nonnull
-               private final EmbeddedLeaderElectionService 
leaderElectionService;
+               private final LeaderContender contender;
 
-               RevokeLeadershipCall(@Nonnull EmbeddedLeaderElectionService 
leaderElectionService) {
-                       this.leaderElectionService = leaderElectionService;
+               RevokeLeadershipCall(@Nonnull LeaderContender contender) {
+                       this.contender = contender;
                }
 
                @Override
                public void run() {
-                       leaderElectionService.isLeader = false;
-
-                       leaderElectionService.contender.revokeLeadership();
+                       contender.revokeLeadership();
                }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java
index 2455bf0..7f06d60 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -33,9 +32,6 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.hamcrest.Matchers.is;
@@ -179,12 +175,11 @@ public class EmbeddedHaServicesTest extends TestLogger {
        @Test
        public void testConcurrentLeadershipOperations() throws Exception {
                final LeaderElectionService dispatcherLeaderElectionService = 
embeddedHaServices.getDispatcherLeaderElectionService();
-               final ArrayBlockingQueue<UUID> offeredSessionIds = new 
ArrayBlockingQueue<>(2);
-               final TestingLeaderContender leaderContender = new 
TestingLeaderContender(offeredSessionIds);
+               final TestingLeaderContender leaderContender = new 
TestingLeaderContender();
 
                dispatcherLeaderElectionService.start(leaderContender);
 
-               final UUID oldLeaderSessionId = offeredSessionIds.take();
+               final UUID oldLeaderSessionId = 
leaderContender.getLeaderSessionFuture().get();
 
                
assertThat(dispatcherLeaderElectionService.hasLeadership(oldLeaderSessionId), 
is(true));
 
@@ -192,7 +187,7 @@ public class EmbeddedHaServicesTest extends TestLogger {
                
assertThat(dispatcherLeaderElectionService.hasLeadership(oldLeaderSessionId), 
is(false));
 
                
embeddedHaServices.getDispatcherLeaderService().grantLeadership();
-               final UUID newLeaderSessionId = offeredSessionIds.take();
+               final UUID newLeaderSessionId = 
leaderContender.getLeaderSessionFuture().get();
 
                
assertThat(dispatcherLeaderElectionService.hasLeadership(newLeaderSessionId), 
is(true));
 
@@ -204,37 +199,4 @@ public class EmbeddedHaServicesTest extends TestLogger {
                leaderContender.tryRethrowException();
        }
 
-       private static final class TestingLeaderContender implements 
LeaderContender {
-
-               private final BlockingQueue<UUID> offeredSessionIds;
-
-               private final AtomicReference<Exception> occurredException;
-
-               private TestingLeaderContender(BlockingQueue<UUID> 
offeredSessionIds) {
-                       this.offeredSessionIds = offeredSessionIds;
-                       occurredException = new AtomicReference<>(null);
-               }
-
-               @Override
-               public void grantLeadership(UUID leaderSessionID) {
-                       offeredSessionIds.offer(leaderSessionID);
-               }
-
-               @Override
-               public void revokeLeadership() {}
-
-               @Override
-               public String getAddress() {
-                       return "foobar";
-               }
-
-               @Override
-               public void handleError(Exception exception) {
-                       occurredException.compareAndSet(null, exception);
-               }
-
-               public void tryRethrowException() throws Exception {
-                       
ExceptionUtils.tryRethrowException(occurredException.get());
-               }
-       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java
new file mode 100644
index 0000000..6ecc967
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.embedded;
+
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Tests for the {@link EmbeddedLeaderService}.
+ */
+public class EmbeddedLeaderServiceTest extends TestLogger {
+
+       /**
+        * Tests that the {@link EmbeddedLeaderService} can handle a concurrent 
grant
+        * leadership call and a shutdown.
+        */
+       @Test
+       public void testConcurrentGrantLeadershipAndShutdown() throws Exception 
{
+               final EmbeddedLeaderService embeddedLeaderService = new 
EmbeddedLeaderService(TestingUtils.defaultExecutor());
+
+               try {
+                       final LeaderElectionService leaderElectionService = 
embeddedLeaderService.createLeaderElectionService();
+
+                       final TestingLeaderContender contender = new 
TestingLeaderContender();
+
+                       leaderElectionService.start(contender);
+                       leaderElectionService.stop();
+
+                       try {
+                               // check that no exception occurred
+                               contender.getLeaderSessionFuture().get(10L, 
TimeUnit.MILLISECONDS);
+                       } catch (TimeoutException ignored) {
+                               // we haven't participated in the leader 
election
+                       }
+
+                       // the election service should still be running
+                       Assert.assertThat(embeddedLeaderService.isShutdown(), 
is(false));
+               } finally {
+                       embeddedLeaderService.shutdown();
+               }
+       }
+
+       /**
+        * Tests that the {@link EmbeddedLeaderService} can handle a concurrent 
revoke
+        * leadership call and a shutdown.
+        */
+       @Test
+       public void testConcurrentRevokeLeadershipAndShutdown() throws 
Exception {
+               final EmbeddedLeaderService embeddedLeaderService = new 
EmbeddedLeaderService(TestingUtils.defaultExecutor());
+
+               try {
+                       final LeaderElectionService leaderElectionService = 
embeddedLeaderService.createLeaderElectionService();
+
+                       final TestingLeaderContender contender = new 
TestingLeaderContender();
+
+                       leaderElectionService.start(contender);
+
+                       // wait for the leadership
+                       contender.getLeaderSessionFuture().get();
+
+                       final CompletableFuture<Void> revokeLeadershipFuture = 
embeddedLeaderService.revokeLeadership();
+                       leaderElectionService.stop();
+
+                       try {
+                               // check that no exception occurred
+                               revokeLeadershipFuture.get(10L, 
TimeUnit.MILLISECONDS);
+                       } catch (TimeoutException ignored) {
+                               // the leader election service has been stopped 
before revoking could be executed
+                       }
+
+                       // the election service should still be running
+                       Assert.assertThat(embeddedLeaderService.isShutdown(), 
is(false));
+               } finally {
+                       embeddedLeaderService.shutdown();
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/TestingLeaderContender.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/TestingLeaderContender.java
new file mode 100644
index 0000000..0b28671
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/TestingLeaderContender.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.embedded;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link LeaderContender} implementation for testing purposes.
+ */
+final class TestingLeaderContender implements LeaderContender {
+
+       private final Object lock = new Object();
+
+       private CompletableFuture<UUID> leaderSessionFuture;
+
+       TestingLeaderContender() {
+               leaderSessionFuture = new CompletableFuture<>();
+       }
+
+       @Override
+       public void grantLeadership(UUID leaderSessionID) {
+               synchronized (lock) {
+                       if (!leaderSessionFuture.isCompletedExceptionally()) {
+                               if 
(!leaderSessionFuture.complete(leaderSessionID)) {
+                                       leaderSessionFuture = 
CompletableFuture.completedFuture(leaderSessionID);
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void revokeLeadership() {
+               synchronized (lock) {
+                       if (leaderSessionFuture.isDone() && 
!leaderSessionFuture.isCompletedExceptionally()) {
+                               leaderSessionFuture = new CompletableFuture<>();
+                       }
+               }
+       }
+
+       @Override
+       public String getAddress() {
+               return "foobar";
+       }
+
+       @Override
+       public void handleError(Exception exception) {
+               synchronized (lock) {
+                       if (!(leaderSessionFuture.isCompletedExceptionally() || 
leaderSessionFuture.completeExceptionally(exception))) {
+                               leaderSessionFuture = 
FutureUtils.completedExceptionally(exception);
+                       }
+               }
+       }
+
+       public void tryRethrowException() {
+               synchronized (lock) {
+                       if (leaderSessionFuture.isCompletedExceptionally()) {
+                               leaderSessionFuture.getNow(null);
+                       }
+               }
+       }
+
+       CompletableFuture<UUID> getLeaderSessionFuture() {
+               synchronized (lock) {
+                       return leaderSessionFuture;
+               }
+       }
+}

Reply via email to