This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e0a980  [FLINK-12056][coordination] Remove legacy methods in 
LeaderRetrievalUtils
0e0a980 is described below

commit 0e0a980411ba5aceac8b4733242647f3777c899d
Author: Zili Chen <wander4...@gmail.com>
AuthorDate: Tue Apr 2 17:38:56 2019 +0800

    [FLINK-12056][coordination] Remove legacy methods in LeaderRetrievalUtils
---
 .../flink/runtime/util/LeaderRetrievalUtils.java   | 116 ---------------------
 1 file changed, 116 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 9c8f7bd..69dfbf7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -22,10 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
@@ -33,10 +30,6 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.net.ConnectionUtils;
 import org.apache.flink.util.FlinkException;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.Mapper;
-import akka.dispatch.OnComplete;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,41 +47,6 @@ import scala.concurrent.duration.FiniteDuration;
 public class LeaderRetrievalUtils {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(LeaderRetrievalUtils.class);
-       
-       /**
-        * Retrieves the current leader gateway using the given {@link 
LeaderRetrievalService}. If the
-        * current leader could not be retrieved after the given timeout, then a
-        * {@link LeaderRetrievalException} is thrown.
-        *
-        * @param leaderRetrievalService {@link LeaderRetrievalService} which 
is used for the leader retrieval
-        * @param actorSystem ActorSystem which is used for the {@link 
LeaderRetrievalListener} implementation
-        * @param timeout Timeout value for the retrieval call
-        * @return The current leader gateway
-        * @throws LeaderRetrievalException If the actor gateway could not be 
retrieved or the timeout has been exceeded
-        */
-       public static ActorGateway retrieveLeaderGateway(
-                       LeaderRetrievalService leaderRetrievalService,
-                       ActorSystem actorSystem,
-                       FiniteDuration timeout)
-               throws LeaderRetrievalException {
-               LeaderGatewayListener listener = new 
LeaderGatewayListener(actorSystem, timeout);
-
-               try {
-                       leaderRetrievalService.start(listener);
-
-                       Future<ActorGateway> actorGatewayFuture = 
listener.getActorGatewayFuture();
-
-                       return Await.result(actorGatewayFuture, timeout);
-               } catch (Exception e) {
-                       throw new LeaderRetrievalException("Could not retrieve 
the leader gateway.", e);
-               } finally {
-                       try {
-                               leaderRetrievalService.stop();
-                       } catch (Exception fe) {
-                               LOG.warn("Could not stop the leader retrieval 
service.", fe);
-                       }
-               }
-       }
 
        /**
         * Retrieves the leader akka url and the current leader session ID. The 
values are stored in a
@@ -142,21 +100,6 @@ public class LeaderRetrievalUtils {
                }
        }
 
-       /**
-        * Retrieves the current leader session id of the component identified 
by the given leader
-        * retrieval service.
-        *
-        * @param leaderRetrievalService Leader retrieval service to be used 
for the leader retrieval
-        * @param timeout Timeout for the leader retrieval
-        * @return The leader session id of the retrieved leader
-        * @throws LeaderRetrievalException if the leader retrieval operation 
fails (including a timeout)
-        */
-       public static UUID retrieveLeaderSessionId(
-                       LeaderRetrievalService leaderRetrievalService,
-                       FiniteDuration timeout) throws LeaderRetrievalException 
{
-               return retrieveLeaderConnectionInfo(leaderRetrievalService, 
timeout).getLeaderSessionID();
-       }
-
        public static InetAddress findConnectingAddress(
                        LeaderRetrievalService leaderRetrievalService,
                        Time timeout) throws LeaderRetrievalException {
@@ -191,65 +134,6 @@ public class LeaderRetrievalUtils {
        }
 
        /**
-        * Helper class which is used by the retrieveLeaderGateway method as the
-        * {@link LeaderRetrievalListener}.
-        */
-       public static class LeaderGatewayListener implements 
LeaderRetrievalListener {
-
-               private final ActorSystem actorSystem;
-               private final FiniteDuration timeout;
-               private final Object lock = new Object();
-
-               private final Promise<ActorGateway> futureActorGateway = new 
scala.concurrent.impl.Promise.DefaultPromise<ActorGateway>();
-
-               public LeaderGatewayListener(ActorSystem actorSystem, 
FiniteDuration timeout) {
-                       this.actorSystem = actorSystem;
-                       this.timeout = timeout;
-               }
-
-               private void completePromise(ActorGateway gateway) {
-                       synchronized (lock) {
-                               if (!futureActorGateway.isCompleted()) {
-                                       futureActorGateway.success(gateway);
-                               }
-                       }
-               }
-
-               @Override
-               public void notifyLeaderAddress(final String leaderAddress, 
final UUID leaderSessionID) {
-                       if(leaderAddress != null && !leaderAddress.equals("") 
&& !futureActorGateway.isCompleted()) {
-                               AkkaUtils.getActorRefFuture(leaderAddress, 
actorSystem, timeout)
-                                       .map(new Mapper<ActorRef, 
ActorGateway>() {
-                                               public ActorGateway 
apply(ActorRef ref) {
-                                                       return new 
AkkaActorGateway(ref, leaderSessionID);
-                                               }
-                                       }, actorSystem.dispatcher())
-                                       .onComplete(new 
OnComplete<ActorGateway>() {
-                                               @Override
-                                               public void 
onComplete(Throwable failure, ActorGateway success) throws Throwable {
-                                                       if (failure == null) {
-                                                               
completePromise(success);
-                                                       } else {
-                                                               
LOG.debug("Could not retrieve the leader for address " + leaderAddress + ".", 
failure);
-                                                       }
-                                               }
-                                       }, actorSystem.dispatcher());
-                       }
-               }
-
-               @Override
-               public void handleError(Exception exception) {
-                       if (!futureActorGateway.isCompleted()) {
-                               futureActorGateway.failure(exception);
-                       }
-               }
-
-               public Future<ActorGateway> getActorGatewayFuture() {
-                       return futureActorGateway.future();
-               }
-       }
-
-       /**
         * Helper class which is used by the retrieveLeaderConnectionInfo 
method to retrieve the
         * leader's akka URL and the current leader session ID.
         */

Reply via email to