Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5312#discussion_r162696516
  
    --- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
    @@ -61,46 +69,77 @@
     import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
     import 
org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource;
     import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
    +import org.apache.flink.runtime.rest.util.RestClientException;
     import org.apache.flink.runtime.util.ExecutorThreadFactory;
    +import org.apache.flink.util.ExceptionUtils;
     import org.apache.flink.util.ExecutorUtils;
     import org.apache.flink.util.FlinkException;
     import org.apache.flink.util.Preconditions;
     import org.apache.flink.util.SerializedThrowable;
    -import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.CheckedSupplier;
    +
    +import 
org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
    +
    +import akka.actor.AddressFromURIString;
     
     import javax.annotation.Nullable;
     
     import java.io.IOException;
     import java.net.InetSocketAddress;
    +import java.net.MalformedURLException;
     import java.net.URL;
    -import java.util.ArrayList;
     import java.util.Collection;
     import java.util.Collections;
     import java.util.List;
    +import java.util.UUID;
     import java.util.concurrent.CompletableFuture;
     import java.util.concurrent.CompletionException;
     import java.util.concurrent.ExecutionException;
     import java.util.concurrent.ExecutorService;
     import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
     import java.util.concurrent.TimeUnit;
     import java.util.concurrent.TimeoutException;
    +import java.util.function.Predicate;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
     
    -import static java.util.Objects.requireNonNull;
    +import scala.Option;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
     
     /**
      * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
      */
     public class RestClusterClient<T> extends ClusterClient<T> {
     
    +   private static final long AWAIT_LEADER_TIMEOUT = 10_000;
    +
    +   private static final int MAX_RETRIES = 20;
    +
    +   private static final Time RETRY_DELAY = Time.seconds(3);
    --- End diff --
    
    These values should be made configurable.


---

Reply via email to