This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 0e0a980 [FLINK-12056][coordination] Remove legacy methods in LeaderRetrievalUtils 0e0a980 is described below commit 0e0a980411ba5aceac8b4733242647f3777c899d Author: Zili Chen <wander4...@gmail.com> AuthorDate: Tue Apr 2 17:38:56 2019 +0800 [FLINK-12056][coordination] Remove legacy methods in LeaderRetrievalUtils --- .../flink/runtime/util/LeaderRetrievalUtils.java | 116 --------------------- 1 file changed, 116 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java index 9c8f7bd..69dfbf7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java @@ -22,10 +22,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; @@ -33,10 +30,6 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.net.ConnectionUtils; import org.apache.flink.util.FlinkException; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.dispatch.Mapper; -import akka.dispatch.OnComplete; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,41 +47,6 @@ import scala.concurrent.duration.FiniteDuration; public class LeaderRetrievalUtils { private static final Logger LOG = LoggerFactory.getLogger(LeaderRetrievalUtils.class); - - /** - * Retrieves the current leader gateway using the given {@link LeaderRetrievalService}. If the - * current leader could not be retrieved after the given timeout, then a - * {@link LeaderRetrievalException} is thrown. - * - * @param leaderRetrievalService {@link LeaderRetrievalService} which is used for the leader retrieval - * @param actorSystem ActorSystem which is used for the {@link LeaderRetrievalListener} implementation - * @param timeout Timeout value for the retrieval call - * @return The current leader gateway - * @throws LeaderRetrievalException If the actor gateway could not be retrieved or the timeout has been exceeded - */ - public static ActorGateway retrieveLeaderGateway( - LeaderRetrievalService leaderRetrievalService, - ActorSystem actorSystem, - FiniteDuration timeout) - throws LeaderRetrievalException { - LeaderGatewayListener listener = new LeaderGatewayListener(actorSystem, timeout); - - try { - leaderRetrievalService.start(listener); - - Future<ActorGateway> actorGatewayFuture = listener.getActorGatewayFuture(); - - return Await.result(actorGatewayFuture, timeout); - } catch (Exception e) { - throw new LeaderRetrievalException("Could not retrieve the leader gateway.", e); - } finally { - try { - leaderRetrievalService.stop(); - } catch (Exception fe) { - LOG.warn("Could not stop the leader retrieval service.", fe); - } - } - } /** * Retrieves the leader akka url and the current leader session ID. The values are stored in a @@ -142,21 +100,6 @@ public class LeaderRetrievalUtils { } } - /** - * Retrieves the current leader session id of the component identified by the given leader - * retrieval service. - * - * @param leaderRetrievalService Leader retrieval service to be used for the leader retrieval - * @param timeout Timeout for the leader retrieval - * @return The leader session id of the retrieved leader - * @throws LeaderRetrievalException if the leader retrieval operation fails (including a timeout) - */ - public static UUID retrieveLeaderSessionId( - LeaderRetrievalService leaderRetrievalService, - FiniteDuration timeout) throws LeaderRetrievalException { - return retrieveLeaderConnectionInfo(leaderRetrievalService, timeout).getLeaderSessionID(); - } - public static InetAddress findConnectingAddress( LeaderRetrievalService leaderRetrievalService, Time timeout) throws LeaderRetrievalException { @@ -191,65 +134,6 @@ public class LeaderRetrievalUtils { } /** - * Helper class which is used by the retrieveLeaderGateway method as the - * {@link LeaderRetrievalListener}. - */ - public static class LeaderGatewayListener implements LeaderRetrievalListener { - - private final ActorSystem actorSystem; - private final FiniteDuration timeout; - private final Object lock = new Object(); - - private final Promise<ActorGateway> futureActorGateway = new scala.concurrent.impl.Promise.DefaultPromise<ActorGateway>(); - - public LeaderGatewayListener(ActorSystem actorSystem, FiniteDuration timeout) { - this.actorSystem = actorSystem; - this.timeout = timeout; - } - - private void completePromise(ActorGateway gateway) { - synchronized (lock) { - if (!futureActorGateway.isCompleted()) { - futureActorGateway.success(gateway); - } - } - } - - @Override - public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) { - if(leaderAddress != null && !leaderAddress.equals("") && !futureActorGateway.isCompleted()) { - AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, timeout) - .map(new Mapper<ActorRef, ActorGateway>() { - public ActorGateway apply(ActorRef ref) { - return new AkkaActorGateway(ref, leaderSessionID); - } - }, actorSystem.dispatcher()) - .onComplete(new OnComplete<ActorGateway>() { - @Override - public void onComplete(Throwable failure, ActorGateway success) throws Throwable { - if (failure == null) { - completePromise(success); - } else { - LOG.debug("Could not retrieve the leader for address " + leaderAddress + ".", failure); - } - } - }, actorSystem.dispatcher()); - } - } - - @Override - public void handleError(Exception exception) { - if (!futureActorGateway.isCompleted()) { - futureActorGateway.failure(exception); - } - } - - public Future<ActorGateway> getActorGatewayFuture() { - return futureActorGateway.future(); - } - } - - /** * Helper class which is used by the retrieveLeaderConnectionInfo method to retrieve the * leader's akka URL and the current leader session ID. */