wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r516404322



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
##########
@@ -160,6 +161,39 @@ public void testRetryCancellation() throws Exception {
                }
        }
 
+       /**
+        * Test that {@link FutureUtils#retry} should stop at non-retryable 
exception.
+        */
+       @Test
+       public void testStopAtNonRetryableException() {
+               final int retries = 10;
+               final int notRetry = 3;
+               final AtomicInteger atomicInteger = new AtomicInteger(0);
+               final String notRetryExceptionMsg = "Non-retryable exception";
+               CompletableFuture<Boolean> retryFuture = FutureUtils.retry(
+                       () -> CompletableFuture.supplyAsync(
+                               () -> {
+                                       if (atomicInteger.incrementAndGet() == 
notRetry) {
+                                               // throw non-retryable exception
+                                               throw new 
CompletionException(new FlinkRuntimeException(notRetryExceptionMsg));
+                                       } else {
+                                               throw new 
CompletionException(new FlinkException("Test exception"));
+                                       }
+                               },
+                               TestingUtils.defaultExecutor()),
+                       retries,
+                       throwable -> ExceptionUtils.findThrowable(throwable, 
FlinkException.class).isPresent(),
+                       TestingUtils.defaultExecutor());
+
+               try {
+                       retryFuture.get();
+                       fail("Exception should be thrown.");
+               } catch (Exception ex) {
+                       assertThat(ExceptionUtils.findThrowableWithMessage(ex, 
notRetryExceptionMsg).isPresent(), is(true));

Review comment:
       Nice suggestion. I will change the other similar exception assertions as well.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Default implementation for leader election service. Composed with different 
{@link LeaderElectionDriver}, we could
+ * perform a leader election for the contender, and then persist the leader 
information to various storage.
+ */
+public class DefaultLeaderElectionService implements LeaderElectionService, 
LeaderElectionEventHandler {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultLeaderElectionService.class);
+
+       private final Object lock = new Object();
+
+       private final LeaderElectionDriverFactory leaderElectionDriverFactory;
+
+       /** The leader contender which applies for leadership. */
+       @GuardedBy("lock")
+       private volatile LeaderContender leaderContender;
+
+       @GuardedBy("lock")
+       private volatile UUID issuedLeaderSessionID;
+
+       @GuardedBy("lock")
+       private volatile UUID confirmedLeaderSessionID;
+
+       @GuardedBy("lock")
+       private volatile String confirmedLeaderAddress;
+
+       @GuardedBy("lock")
+       private volatile boolean running;
+
+       @GuardedBy("lock")
+       private LeaderElectionDriver leaderElectionDriver;
+
+       public DefaultLeaderElectionService(LeaderElectionDriverFactory 
leaderElectionDriverFactory) {
+               this.leaderElectionDriverFactory = 
checkNotNull(leaderElectionDriverFactory);
+
+               leaderContender = null;
+
+               issuedLeaderSessionID = null;
+               confirmedLeaderSessionID = null;
+               confirmedLeaderAddress = null;
+
+               running = false;
+       }
+
+       @Override
+       public final void start(LeaderContender contender) throws Exception {
+               checkNotNull(contender, "Contender must not be null.");
+               Preconditions.checkState(leaderContender == null, "Contender 
was already set.");
+
+               synchronized (lock) {
+                       leaderContender = contender;
+                       leaderElectionDriver = 
leaderElectionDriverFactory.createLeaderElectionDriver(
+                               this, new LeaderElectionFatalErrorHandler(), 
leaderContender.getDescription());
+                       LOG.info("Starting DefaultLeaderElectionService with 
{}.", leaderElectionDriver);
+
+                       running = true;
+               }
+       }
+
+       @Override
+       public final void stop() throws Exception {
+               LOG.info("Stopping DefaultLeaderElectionService.");
+
+               synchronized (lock) {
+                       if (!running) {
+                               return;
+                       }
+                       running = false;
+                       clearConfirmedLeaderInformation();
+                       leaderElectionDriver.close();
+               }
+       }
+
+       @Override
+       public void confirmLeadership(UUID leaderSessionID, String 
leaderAddress) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug(
+                               "Confirm leader session ID {} for leader {}.",
+                               leaderSessionID,
+                               leaderAddress);
+               }
+
+               checkNotNull(leaderSessionID);
+
+               synchronized (lock) {
+                       if (hasLeadership(leaderSessionID)) {
+                               if (running) {
+                                       
confirmLeaderInformation(leaderSessionID, leaderAddress);
+                               } else {
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("Ignoring the leader 
session Id {} confirmation, since the " +
+                                                       "LeaderElectionService 
has already been stopped.", leaderSessionID);
+                                       }
+                               }
+                       } else {
+                               // Received an old confirmation call
+                               if 
(!leaderSessionID.equals(this.issuedLeaderSessionID)) {
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("Receive an old 
confirmation call of leader session ID {}, " +
+                                                       "current issued session 
ID is {}", leaderSessionID, issuedLeaderSessionID);
+                                       }
+                               } else {
+                                       LOG.warn("The leader session ID {} was 
confirmed even though the " +
+                                               "corresponding JobManager was 
not elected as the leader.", leaderSessionID);
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+               synchronized (lock) {
+                       if (running) {
+                               return leaderElectionDriver.hasLeadership() && 
leaderSessionId.equals(issuedLeaderSessionID);
+                       } else {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("hasLeadership is called 
after the service is stopped, returning false.");
+                               }
+                               return false;
+                       }
+               }
+       }
+
+       /**
+        * Returns the current leader session ID or null, if the contender is 
not the leader.
+        *
+        * @return The last leader session ID or null, if the contender is not 
the leader
+        */
+       @VisibleForTesting
+       @Nullable
+       public UUID getLeaderSessionID() {
+               return confirmedLeaderSessionID;
+       }
+
+       @GuardedBy("lock")
+       private void confirmLeaderInformation(UUID leaderSessionID, String 
leaderAddress) {
+               confirmedLeaderSessionID = leaderSessionID;
+               confirmedLeaderAddress = leaderAddress;
+               leaderElectionDriver.writeLeaderInformation(
+                       LeaderInformation.known(confirmedLeaderSessionID, 
confirmedLeaderAddress));
+       }
+
+       @GuardedBy("lock")
+       private void clearConfirmedLeaderInformation() {
+               confirmedLeaderSessionID = null;
+               confirmedLeaderAddress = null;
+       }
+
+       @Override
+       @GuardedBy("lock")
+       public void onGrantLeadership() {
+               synchronized (lock) {
+                       if (running) {
+                               issuedLeaderSessionID = UUID.randomUUID();
+                               clearConfirmedLeaderInformation();
+
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug(
+                                               "Grant leadership to contender 
{} with session ID {}.",
+                                               
leaderContender.getDescription(),
+                                               issuedLeaderSessionID);
+                               }
+
+                               
leaderContender.grantLeadership(issuedLeaderSessionID);

Review comment:
       Yes, there is a potential deadlock here. Callbacks to the `leaderContender` 
should not be invoked while holding the `lock`. I will move all of these callbacks 
out of the `lock`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionDriver.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+/**
+ * A {@link LeaderElectionDriver} is responsible for performing the leader 
election and storing the leader information.
+ * All the leader internal state is guarded by lock in {@link 
LeaderElectionService}. Different driver
+ * implementations do not need to care about the lock. And it should use 
{@link LeaderElectionEventHandler}
+ * if it want to respond to the leader change events.
+ *
+ * <p><strong>Important</strong>: The {@link LeaderElectionDriver} could not 
guarantee that there is no
+ * {@link LeaderElectionEventHandler} callbacks happen after {@link #close()}.
+ */
+public interface LeaderElectionDriver {
+
+       /**
+        * Write the current leader information to external persistent 
storage(e.g. Zookeeper, Kubernetes ConfigMap). This
+        * is a blocking IO operation.
+        *
+        * @param leaderInformation current leader information. It could be 
{@link LeaderInformation#empty()}, which means
+        * the caller want to clear the leader information on external storage.
+        */
+       void writeLeaderInformation(LeaderInformation leaderInformation);

Review comment:
       `writeLeaderInformation` only takes effect when the current driver still 
has the leadership, so the caller does not need to ensure this. I will add this 
to the interface description.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionDriver.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+/**
+ * A {@link LeaderElectionDriver} is responsible for performing the leader 
election and storing the leader information.
+ * All the leader internal state is guarded by lock in {@link 
LeaderElectionService}. Different driver
+ * implementations do not need to care about the lock. And it should use 
{@link LeaderElectionEventHandler}
+ * if it want to respond to the leader change events.
+ *
+ * <p><strong>Important</strong>: The {@link LeaderElectionDriver} could not 
guarantee that there is no
+ * {@link LeaderElectionEventHandler} callbacks happen after {@link #close()}.
+ */
+public interface LeaderElectionDriver {
+
+       /**
+        * Write the current leader information to external persistent 
storage(e.g. Zookeeper, Kubernetes ConfigMap). This
+        * is a blocking IO operation.
+        *
+        * @param leaderInformation current leader information. It could be 
{@link LeaderInformation#empty()}, which means
+        * the caller want to clear the leader information on external storage.
+        */
+       void writeLeaderInformation(LeaderInformation leaderInformation);

Review comment:
       "The write operation takes effect only when the driver still has the 
leadership." The caller does not need to ensure this. I will add this to the 
interface description.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java
##########
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener;
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.ChildData;
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache;
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch;
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState;
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.CreateMode;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link LeaderElectionDriver} implementation for Zookeeper. The leading 
JobManager is elected using
+ * ZooKeeper. The current leader's address as well as its leader session ID is 
published via
+ * ZooKeeper.
+ */
+public class ZooKeeperLeaderElectionDriver implements LeaderElectionDriver, 
LeaderLatchListener, NodeCacheListener, UnhandledErrorListener {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperLeaderElectionDriver.class);
+
+       /** Client to the ZooKeeper quorum. */
+       private final CuratorFramework client;
+
+       /** Curator recipe for leader election. */
+       private final LeaderLatch leaderLatch;
+
+       /** Curator recipe to watch a given ZooKeeper node for changes. */
+       private final NodeCache cache;
+
+       /** ZooKeeper path of the node which stores the current leader 
information. */
+       private final String leaderPath;
+
+       private final ConnectionStateListener listener = (client, newState) -> 
handleStateChange(newState);
+
+       private final LeaderElectionEventHandler leaderElectionEventHandler;
+
+       private final FatalErrorHandler fatalErrorHandler;
+
+       private final String leaderContenderDescription;
+
+       private volatile boolean running;
+
+       /**
+        * Creates a ZooKeeperLeaderElectionDriver object.
+        *
+        * @param client Client which is connected to the ZooKeeper quorum
+        * @param latchPath ZooKeeper node path for the leader election latch
+        * @param leaderPath ZooKeeper node path for the node which stores the 
current leader information
+        * @param leaderElectionEventHandler Event handler for processing 
leader change events
+        * @param fatalErrorHandler Fatal error handler
+        * @param leaderContenderDescription Leader contender description
+        */
+       public ZooKeeperLeaderElectionDriver(
+                       CuratorFramework client,
+                       String latchPath,
+                       String leaderPath,
+                       LeaderElectionEventHandler leaderElectionEventHandler,
+                       FatalErrorHandler fatalErrorHandler,
+                       String leaderContenderDescription) throws Exception {
+               this.client = checkNotNull(client);
+               this.leaderPath = checkNotNull(leaderPath);
+               this.leaderElectionEventHandler = 
checkNotNull(leaderElectionEventHandler);
+               this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
+               this.leaderContenderDescription = 
checkNotNull(leaderContenderDescription);
+
+               leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
+               cache = new NodeCache(client, leaderPath);
+
+               client.getUnhandledErrorListenable().addListener(this);
+
+               leaderLatch.addListener(this);
+               leaderLatch.start();
+
+               cache.getListenable().addListener(this);
+               cache.start();
+
+               client.getConnectionStateListenable().addListener(listener);
+
+               running = true;
+       }
+
+       @Override
+       public void close() throws Exception{
+               if (!running) {
+                       return;
+               }
+               running = false;
+
+               LOG.info("Closing {}", this);
+
+               client.getUnhandledErrorListenable().removeListener(this);
+
+               client.getConnectionStateListenable().removeListener(listener);
+
+               Exception exception = null;
+
+               try {
+                       cache.close();
+               } catch (Exception e) {
+                       exception = e;
+               }
+
+               try {
+                       leaderLatch.close();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+               }
+
+               if (exception != null) {
+                       throw new Exception("Could not properly stop the 
ZooKeeperLeaderElectionDriver.", exception);
+               }
+       }
+
+       @Override
+       public boolean hasLeadership() {
+               assert(running);
+               return leaderLatch.hasLeadership();
+       }
+
+       @Override
+       public void isLeader() {
+               leaderElectionEventHandler.onGrantLeadership();
+       }
+
+       @Override
+       public void notLeader() {
+               leaderElectionEventHandler.onRevokeLeadership();
+       }
+
+       @Override
+       public void nodeChanged() throws Exception {
+               if (leaderLatch.hasLeadership()) {
+                       ChildData childData = cache.getCurrentData();
+                       if (childData != null) {
+                               final byte[] data = childData.getData();
+                               if (data != null && data.length > 0) {
+                                       final ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
+                                       final ObjectInputStream ois = new 
ObjectInputStream(bais);
+
+                                       final String leaderAddress = 
ois.readUTF();
+                                       final UUID leaderSessionID = (UUID) 
ois.readObject();
+
+                                       
leaderElectionEventHandler.onLeaderInformationChange(
+                                               
LeaderInformation.known(leaderSessionID, leaderAddress));
+                                       return;
+                               }
+                       }
+                       
leaderElectionEventHandler.onLeaderInformationChange(LeaderInformation.empty());
+               }
+       }
+
+       /**
+        * Writes the current leader's address as well the given leader session 
ID to ZooKeeper.
+        */
+       @Override
+       public void writeLeaderInformation(LeaderInformation leaderInformation) 
{
+               assert(running);
+               // this method does not have to be synchronized because the 
curator framework client
+               // is thread-safe. We do not write the empty data to ZooKeeper 
here. Because check-leadership-and-update
+               // is not a transactional operation. We may wrongly clear the 
data written by new leader.
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Write leader information: {}.", 
leaderInformation);
+               }
+               if (leaderInformation.equals(LeaderInformation.empty())) {
+                       return;

Review comment:
       I think the normal behavior should be that once the current 
`LeaderElectionDriver` is no longer the leader, it is responsible for cleaning up 
the leader information on the external storage (Kubernetes, ZooKeeper). This 
cleanup should only be executed if a new leader has not yet written its leader information.
   
   For ZooKeeper, we are using an ephemeral node for the leader information, and it 
will be removed automatically when the ZooKeeper client session expires. So it is 
reasonable not to do the cleanup here.
   
   However, for Kubernetes, we should and can do this. Since checking the 
leadership (i.e. checking the annotation on the leader ConfigMap) and removing the leader 
information is a transactional operation, we can guarantee that we will not 
overwrite the information written by a new leader.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java
##########
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener;
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.ChildData;
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache;
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch;
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState;
+import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.CreateMode;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link LeaderElectionDriver} implementation for Zookeeper. The leading 
JobManager is elected using
+ * ZooKeeper. The current leader's address as well as its leader session ID is 
published via
+ * ZooKeeper.
+ */
+public class ZooKeeperLeaderElectionDriver implements LeaderElectionDriver, 
LeaderLatchListener, NodeCacheListener, UnhandledErrorListener {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperLeaderElectionDriver.class);
+
+       /** Client to the ZooKeeper quorum. */
+       private final CuratorFramework client;
+
+       /** Curator recipe for leader election. */
+       private final LeaderLatch leaderLatch;
+
+       /** Curator recipe to watch a given ZooKeeper node for changes. */
+       private final NodeCache cache;
+
+       /** ZooKeeper path of the node which stores the current leader 
information. */
+       private final String leaderPath;
+
+       private final ConnectionStateListener listener = (client, newState) -> 
handleStateChange(newState);
+
+       private final LeaderElectionEventHandler leaderElectionEventHandler;
+
+       private final FatalErrorHandler fatalErrorHandler;
+
+       private final String leaderContenderDescription;
+
+       private volatile boolean running;
+
+       /**
+        * Creates a ZooKeeperLeaderElectionDriver object.
+        *
+        * @param client Client which is connected to the ZooKeeper quorum
+        * @param latchPath ZooKeeper node path for the leader election latch
+        * @param leaderPath ZooKeeper node path for the node which stores the 
current leader information
+        * @param leaderElectionEventHandler Event handler for processing 
leader change events
+        * @param fatalErrorHandler Fatal error handler
+        * @param leaderContenderDescription Leader contender description
+        */
+       public ZooKeeperLeaderElectionDriver(
+                       CuratorFramework client,
+                       String latchPath,
+                       String leaderPath,
+                       LeaderElectionEventHandler leaderElectionEventHandler,
+                       FatalErrorHandler fatalErrorHandler,
+                       String leaderContenderDescription) throws Exception {
+               this.client = checkNotNull(client);
+               this.leaderPath = checkNotNull(leaderPath);
+               this.leaderElectionEventHandler = 
checkNotNull(leaderElectionEventHandler);
+               this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
+               this.leaderContenderDescription = 
checkNotNull(leaderContenderDescription);
+
+               leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
+               cache = new NodeCache(client, leaderPath);
+
+               client.getUnhandledErrorListenable().addListener(this);
+
+               leaderLatch.addListener(this);
+               leaderLatch.start();
+
+               cache.getListenable().addListener(this);
+               cache.start();
+
+               client.getConnectionStateListenable().addListener(listener);
+
+               running = true;
+       }
+
+       @Override
+       public void close() throws Exception{
+               if (!running) {
+                       return;
+               }
+               running = false;
+
+               LOG.info("Closing {}", this);
+
+               client.getUnhandledErrorListenable().removeListener(this);
+
+               client.getConnectionStateListenable().removeListener(listener);
+
+               Exception exception = null;
+
+               try {
+                       cache.close();
+               } catch (Exception e) {
+                       exception = e;
+               }
+
+               try {
+                       leaderLatch.close();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+               }
+
+               if (exception != null) {
+                       throw new Exception("Could not properly stop the 
ZooKeeperLeaderElectionDriver.", exception);
+               }
+       }
+
+       @Override
+       public boolean hasLeadership() {
+               assert(running);
+               return leaderLatch.hasLeadership();
+       }
+
+       @Override
+       public void isLeader() {
+               leaderElectionEventHandler.onGrantLeadership();
+       }
+
+       @Override
+       public void notLeader() {
+               leaderElectionEventHandler.onRevokeLeadership();
+       }
+
+       @Override
+       public void nodeChanged() throws Exception {
+               if (leaderLatch.hasLeadership()) {
+                       ChildData childData = cache.getCurrentData();
+                       if (childData != null) {
+                               final byte[] data = childData.getData();
+                               if (data != null && data.length > 0) {
+                                       final ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
+                                       final ObjectInputStream ois = new 
ObjectInputStream(bais);
+
+                                       final String leaderAddress = 
ois.readUTF();
+                                       final UUID leaderSessionID = (UUID) 
ois.readObject();
+
+                                       
leaderElectionEventHandler.onLeaderInformationChange(
+                                               
LeaderInformation.known(leaderSessionID, leaderAddress));
+                                       return;
+                               }
+                       }
+                       
leaderElectionEventHandler.onLeaderInformationChange(LeaderInformation.empty());
+               }
+       }
+
+       /**
+        * Writes the current leader's address as well the given leader session 
ID to ZooKeeper.
+        */
+       @Override
+       public void writeLeaderInformation(LeaderInformation leaderInformation) 
{
+               assert(running);
+               // this method does not have to be synchronized because the 
curator framework client
+               // is thread-safe. We do not write the empty data to ZooKeeper 
here. Because check-leadership-and-update
+               // is not a transactional operation. We may wrongly clear the 
data written by new leader.
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Write leader information: {}.", 
leaderInformation);
+               }
+               if (leaderInformation.equals(LeaderInformation.empty())) {
+                       return;

Review comment:
       I will add more description to the interface method 
`LeaderElectionDriver#writeLeaderInformation`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
##########
@@ -520,6 +519,47 @@ public void testEphemeralZooKeeperNodes() throws Exception 
{
                }
        }
 
+       @Test
+       public void testNotLeader() throws Exception {
+
+               final TestingLeaderElectionEventHandler electionEventHandler =
+                       new TestingLeaderElectionEventHandler(TEST_LEADER);
+               final TestingLeaderRetrievalEventHandler retrievalEventHandler 
= new TestingLeaderRetrievalEventHandler();
+               LeaderElectionDriver leaderElectionDriver = null;
+               LeaderRetrievalDriver leaderRetrievalDriver = null;
+               try {
+
+                       leaderElectionDriver =
+                               
ZooKeeperUtils.createLeaderElectionDriverFactory(client, configuration)
+                                       
.createLeaderElectionDriver(electionEventHandler, 
electionEventHandler::handleError, TEST_URL);
+                       electionEventHandler.init(leaderElectionDriver);
+
+                       electionEventHandler.waitForLeader(timeout);
+                       
assertThat(electionEventHandler.getConfirmedLeaderInformation(), 
is(TEST_LEADER));
+
+                       // Leader is revoked
+                       ((ZooKeeperLeaderElectionDriver) 
leaderElectionDriver).notLeader();
+                       electionEventHandler.waitForRevokeLeader(timeout);
+                       
assertThat(electionEventHandler.getConfirmedLeaderInformation(), 
is(LeaderInformation.empty()));
+                       // The data on ZooKeeper it not be cleared

Review comment:
       In the current ZooKeeper HA implementation, checking the leadership 
and then updating the ZooKeeper node (i.e., cleaning up the leader information 
here) is not a transactional operation, so we may wrongly overwrite the ZooKeeper 
node with empty data and clear the data written by a new leader. Instead, I do 
nothing here and let the ephemeral node disappear automatically.
   
   I added this test to verify this behavior and to make sure we notice if 
the behavior changes.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
##########
@@ -520,6 +519,47 @@ public void testEphemeralZooKeeperNodes() throws Exception 
{
                }
        }
 
+       @Test
+       public void testNotLeader() throws Exception {
+
+               final TestingLeaderElectionEventHandler electionEventHandler =
+                       new TestingLeaderElectionEventHandler(TEST_LEADER);
+               final TestingLeaderRetrievalEventHandler retrievalEventHandler 
= new TestingLeaderRetrievalEventHandler();
+               LeaderElectionDriver leaderElectionDriver = null;
+               LeaderRetrievalDriver leaderRetrievalDriver = null;
+               try {
+
+                       leaderElectionDriver =
+                               
ZooKeeperUtils.createLeaderElectionDriverFactory(client, configuration)
+                                       
.createLeaderElectionDriver(electionEventHandler, 
electionEventHandler::handleError, TEST_URL);
+                       electionEventHandler.init(leaderElectionDriver);
+
+                       electionEventHandler.waitForLeader(timeout);
+                       
assertThat(electionEventHandler.getConfirmedLeaderInformation(), 
is(TEST_LEADER));
+
+                       // Leader is revoked
+                       ((ZooKeeperLeaderElectionDriver) 
leaderElectionDriver).notLeader();
+                       electionEventHandler.waitForRevokeLeader(timeout);
+                       
assertThat(electionEventHandler.getConfirmedLeaderInformation(), 
is(LeaderInformation.empty()));
+                       // The data on ZooKeeper it not be cleared

Review comment:
       In the current ZooKeeper HA implementation, checking the leadership 
and then updating the ZooKeeper node (i.e., cleaning up the leader information 
here) is not a transactional operation, so we may wrongly overwrite the ZooKeeper 
node with empty data and clear the data written by a new leader. Instead, I do 
nothing here and let the ephemeral node disappear automatically.
   
   I added this test to verify this behavior and to make sure we notice if 
the behavior changes.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
##########
@@ -520,6 +519,47 @@ public void testEphemeralZooKeeperNodes() throws Exception 
{
                }
        }
 
+       @Test
+       public void testNotLeader() throws Exception {
+
+               final TestingLeaderElectionEventHandler electionEventHandler =
+                       new TestingLeaderElectionEventHandler(TEST_LEADER);
+               final TestingLeaderRetrievalEventHandler retrievalEventHandler 
= new TestingLeaderRetrievalEventHandler();
+               LeaderElectionDriver leaderElectionDriver = null;
+               LeaderRetrievalDriver leaderRetrievalDriver = null;
+               try {
+
+                       leaderElectionDriver =
+                               
ZooKeeperUtils.createLeaderElectionDriverFactory(client, configuration)
+                                       
.createLeaderElectionDriver(electionEventHandler, 
electionEventHandler::handleError, TEST_URL);
+                       electionEventHandler.init(leaderElectionDriver);
+
+                       electionEventHandler.waitForLeader(timeout);
+                       
assertThat(electionEventHandler.getConfirmedLeaderInformation(), 
is(TEST_LEADER));
+
+                       // Leader is revoked
+                       ((ZooKeeperLeaderElectionDriver) 
leaderElectionDriver).notLeader();
+                       electionEventHandler.waitForRevokeLeader(timeout);
+                       
assertThat(electionEventHandler.getConfirmedLeaderInformation(), 
is(LeaderInformation.empty()));
+                       // The data on ZooKeeper it not be cleared

Review comment:
       In the current ZooKeeper HA implementation, checking the leadership 
and then updating the ZooKeeper node (i.e., cleaning up the leader information 
here) is not a transactional operation, so we may wrongly overwrite the ZooKeeper 
node with empty data and clear the data written by a new leader. Instead, I do 
nothing here and let the ephemeral node disappear automatically.
   
   I added this test to verify this behavior and to make sure we notice if 
the behavior changes.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientITCase.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient;
+
+import org.apache.flink.kubernetes.KubernetesResource;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT Tests for {@link 
org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient} with real K8s 
server and client.
+ */
+public class Fabric8FlinkKubeClientITCase {
+
+       @ClassRule
+       public static KubernetesResource kubernetesResource = new 
KubernetesResource();
+
+       private static final String TEST_CONFIG_MAP_NAME = "test-config-map";
+
+       private static final long TIMEOUT = 120L * 1000L;
+
+       private static final Map<String, String> data = new HashMap<String, 
String>() {
+               {
+                       put("key1", "0");
+                       put("key2", "0");
+                       put("key3", "0");
+               }
+       };
+
+       private FlinkKubeClient flinkKubeClient;
+
+       @Before
+       public void setup() throws Exception {
+               flinkKubeClient = kubernetesResource.getFlinkKubeClient();
+               flinkKubeClient.createConfigMap(new KubernetesConfigMap(
+                       new ConfigMapBuilder()
+                               .withNewMetadata()
+                               .withName(TEST_CONFIG_MAP_NAME)
+                               .endMetadata()
+                               .withData(data)
+                               .build())).get();
+       }
+
+       @After
+       public void teardown() throws Exception {
+               flinkKubeClient.deleteConfigMap(TEST_CONFIG_MAP_NAME).get();
+       }
+
+       /**
+        * {@link 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient#checkAndUpdateConfigMap} 
is a transactional
+        * operation, we should definitely guarantee that the concurrent 
modification could work.
+        */
+       @Test
+       public void testCheckAndUpdateConfigMapConcurrently() throws Exception {
+               // Start multiple instances to update ConfigMap concurrently
+               final List<CompletableFuture<Void>> futures = new ArrayList<>();
+               final int target = 10;
+               final int updateIntervalMs = 100;
+               for (String key : data.keySet()) {
+                       
futures.add(FutureUtils.runAfterwardsAsync(FutureUtils.completedVoidFuture(), 
() -> {

Review comment:
       Because we expect to increment the number one by one and check the 
target at the end, we have to call `flinkKubeClient.checkAndUpdateConfigMap().get()`, 
which may throw a checked exception here. So we need to execute a 
`RunnableWithException`, not a `Runnable`. Right?

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
##########
@@ -249,4 +271,131 @@ public void testStopAndCleanupCluster() throws Exception {
                this.flinkKubeClient.stopAndCleanupCluster(CLUSTER_ID);
                
assertTrue(this.kubeClient.apps().deployments().inNamespace(NAMESPACE).list().getItems().isEmpty());
        }
+
+       @Test
+       public void testCreateConfigMap() throws Exception {
+               final KubernetesConfigMap configMap = buildTestingConfigMap();
+               this.flinkKubeClient.createConfigMap(configMap).get();
+               final Optional<KubernetesConfigMap> currentOpt = 
this.flinkKubeClient.getConfigMap(TESTING_CONFIG_MAP_NAME);
+               assertThat(currentOpt.isPresent(), is(true));
+               
assertThat(currentOpt.get().getData().get(TESTING_CONFIG_MAP_KEY), 
is(TESTING_CONFIG_MAP_VALUE));
+       }
+
+       @Test
+       public void testCreateConfigMapAlreadyExisting() throws Exception {
+               final KubernetesConfigMap configMap = buildTestingConfigMap();
+               this.flinkKubeClient.createConfigMap(configMap).get();
+
+               
mockCreateConfigMapAlreadyExisting(configMap.getInternalResource());
+               configMap.getData().put(TESTING_CONFIG_MAP_KEY, 
TESTING_CONFIG_MAP_NEW_VALUE);
+               try {
+                       this.flinkKubeClient.createConfigMap(configMap).get();
+                       fail("Exception should be thrown.");
+               } catch (Exception ex) {
+                       assertThat(ex.getMessage(),
+                               containsString("Failed to create ConfigMap " + 
TESTING_CONFIG_MAP_NAME));

Review comment:
       I will fix all the exception assertions that follow this pattern, in this 
PR and in the following PRs.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
+import 
io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder;
+import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;
+import 
io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Represent {@link KubernetesLeaderElector} in kubernetes. {@link 
LeaderElector#run()} is a blocking call. It should be
+ * run in the IO executor, not the main thread. The lifecycle is bound to 
single leader election. Once the leadership
+ * is revoked, as well as the {@link LeaderCallbackHandler#notLeader()} is 
called, the {@link LeaderElector#run()} will
+ * finish. To start another round of election, we need to trigger again.
+ *
+ * <p>{@link LeaderElector#run()} is responsible for creating the leader 
ConfigMap and continuously update the
+ * annotation. The annotation key is {@link #LEADER_ANNOTATION_KEY} and the 
value is in the following json format.
+ * metadata:
+ *   annotations:
+ *     control-plane.alpha.kubernetes.io/leader: 
'{"holderIdentity":"623e39fb-70c3-44f1-811f-561ec4a28d75","leaseDuration":15.000000000,"acquireTime":"2020-10-20T04:06:31.431000Z","renewTime":"2020-10-22T08:51:36.843000Z","leaderTransitions":37981}'
+ */
+public class KubernetesLeaderElector extends 
LeaderElector<NamespacedKubernetesClient> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesLeaderElector.class);
+       @VisibleForTesting
+       public static final String LEADER_ANNOTATION_KEY = 
"control-plane.alpha.kubernetes.io/leader";
+
+       private final ExecutorService executorService = 
Executors.newSingleThreadExecutor(
+               new 
ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService"));
+
+       public KubernetesLeaderElector(
+                       NamespacedKubernetesClient kubernetesClient,
+                       String namespace,
+                       KubernetesLeaderElectionConfiguration leaderConfig,
+                       LeaderCallbackHandler leaderCallbackHandler) {
+               super(kubernetesClient, new LeaderElectionConfigBuilder()
+                       .withName(leaderConfig.getConfigMapName())
+                       .withLeaseDuration(leaderConfig.getLeaseDuration())
+                       .withLock(new ConfigMapLock(namespace, 
leaderConfig.getConfigMapName(), leaderConfig.getLockIdentity()))
+                       .withRenewDeadline(leaderConfig.getRenewDeadline())
+                       .withRetryPeriod(leaderConfig.getRetryPeriod())
+                       .withLeaderCallbacks(new LeaderCallbacks(
+                               leaderCallbackHandler::isLeader,
+                               leaderCallbackHandler::notLeader,
+                               newLeader -> LOG.info("New leader elected {} 
for {}.", newLeader, leaderConfig.getConfigMapName())
+                       ))
+                       .build());
+               LOG.info("Create KubernetesLeaderElector {} with lock identity 
{}.",
+                       leaderConfig.getConfigMapName(), 
leaderConfig.getLockIdentity());
+       }
+
+       @Override
+       public void run() {
+               CompletableFuture.runAsync(super::run, executorService);

Review comment:
       We do not expect any exception in `LeaderElector#run`. If one does 
occur, it is a fatal error and will be handled by `FatalExitExceptionHandler`.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.KubernetesResource;
+import 
org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.DefaultKubeClientFactory;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT Tests for the {@link KubernetesLeaderElector}. Start multiple leader 
contenders currently, one should elect
+ * successfully. And if current leader dies, a new one could take over.
+ */
+public class KubernetesLeaderElectorITCase {
+
+       @ClassRule
+       public static KubernetesResource kubernetesResource = new 
KubernetesResource();
+
+       private static final long TIMEOUT = 120L * 1000L;
+
+       private final KubeClientFactory kubeClientFactory = new 
DefaultKubeClientFactory();
+
+       private static final String LEADER_CONFIGMAP_NAME_PREFIX = 
"leader-test-cluster";
+
+       @Test
+       public void testMultipleKubernetesLeaderElectors() throws Exception {
+               final Configuration configuration = 
kubernetesResource.getConfiguration();
+               final ExecutorService executorService = 
kubernetesResource.getExecutorService();
+
+               final BlockingQueue<String> leaderStore = new 
LinkedBlockingQueue<>();
+               final String leaderConfigMapName = LEADER_CONFIGMAP_NAME_PREFIX 
+ System.currentTimeMillis();
+               final int leaderNum = 3;
+
+               final KubernetesLeaderElector[] leaderElectors = new 
KubernetesLeaderElector[leaderNum];
+               // We use different Kubernetes clients for different leader 
electors.
+               final FlinkKubeClient[] kubeClients = new 
FlinkKubeClient[leaderNum];
+               final TestingLeaderCallbackHandler[] leaderCallbackHandlers = 
new TestingLeaderCallbackHandler[leaderNum];
+
+               try {
+                       for (int i = 0; i < leaderNum; i++) {
+                               kubeClients[i] = 
kubeClientFactory.fromConfiguration(configuration, executorService);
+                               leaderCallbackHandlers[i] = new 
TestingLeaderCallbackHandler(leaderStore, UUID.randomUUID().toString());
+                               final KubernetesLeaderElectionConfiguration 
leaderConfig = new KubernetesLeaderElectionConfiguration(
+                                       leaderConfigMapName, 
leaderCallbackHandlers[i].getLockIdentity(), configuration);
+                               leaderElectors[i] = 
kubeClients[i].createLeaderElector(leaderConfig, leaderCallbackHandlers[i]);
+
+                               // Start the leader electors to contend the 
leader
+                               leaderElectors[i].run();
+                       }
+
+                       // Wait for the first leader
+                       final String firstLockIdentity = 
leaderStore.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+
+                       for (int i = 0; i < leaderNum; i++) {
+                               if 
(leaderCallbackHandlers[i].getLockIdentity().equals(firstLockIdentity)) {
+                                       // Check the callback is called.
+                                       
assertThat(leaderCallbackHandlers[i].hasLeadership(), is(true));
+                                       // Current leader died
+                                       leaderElectors[i].stop();

Review comment:
       From the following piece of code, we can see that `notLeader` (via 
`onStopLeading()`) will be called once `renewWithTimeout()` returns, e.g. when the 
renewal is interrupted. I will update the test to also verify that `notLeader` is called.
   
   ```
     /**
      * Starts the leader election loop
      */
     public void run() {
       LOGGER.debug("Leader election started");
       if (!acquire()) {
         return;
       }
       leaderElectionConfig.getLeaderCallbacks().onStartLeading();
       renewWithTimeout();
       leaderElectionConfig.getLeaderCallbacks().onStopLeading();
     }
   ```

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/TestingLeaderCallbackHandler.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import java.util.concurrent.BlockingQueue;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Testing implementation for {@link 
KubernetesLeaderElector.LeaderCallbackHandler}.
+ */
+public class TestingLeaderCallbackHandler extends 
KubernetesLeaderElector.LeaderCallbackHandler {
+
+       private final BlockingQueue<String> leaderStore;
+       private final String lockIdentity;
+       private boolean isLeader;
+
+       public TestingLeaderCallbackHandler(BlockingQueue<String> leaderStore, 
String lockIdentity) {
+               this.leaderStore = leaderStore;
+               this.lockIdentity = lockIdentity;
+       }
+
+       @Override
+       public void isLeader() {
+               isLeader = true;
+               leaderStore.poll();
+               leaderStore.offer(lockIdentity);
+               assertThat(leaderStore.size(), is(1));

Review comment:
       Makes sense. I will make the queue private and add `waitForXXX` methods.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import 
org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
+import 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.leaderelection.LeaderElectionDriver;
+import org.apache.flink.runtime.leaderelection.LeaderElectionEventHandler;
+import org.apache.flink.runtime.leaderelection.LeaderElectionException;
+import org.apache.flink.runtime.leaderelection.LeaderInformation;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import static 
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static 
org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static 
org.apache.flink.kubernetes.utils.KubernetesUtils.checkConfigMaps;
+import static 
org.apache.flink.kubernetes.utils.KubernetesUtils.getLeaderInformationFromConfigMap;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link LeaderElectionDriver} implementation for Kubernetes. The active 
leader is elected using Kubernetes.
+ * The current leader's address as well as its leader session ID is published 
via Kubernetes ConfigMap.
+ * Note that the contending lock and leader storage are using the same 
ConfigMap. And every component(e.g.
+ * ResourceManager, Dispatcher, RestEndpoint, JobManager for each job) will 
have a separate ConfigMap.
+ */
+public class KubernetesLeaderElectionDriver implements LeaderElectionDriver {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesLeaderElectionDriver.class);
+
+       private final FlinkKubeClient kubeClient;
+
+       private final String configMapName;
+
+       private final String lockIdentity;
+
+       private final KubernetesLeaderElector leaderElector;
+
+       // Labels will be used to clean up the ha related ConfigMaps.
+       private final Map<String, String> configMapLabels;
+
+       private final LeaderElectionEventHandler leaderElectionEventHandler;
+
+       private final KubernetesWatch kubernetesWatch;
+
+       private final FatalErrorHandler fatalErrorHandler;
+
+       private volatile boolean running;
+
+       public KubernetesLeaderElectionDriver(
+                       FlinkKubeClient kubeClient,
+                       KubernetesLeaderElectionConfiguration leaderConfig,
+                       LeaderElectionEventHandler leaderElectionEventHandler,
+                       FatalErrorHandler fatalErrorHandler) {
+
+               this.kubeClient = checkNotNull(kubeClient, "Kubernetes client");
+               checkNotNull(leaderConfig, "Leader election configuration");
+               this.leaderElectionEventHandler = 
checkNotNull(leaderElectionEventHandler, "LeaderElectionEventHandler");
+               this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
+
+               this.configMapName = leaderConfig.getConfigMapName();
+               this.lockIdentity = leaderConfig.getLockIdentity();
+               this.leaderElector = 
kubeClient.createLeaderElector(leaderConfig, new LeaderCallbackHandlerImpl());
+               this.configMapLabels = KubernetesUtils.getConfigMapLabels(
+                       leaderConfig.getClusterId(), 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
+
+               leaderElector.run();
+               kubernetesWatch = kubeClient.watchConfigMaps(configMapName, new 
ConfigMapCallbackHandlerImpl());
+               running = true;
+       }
+
+       @Override
+       public void close() {
+               if (!running) {
+                       return;
+               }
+               running = false;
+
+               LOG.info("Closing {}.", this);
+               leaderElector.stop();
+               kubernetesWatch.close();
+       }
+
+       @Override
+       public void writeLeaderInformation(LeaderInformation leaderInformation) 
{
+               assert(running);
+               final UUID confirmedLeaderSessionID = 
leaderInformation.getLeaderSessionID();
+               final String confirmedLeaderAddress = 
leaderInformation.getLeaderAddress();
+               try {
+                       kubeClient.checkAndUpdateConfigMap(
+                               configMapName,
+                               configMap -> {
+                                       if 
(KubernetesLeaderElector.hasLeadership(configMap, lockIdentity)) {
+                                               // Get the updated ConfigMap 
with new leader information
+                                               if (confirmedLeaderAddress == 
null) {
+                                                       
configMap.getData().remove(LEADER_ADDRESS_KEY);
+                                               } else {
+                                                       
configMap.getData().put(LEADER_ADDRESS_KEY, confirmedLeaderAddress);
+                                               }
+                                               if (confirmedLeaderSessionID == 
null) {
+                                                       
configMap.getData().remove(LEADER_SESSION_ID_KEY);
+                                               } else {
+                                                       
configMap.getData().put(LEADER_SESSION_ID_KEY, 
confirmedLeaderSessionID.toString());
+                                               }
+                                               
configMap.getLabels().putAll(configMapLabels);
+                                               return Optional.of(configMap);
+                                       }
+                                       return Optional.empty();
+                               }).get();
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug(
+                                       "Successfully wrote leader information: 
Leader={}, session ID={}.",
+                                       confirmedLeaderAddress,
+                                       confirmedLeaderSessionID);
+                       }
+               } catch (Exception e) {
+                       fatalErrorHandler.onFatalError(
+                               new KubernetesException("Could not write leader 
information since ConfigMap " + configMapName
+                                       + " does not exist.", e));
+               }
+       }
+
+       @Override
+       public boolean hasLeadership() {
+               assert(running);
+               final Optional<KubernetesConfigMap> configMapOpt = 
kubeClient.getConfigMap(configMapName);
+               if (configMapOpt.isPresent()) {
+                       return 
KubernetesLeaderElector.hasLeadership(configMapOpt.get(), lockIdentity);
+               } else {
+                       fatalErrorHandler.onFatalError(
+                               new KubernetesException("ConfigMap " + 
configMapName + " does not exist.", null));
+                       return false;
+               }
+       }
+
+       private class LeaderCallbackHandlerImpl extends 
KubernetesLeaderElector.LeaderCallbackHandler {
+
+               @Override
+               public void isLeader() {
+                       leaderElectionEventHandler.onGrantLeadership();
+               }
+
+               @Override
+               public void notLeader() {
+                       leaderElectionEventHandler.onRevokeLeadership();
+                       // Continue to contend the leader
+                       leaderElector.run();
+               }
+       }
+
+       private class ConfigMapCallbackHandlerImpl implements 
FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> {
+               @Override
+               public void onAdded(List<KubernetesConfigMap> configMaps) {
+                       // noop
+               }
+
+               @Override
+               public void onModified(List<KubernetesConfigMap> configMaps) {
+                       // We should only receive events for the watched 
ConfigMap
+                       final KubernetesConfigMap configMap = 
checkConfigMaps(configMaps, configMapName);
+
+                       if (KubernetesLeaderElector.hasLeadership(configMap, 
lockIdentity)) {
+                               
leaderElectionEventHandler.onLeaderInformationChange(getLeaderInformationFromConfigMap(configMap));

Review comment:
       Copying below what I have added to the description of 
`LeaderElectionEventHandler#onLeaderInformationChange`. Does it make sense? Or do 
you still believe that we should keep a local cache in 
`KubernetesLeaderElectionDriver` to check whether the leader information has 
truly changed?
   
   ```
   Duplicated leader change events could happen, so the implementation should 
check whether the passed leader information is really different with internal 
confirmed leader information.
   ```

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.runtime.leaderelection.LeaderInformation;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+
+import static 
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static 
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_KEY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static 
org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link KubernetesLeaderElectionDriver}.
+ */
+public class KubernetesLeaderElectionDriverTest extends 
KubernetesHighAvailabilityTestBase {
+
+       @Test
+       public void testIsLeader() throws Exception {
+               new Context() {{
+                       runTest(
+                               () -> {
+                                       // Grant leadership
+                                       leaderCallbackGrantLeadership();
+                                       
assertThat(electionEventHandler.isLeader(), is(true));
+                                       
assertThat(electionEventHandler.getConfirmedLeaderInformation(), 
is(LEADER_INFORMATION));
+                               });
+               }};
+       }
+
+       @Test
+       public void testNotLeader() throws Exception {
+               new Context() {{
+                       runTest(
+                               () -> {
+                                       leaderCallbackGrantLeadership();
+                                       // Revoke leadership
+                                       getLeaderCallback().notLeader();
+
+                                       
electionEventHandler.waitForRevokeLeader(TIMEOUT);
+                                       
assertThat(electionEventHandler.isLeader(), is(false));
+                                       
assertThat(electionEventHandler.getConfirmedLeaderInformation(), 
is(LeaderInformation.empty()));
+                                       // The ConfigMap should also be cleared
+                                       
assertThat(getLeaderConfigMap().getData().get(LEADER_ADDRESS_KEY), 
is(nullValue()));
+                                       
assertThat(getLeaderConfigMap().getData().get(LEADER_SESSION_ID_KEY), 
is(nullValue()));

Review comment:
       This is because no new leader has been elected yet and the lock identity
annotation on the ConfigMap is still the old one, so
`KubernetesLeaderElector#hasLeadership` still passes the check. I think this is
the expected behavior: once the leader ConfigMap is overwritten by a new
leader, it should return false.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Default implementation for leader election service. Composed with different 
{@link LeaderElectionDriver}, we could
+ * perform a leader election for the contender, and then persist the leader 
information to various storage.
+ */
+public class DefaultLeaderElectionService implements LeaderElectionService, 
LeaderElectionEventHandler {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultLeaderElectionService.class);
+
+       private final Object lock = new Object();
+
+       private final LeaderElectionDriverFactory leaderElectionDriverFactory;
+
+       /** The leader contender which applies for leadership. */
+       @GuardedBy("lock")
+       private volatile LeaderContender leaderContender;
+
+       @GuardedBy("lock")
+       private volatile UUID issuedLeaderSessionID;
+
+       @GuardedBy("lock")
+       private volatile UUID confirmedLeaderSessionID;
+
+       @GuardedBy("lock")
+       private volatile String confirmedLeaderAddress;
+
+       @GuardedBy("lock")
+       private volatile boolean running;
+
+       @GuardedBy("lock")
+       private LeaderElectionDriver leaderElectionDriver;
+
+       public DefaultLeaderElectionService(LeaderElectionDriverFactory 
leaderElectionDriverFactory) {
+               this.leaderElectionDriverFactory = 
checkNotNull(leaderElectionDriverFactory);
+
+               leaderContender = null;
+
+               issuedLeaderSessionID = null;
+               confirmedLeaderSessionID = null;
+               confirmedLeaderAddress = null;
+
+               running = false;
+       }
+
+       @Override
+       public final void start(LeaderContender contender) throws Exception {
+               checkNotNull(contender, "Contender must not be null.");
+               Preconditions.checkState(leaderContender == null, "Contender 
was already set.");
+
+               synchronized (lock) {
+                       leaderContender = contender;
+                       leaderElectionDriver = 
leaderElectionDriverFactory.createLeaderElectionDriver(
+                               this, new LeaderElectionFatalErrorHandler(), 
leaderContender.getDescription());
+                       LOG.info("Starting DefaultLeaderElectionService with 
{}.", leaderElectionDriver);
+
+                       running = true;
+               }
+       }
+
+       @Override
+       public final void stop() throws Exception {
+               LOG.info("Stopping DefaultLeaderElectionService.");
+
+               synchronized (lock) {
+                       if (!running) {
+                               return;
+                       }
+                       running = false;
+                       clearConfirmedLeaderInformation();
+                       leaderElectionDriver.close();
+               }
+       }
+
+       @Override
+       public void confirmLeadership(UUID leaderSessionID, String 
leaderAddress) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug(
+                               "Confirm leader session ID {} for leader {}.",
+                               leaderSessionID,
+                               leaderAddress);
+               }
+
+               checkNotNull(leaderSessionID);
+
+               synchronized (lock) {
+                       if (hasLeadership(leaderSessionID)) {
+                               if (running) {
+                                       
confirmLeaderInformation(leaderSessionID, leaderAddress);
+                               } else {
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("Ignoring the leader 
session Id {} confirmation, since the " +
+                                                       "LeaderElectionService 
has already been stopped.", leaderSessionID);
+                                       }
+                               }
+                       } else {
+                               // Received an old confirmation call
+                               if 
(!leaderSessionID.equals(this.issuedLeaderSessionID)) {
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("Receive an old 
confirmation call of leader session ID {}, " +
+                                                       "current issued session 
ID is {}", leaderSessionID, issuedLeaderSessionID);
+                                       }
+                               } else {
+                                       LOG.warn("The leader session ID {} was 
confirmed even though the " +
+                                               "corresponding JobManager was 
not elected as the leader.", leaderSessionID);
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+               synchronized (lock) {
+                       if (running) {
+                               return leaderElectionDriver.hasLeadership() && 
leaderSessionId.equals(issuedLeaderSessionID);
+                       } else {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("hasLeadership is called 
after the service is stopped, returning false.");
+                               }
+                               return false;
+                       }
+               }
+       }
+
+       /**
+        * Returns the current leader session ID or null, if the contender is 
not the leader.
+        *
+        * @return The last leader session ID or null, if the contender is not 
the leader
+        */
+       @VisibleForTesting
+       @Nullable
+       public UUID getLeaderSessionID() {
+               return confirmedLeaderSessionID;
+       }
+
+       @GuardedBy("lock")
+       private void confirmLeaderInformation(UUID leaderSessionID, String 
leaderAddress) {
+               confirmedLeaderSessionID = leaderSessionID;
+               confirmedLeaderAddress = leaderAddress;
+               leaderElectionDriver.writeLeaderInformation(
+                       LeaderInformation.known(confirmedLeaderSessionID, 
confirmedLeaderAddress));
+       }
+
+       @GuardedBy("lock")
+       private void clearConfirmedLeaderInformation() {
+               confirmedLeaderSessionID = null;
+               confirmedLeaderAddress = null;
+       }
+
+       @Override
+       @GuardedBy("lock")
+       public void onGrantLeadership() {
+               synchronized (lock) {
+                       if (running) {
+                               issuedLeaderSessionID = UUID.randomUUID();
+                               clearConfirmedLeaderInformation();
+
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug(
+                                               "Grant leadership to contender 
{} with session ID {}.",
+                                               
leaderContender.getDescription(),
+                                               issuedLeaderSessionID);
+                               }
+
+                               
leaderContender.grantLeadership(issuedLeaderSessionID);
+                       } else {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Ignoring the grant 
leadership notification since the {} has " +
+                                               "already been closed.", 
leaderElectionDriver);
+                               }
+                       }
+               }
+       }
+
+       @Override
+       @GuardedBy("lock")
+       public void onRevokeLeadership() {
+               synchronized (lock) {
+                       if (running) {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug(
+                                               "Revoke leadership of {} 
({}@{}).",
+                                               
leaderContender.getDescription(),
+                                               confirmedLeaderSessionID,
+                                               confirmedLeaderAddress);
+                               }
+
+                               issuedLeaderSessionID = null;
+                               clearConfirmedLeaderInformation();
+
+                               leaderContender.revokeLeadership();
+
+                               // Clear the old leader information on the 
external storage
+                               
leaderElectionDriver.writeLeaderInformation(LeaderInformation.empty());

Review comment:
       I want to do this to handle the following situation:
   * The old leader loses leadership, and the new leader is not launched quickly
(maybe 120s or longer).
   * The old leader should clear the leader information on the ConfigMap so that
the listener is notified of this event.
   
   For Kubernetes, this works because no new leader has been elected yet and
the lock identity annotation is still the old one.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to