This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 587fa2819e [rest] Add http conf and ExponentialHttpRetryInterceptor to 
handle retry In RESTCatalog (#4929)
587fa2819e is described below

commit 587fa2819eb2df36dd03ecbf3b96483472a610b6
Author: jerry <[email protected]>
AuthorDate: Fri Jan 17 13:09:33 2025 +0800

    [rest] Add http conf and ExponentialHttpRetryInterceptor to handle retry In 
RESTCatalog (#4929)
---
 .../rest/ExponentialHttpRetryInterceptor.java      | 175 +++++++++++++++++++++
 .../java/org/apache/paimon/rest/HttpClient.java    |  29 +++-
 .../org/apache/paimon/rest/HttpClientOptions.java  |  28 ++--
 .../org/apache/paimon/rest/RESTCatalogOptions.java |  18 ++-
 .../rest/ExponentialHttpRetryInterceptorTest.java  | 136 ++++++++++++++++
 .../org/apache/paimon/rest/HttpClientTest.java     |  14 +-
 6 files changed, 377 insertions(+), 23 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java
new file mode 100644
index 0000000000..dd16e47fc5
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet;
+import org.apache.paimon.shade.guava30.com.google.common.net.HttpHeaders;
+
+import okhttp3.Interceptor;
+import okhttp3.Request;
+import okhttp3.Response;
+
+import javax.net.ssl.SSLException;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+import java.net.UnknownHostException;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Defines exponential HTTP request retry interceptor.
+ *
+ * <p>The following retrievable IOException
+ *
+ * <ul>
+ *   <li>InterruptedIOException
+ *   <li>UnknownHostException
+ *   <li>ConnectException
+ *   <li>NoRouteToHostException
+ *   <li>SSLException
+ * </ul>
+ *
+ * <p>The following retrievable HTTP status codes are defined:
+ *
+ * <ul>
+ *   <li>TOO_MANY_REQUESTS (429)
+ *   <li>BAD_GATEWAY (502)
+ *   <li>SERVICE_UNAVAILABLE (503)
+ *   <li>GATEWAY_TIMEOUT (504)
+ * </ul>
+ *
+ * <p>The following retrievable HTTP method which is idempotent are defined:
+ *
+ * <ul>
+ *   <li>GET
+ *   <li>HEAD
+ *   <li>PUT
+ *   <li>DELETE
+ *   <li>TRACE
+ *   <li>OPTIONS
+ * </ul>
+ */
+public class ExponentialHttpRetryInterceptor implements Interceptor {
+
+    private final int maxRetries;
+    private final Set<Class<? extends IOException>> nonRetriableExceptions;
+    private final Set<Integer> retrievableCodes;
+    private final Set<String> retrievableMethods;
+
+    public ExponentialHttpRetryInterceptor(int maxRetries) {
+        this.maxRetries = maxRetries;
+        this.retrievableMethods =
+                ImmutableSet.of("GET", "HEAD", "PUT", "DELETE", "TRACE", 
"OPTIONS");
+        this.retrievableCodes = ImmutableSet.of(429, 502, 503, 504);
+        this.nonRetriableExceptions =
+                ImmutableSet.of(
+                        InterruptedIOException.class,
+                        UnknownHostException.class,
+                        ConnectException.class,
+                        NoRouteToHostException.class,
+                        SSLException.class);
+    }
+
+    @Override
+    public Response intercept(Chain chain) throws IOException {
+        Request request = chain.request();
+        Response response = null;
+
+        for (int retryCount = 1; ; retryCount++) {
+            try {
+                response = chain.proceed(request);
+            } catch (IOException e) {
+                if (needRetry(request.method(), e, retryCount)) {
+                    wait(response, retryCount);
+                    continue;
+                }
+            }
+            if (needRetry(response, retryCount)) {
+                if (response != null) {
+                    response.close();
+                }
+                wait(response, retryCount);
+            } else {
+                return response;
+            }
+        }
+    }
+
+    public boolean needRetry(Response response, int execCount) {
+        if (execCount > maxRetries) {
+            return false;
+        }
+        return response == null
+                || (!response.isSuccessful() && 
retrievableCodes.contains(response.code()));
+    }
+
+    public boolean needRetry(String method, IOException e, int execCount) {
+        if (execCount > maxRetries) {
+            return false;
+        }
+        if (!retrievableMethods.contains(method)) {
+            return false;
+        }
+        if (nonRetriableExceptions.contains(e.getClass())) {
+            return false;
+        } else {
+            for (Class<? extends IOException> rejectException : 
nonRetriableExceptions) {
+                if (rejectException.isInstance(e)) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    public long getRetryIntervalInMilliseconds(Response response, int 
execCount) {
+        // a server may send a 429 / 503 with a Retry-After header
+        // 
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
+        String retryAfterStrInSecond =
+                response == null ? null : 
response.header(HttpHeaders.RETRY_AFTER);
+        Long retryAfter = null;
+        if (retryAfterStrInSecond != null) {
+            try {
+                retryAfter = Long.parseLong(retryAfterStrInSecond) * 1000;
+            } catch (Throwable ignore) {
+            }
+
+            if (retryAfter != null && retryAfter > 0) {
+                return retryAfter;
+            }
+        }
+
+        int delayMillis = 1000 * (int) Math.min(Math.pow(2.0, (long) execCount 
- 1.0), 64.0);
+        int jitter = ThreadLocalRandom.current().nextInt(Math.max(1, (int) 
(delayMillis * 0.1)));
+
+        return delayMillis + jitter;
+    }
+
+    private void wait(Response response, int retryCount) throws 
InterruptedIOException {
+        try {
+            Thread.sleep(getRetryIntervalInMilliseconds(response, retryCount));
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException();
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
index 08d6c8a050..5a13a51ef7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
@@ -26,6 +26,7 @@ import org.apache.paimon.utils.StringUtils;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 
+import okhttp3.ConnectionPool;
 import okhttp3.Dispatcher;
 import okhttp3.Headers;
 import okhttp3.MediaType;
@@ -40,6 +41,7 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
 
 import static okhttp3.ConnectionSpec.CLEARTEXT;
 import static okhttp3.ConnectionSpec.COMPATIBLE_TLS;
@@ -52,6 +54,7 @@ public class HttpClient implements RESTClient {
 
     private static final String THREAD_NAME = 
"REST-CATALOG-HTTP-CLIENT-THREAD-POOL";
     private static final MediaType MEDIA_TYPE = 
MediaType.parse("application/json");
+    private static final int CONNECTION_KEEP_ALIVE_DURATION_MS = 300_000;
 
     private final OkHttpClient okHttpClient;
     private final String uri;
@@ -191,14 +194,30 @@ public class HttpClient implements RESTClient {
         BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
         ExecutorService executorService =
                 createCachedThreadPool(httpClientOptions.threadPoolSize(), 
THREAD_NAME, workQueue);
-
+        ConnectionPool connectionPool =
+                new ConnectionPool(
+                        httpClientOptions.maxConnections(),
+                        CONNECTION_KEEP_ALIVE_DURATION_MS,
+                        TimeUnit.MILLISECONDS);
+        Dispatcher dispatcher = new Dispatcher(executorService);
+        // set max requests per host use max connections
+        dispatcher.setMaxRequestsPerHost(httpClientOptions.maxConnections());
         OkHttpClient.Builder builder =
                 new OkHttpClient.Builder()
-                        .dispatcher(new Dispatcher(executorService))
+                        .dispatcher(dispatcher)
                         .retryOnConnectionFailure(true)
-                        .connectionSpecs(Arrays.asList(MODERN_TLS, 
COMPATIBLE_TLS, CLEARTEXT));
-        httpClientOptions.connectTimeout().ifPresent(builder::connectTimeout);
-        httpClientOptions.readTimeout().ifPresent(builder::readTimeout);
+                        .connectionPool(connectionPool)
+                        .connectionSpecs(Arrays.asList(MODERN_TLS, 
COMPATIBLE_TLS, CLEARTEXT))
+                        .addInterceptor(
+                                new ExponentialHttpRetryInterceptor(
+                                        httpClientOptions.maxRetries()));
+        httpClientOptions
+                .connectTimeout()
+                .ifPresent(
+                        timeoutDuration -> {
+                            builder.connectTimeout(timeoutDuration);
+                            builder.readTimeout(timeoutDuration);
+                        });
 
         return builder.build();
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java
index 00ae1a529e..548a989568 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java
@@ -30,26 +30,30 @@ public class HttpClientOptions {
 
     private final String uri;
     @Nullable private final Duration connectTimeout;
-    @Nullable private final Duration readTimeout;
     private final int threadPoolSize;
+    private final int maxConnections;
+    private final int maxRetries;
 
     public HttpClientOptions(
             String uri,
             @Nullable Duration connectTimeout,
-            @Nullable Duration readTimeout,
-            int threadPoolSize) {
+            int threadPoolSize,
+            int maxConnections,
+            int maxRetries) {
         this.uri = uri;
         this.connectTimeout = connectTimeout;
-        this.readTimeout = readTimeout;
         this.threadPoolSize = threadPoolSize;
+        this.maxConnections = maxConnections;
+        this.maxRetries = maxRetries;
     }
 
     public static HttpClientOptions create(Options options) {
         return new HttpClientOptions(
                 options.get(RESTCatalogOptions.URI),
                 options.get(RESTCatalogOptions.CONNECTION_TIMEOUT),
-                options.get(RESTCatalogOptions.READ_TIMEOUT),
-                options.get(RESTCatalogOptions.THREAD_POOL_SIZE));
+                options.get(RESTCatalogOptions.THREAD_POOL_SIZE),
+                options.get(RESTCatalogOptions.MAX_CONNECTIONS),
+                options.get(RESTCatalogOptions.MAX_RETIES));
     }
 
     public String uri() {
@@ -60,11 +64,15 @@ public class HttpClientOptions {
         return Optional.ofNullable(connectTimeout);
     }
 
-    public Optional<Duration> readTimeout() {
-        return Optional.ofNullable(readTimeout);
-    }
-
     public int threadPoolSize() {
         return threadPoolSize;
     }
+
+    public int maxConnections() {
+        return maxConnections;
+    }
+
+    public int maxRetries() {
+        return Math.max(maxRetries, 0);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
index 1af64def4f..843228fa07 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
@@ -35,14 +35,20 @@ public class RESTCatalogOptions {
     public static final ConfigOption<Duration> CONNECTION_TIMEOUT =
             ConfigOptions.key("rest.client.connection-timeout")
                     .durationType()
-                    .noDefaultValue()
+                    .defaultValue(Duration.ofSeconds(180))
                     .withDescription("REST Catalog http client connect 
timeout.");
 
-    public static final ConfigOption<Duration> READ_TIMEOUT =
-            ConfigOptions.key("rest.client.read-timeout")
-                    .durationType()
-                    .noDefaultValue()
-                    .withDescription("REST Catalog http client read timeout.");
+    public static final ConfigOption<Integer> MAX_CONNECTIONS =
+            ConfigOptions.key("rest.client.max-connections")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription("REST Catalog http client's max 
connections.");
+
+    public static final ConfigOption<Integer> MAX_RETIES =
+            ConfigOptions.key("rest.client.max-retries")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription("REST Catalog http client's max retry 
times.");
 
     public static final ConfigOption<Integer> THREAD_POOL_SIZE =
             ConfigOptions.key("rest.client.num-threads")
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java
new file mode 100644
index 0000000000..6510371f2d
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.junit.jupiter.api.Test;
+
+import javax.net.ssl.SSLException;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+import java.net.UnknownHostException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ExponentialHttpRetryInterceptor}. */
+class ExponentialHttpRetryInterceptorTest {
+
+    private final int maxRetries = 5;
+    private final ExponentialHttpRetryInterceptor interceptor =
+            new ExponentialHttpRetryInterceptor(maxRetries);
+
+    @Test
+    void testNeedRetryByMethod() {
+
+        assertThat(interceptor.needRetry("GET", new IOException(), 
1)).isTrue();
+        assertThat(interceptor.needRetry("HEAD", new IOException(), 
1)).isTrue();
+        assertThat(interceptor.needRetry("PUT", new IOException(), 
1)).isTrue();
+        assertThat(interceptor.needRetry("DELETE", new IOException(), 
1)).isTrue();
+        assertThat(interceptor.needRetry("TRACE", new IOException(), 
1)).isTrue();
+        assertThat(interceptor.needRetry("OPTIONS", new IOException(), 
1)).isTrue();
+
+        assertThat(interceptor.needRetry("POST", new IOException(), 
1)).isFalse();
+        assertThat(interceptor.needRetry("PATCH", new IOException(), 
1)).isFalse();
+        assertThat(interceptor.needRetry("CONNECT", new IOException(), 
1)).isFalse();
+        assertThat(interceptor.needRetry("GET", new IOException(), maxRetries 
+ 1)).isFalse();
+    }
+
+    @Test
+    void testNeedRetryByException() {
+
+        assertThat(interceptor.needRetry("GET", new InterruptedIOException(), 
1)).isFalse();
+        assertThat(interceptor.needRetry("GET", new UnknownHostException(), 
1)).isFalse();
+        assertThat(interceptor.needRetry("GET", new ConnectException(), 
1)).isFalse();
+        assertThat(interceptor.needRetry("GET", new NoRouteToHostException(), 
1)).isFalse();
+        assertThat(interceptor.needRetry("GET", new SSLException("error"), 
1)).isFalse();
+
+        assertThat(interceptor.needRetry("GET", new IOException("error"), 
1)).isTrue();
+        assertThat(interceptor.needRetry("GET", new IOException("error"), 
maxRetries + 1))
+                .isFalse();
+    }
+
+    @Test
+    void testRetryByResponse() {
+
+        assertThat(interceptor.needRetry(createResponse(429), 1)).isTrue();
+        assertThat(interceptor.needRetry(createResponse(503), 1)).isTrue();
+        assertThat(interceptor.needRetry(createResponse(502), 1)).isTrue();
+        assertThat(interceptor.needRetry(createResponse(504), 1)).isTrue();
+
+        assertThat(interceptor.needRetry(createResponse(500), 1)).isFalse();
+        assertThat(interceptor.needRetry(createResponse(404), 1)).isFalse();
+        assertThat(interceptor.needRetry(createResponse(200), 1)).isFalse();
+    }
+
+    @Test
+    void invalidRetryAfterHeader() {
+        Response response = createResponse(429, "Stuff");
+
+        assertThat(interceptor.getRetryIntervalInMilliseconds(response, 
3)).isBetween(4000L, 5000L);
+    }
+
+    @Test
+    void validRetryAfterHeader() {
+        long retryAfter = 3;
+        Response response = createResponse(429, retryAfter + "");
+        assertThat(interceptor.getRetryIntervalInMilliseconds(response, 3))
+                .isEqualTo(retryAfter * 1000);
+    }
+
+    @Test
+    void exponentialRetry() {
+        ExponentialHttpRetryInterceptor interceptor = new 
ExponentialHttpRetryInterceptor(10);
+        Response response = createResponse(429, "Stuff");
+
+        // note that the upper limit includes ~10% variability
+        assertThat(interceptor.getRetryIntervalInMilliseconds(response, 
0)).isEqualTo(0);
+        assertThat(interceptor.getRetryIntervalInMilliseconds(response, 
1)).isBetween(1000L, 2000L);
+        assertThat(interceptor.getRetryIntervalInMilliseconds(response, 
2)).isBetween(2000L, 3000L);
+        assertThat(interceptor.getRetryIntervalInMilliseconds(response, 
3)).isBetween(4000L, 5000L);
+        assertThat(interceptor.getRetryIntervalInMilliseconds(response, 
4)).isBetween(8000L, 9000L);
+        assertThat(interceptor.getRetryIntervalInMilliseconds(response, 5))
+                .isBetween(16000L, 18000L);
+        assertThat(interceptor.getRetryIntervalInMilliseconds(response, 6))
+                .isBetween(32000L, 36000L);
+        assertThat(interceptor.getRetryIntervalInMilliseconds(response, 7))
+                .isBetween(64000L, 72000L);
+        assertThat(interceptor.getRetryIntervalInMilliseconds(response, 10))
+                .isBetween(64000L, 72000L);
+    }
+
+    private static Response createResponse(int httpCode) {
+        return createResponse(httpCode, "");
+    }
+
+    private static Response createResponse(int httpCode, String retryAfter) {
+        return new Response.Builder()
+                .code(httpCode)
+                .message("message")
+                .protocol(Protocol.HTTP_1_1)
+                .request(new Request.Builder().url("http://localhost";).build())
+                .addHeader(HttpHeaders.RETRY_AFTER, retryAfter)
+                .build();
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
index 161dbaf3bb..05078cf805 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
@@ -56,8 +56,7 @@ public class HttpClientTest {
         server.start();
         errorHandler = DefaultErrorHandler.getInstance();
         HttpClientOptions httpClientOptions =
-                new HttpClientOptions(
-                        server.getBaseUrl(), Duration.ofSeconds(3), 
Duration.ofSeconds(3), 1);
+                new HttpClientOptions(server.getBaseUrl(), 
Duration.ofSeconds(3), 1, 10, 2);
         mockResponseData = new MockRESTData(MOCK_PATH);
         mockResponseDataStr = server.createResponseBody(mockResponseData);
         errorResponseStr =
@@ -116,4 +115,15 @@ public class HttpClientTest {
         server.enqueueResponse(errorResponseStr, 400);
         assertThrows(BadRequestException.class, () -> 
httpClient.delete(MOCK_PATH, headers));
     }
+
+    @Test
+    public void testRetry() {
+        HttpClient httpClient =
+                new HttpClient(
+                        new HttpClientOptions(
+                                server.getBaseUrl(), Duration.ofSeconds(30), 
1, 10, 2));
+        server.enqueueResponse(mockResponseDataStr, 429);
+        server.enqueueResponse(mockResponseDataStr, 200);
+        assertDoesNotThrow(() -> httpClient.get(MOCK_PATH, MockRESTData.class, 
headers));
+    }
 }

Reply via email to