Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r162696516 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -61,46 +69,77 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody; import org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource; import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.rest.util.RestClientException; import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedThrowable; -import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.CheckedSupplier; + +import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException; + +import akka.actor.AddressFromURIString; import javax.annotation.Nullable; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.MalformedURLException; import java.net.URL; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; -import static java.util.Objects.requireNonNull; +import scala.Option; + +import static org.apache.flink.util.Preconditions.checkArgument; 
/** * A {@link ClusterClient} implementation that communicates via HTTP REST requests. */ public class RestClusterClient<T> extends ClusterClient<T> { + private static final long AWAIT_LEADER_TIMEOUT = 10_000; + + private static final int MAX_RETRIES = 20; + + private static final Time RETRY_DELAY = Time.seconds(3); --- End diff -- These values (`AWAIT_LEADER_TIMEOUT`, `MAX_RETRIES`, and `RETRY_DELAY`) should be made configurable instead of being hard-coded constants.
---