This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2439452e1d [rest] Make http client reusable to reduce connection cost
(#5145)
2439452e1d is described below
commit 2439452e1d97ae5e5ab1efa66280ef48cf8d0bd9
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Feb 25 14:08:08 2025 +0800
[rest] Make http client reusable to reduce connection cost (#5145)
---
docs/content/concepts/rest-catalog.md | 44 ++++++------
.../java/org/apache/paimon/rest/HttpClient.java | 78 +++++-----------------
.../org/apache/paimon/rest/HttpClientOptions.java | 78 ----------------------
.../java/org/apache/paimon/rest/RESTCatalog.java | 2 +-
.../org/apache/paimon/rest/RESTCatalogOptions.java | 30 ---------
.../org/apache/paimon/rest/HttpClientTest.java | 10 +--
.../org/apache/paimon/rest/RESTCatalogTest.java | 3 -
.../org/apache/paimon/flink/RESTCatalogITCase.java | 1 -
8 files changed, 43 insertions(+), 203 deletions(-)
diff --git a/docs/content/concepts/rest-catalog.md
b/docs/content/concepts/rest-catalog.md
index 55613f5107..7092b44941 100644
--- a/docs/content/concepts/rest-catalog.md
+++ b/docs/content/concepts/rest-catalog.md
@@ -51,23 +51,23 @@ Paimon REST Catalog provides a lightweight implementation
to access the catalog
```sql
CREATE CATALOG `paimon-rest-catalog`
WITH (
-'type' = 'paimon',
-'uri' = '<catalog server url>',
-'metastore' = 'rest',
-'token.provider' = 'bear'
-'token' = '<token>'
+ 'type' = 'paimon',
+ 'uri' = '<catalog server url>',
+ 'metastore' = 'rest',
+ 'token.provider' = 'bear',
+ 'token' = '<token>'
);
```
- DLF ak
```sql
CREATE CATALOG `paimon-rest-catalog`
WITH (
-'type' = 'paimon',
-'uri' = '<catalog server url>',
-'metastore' = 'rest',
-'token.provider' = 'dlf',
-'dlf.accessKeyId'='<accessKeyId>',
-'dlf.accessKeySecret'='<accessKeySecret>',
+ 'type' = 'paimon',
+ 'uri' = '<catalog server url>',
+ 'metastore' = 'rest',
+ 'token.provider' = 'dlf',
+ 'dlf.accessKeyId'='<accessKeyId>',
+ 'dlf.accessKeySecret'='<accessKeySecret>'
);
```
@@ -75,13 +75,13 @@ WITH (
```sql
CREATE CATALOG `paimon-rest-catalog`
WITH (
-'type' = 'paimon',
-'uri' = '<catalog server url>',
-'metastore' = 'rest',
-'token.provider' = 'dlf',
-'dlf.accessKeyId'='<accessKeyId>',
-'dlf.accessKeySecret'='<accessKeySecret>',
-'dlf.securityToken'='<securityToken>'
+ 'type' = 'paimon',
+ 'uri' = '<catalog server url>',
+ 'metastore' = 'rest',
+ 'token.provider' = 'dlf',
+ 'dlf.accessKeyId'='<accessKeyId>',
+ 'dlf.accessKeySecret'='<accessKeySecret>',
+ 'dlf.securityToken'='<securityToken>'
);
```
@@ -89,10 +89,10 @@ WITH (
```sql
CREATE CATALOG `paimon-rest-catalog`
WITH (
-'type' = 'paimon',
-'uri' = '<catalog server url>',
-'metastore' = 'rest',
-'token.provider' = 'dlf'
+ 'type' = 'paimon',
+ 'uri' = '<catalog server url>',
+ 'metastore' = 'rest',
+ 'token.provider' = 'dlf'
);
```
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
index 8aa3da86d8..bc708d2d5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
@@ -19,7 +19,6 @@
package org.apache.paimon.rest;
import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.options.Options;
import org.apache.paimon.rest.auth.RESTAuthFunction;
import org.apache.paimon.rest.auth.RESTAuthParameter;
import org.apache.paimon.rest.exceptions.RESTException;
@@ -29,8 +28,6 @@ import org.apache.paimon.utils.StringUtils;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import okhttp3.ConnectionPool;
-import okhttp3.Dispatcher;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
@@ -38,16 +35,12 @@ import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
-import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -55,31 +48,31 @@ import static okhttp3.ConnectionSpec.CLEARTEXT;
import static okhttp3.ConnectionSpec.COMPATIBLE_TLS;
import static okhttp3.ConnectionSpec.MODERN_TLS;
import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
-import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
/** HTTP client for REST catalog. */
public class HttpClient implements RESTClient {
- private static final String THREAD_NAME =
"REST-CATALOG-HTTP-CLIENT-THREAD-POOL";
+ private static final OkHttpClient HTTP_CLIENT =
+ new OkHttpClient.Builder()
+ .retryOnConnectionFailure(true)
+ .connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS,
CLEARTEXT))
+ .addInterceptor(new ExponentialHttpRetryInterceptor(5))
+ .connectTimeout(Duration.ofMinutes(3))
+ .readTimeout(Duration.ofMinutes(3))
+ .build();
+
private static final MediaType MEDIA_TYPE =
MediaType.parse("application/json");
- private static final int CONNECTION_KEEP_ALIVE_DURATION_MS = 300_000;
- private final OkHttpClient okHttpClient;
private final String uri;
private ErrorHandler errorHandler;
- public HttpClient(Options options) {
- this(HttpClientOptions.create(options));
- }
-
- public HttpClient(HttpClientOptions httpClientOptions) {
- if (httpClientOptions.uri() != null &&
httpClientOptions.uri().endsWith("/")) {
- this.uri = httpClientOptions.uri().substring(0,
httpClientOptions.uri().length() - 1);
+ public HttpClient(String uri) {
+ if (uri != null && uri.endsWith("/")) {
+ this.uri = uri.substring(0, uri.length() - 1);
} else {
- this.uri = httpClientOptions.uri();
+ this.uri = uri;
}
- this.okHttpClient = createHttpClient(httpClientOptions);
this.errorHandler = DefaultErrorHandler.getInstance();
}
@@ -160,14 +153,8 @@ public class HttpClient implements RESTClient {
}
}
- @Override
- public void close() throws IOException {
- okHttpClient.dispatcher().cancelAll();
- okHttpClient.connectionPool().evictAll();
- }
-
private <T extends RESTResponse> T exec(Request request, Class<T>
responseType) {
- try (Response response = okHttpClient.newCall(request).execute()) {
+ try (Response response = HTTP_CLIENT.newCall(request).execute()) {
String responseBodyStr = response.body() != null ?
response.body().string() : null;
if (!response.isSuccessful()) {
ErrorResponse error;
@@ -203,38 +190,6 @@ public class HttpClient implements RESTClient {
return RequestBody.create(body.getBytes(StandardCharsets.UTF_8),
MEDIA_TYPE);
}
- private static OkHttpClient createHttpClient(HttpClientOptions
httpClientOptions) {
- BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
- ExecutorService executorService =
- createCachedThreadPool(httpClientOptions.threadPoolSize(),
THREAD_NAME, workQueue);
- ConnectionPool connectionPool =
- new ConnectionPool(
- httpClientOptions.maxConnections(),
- CONNECTION_KEEP_ALIVE_DURATION_MS,
- TimeUnit.MILLISECONDS);
- Dispatcher dispatcher = new Dispatcher(executorService);
- // set max requests per host use max connections
- dispatcher.setMaxRequestsPerHost(httpClientOptions.maxConnections());
- OkHttpClient.Builder builder =
- new OkHttpClient.Builder()
- .dispatcher(dispatcher)
- .retryOnConnectionFailure(true)
- .connectionPool(connectionPool)
- .connectionSpecs(Arrays.asList(MODERN_TLS,
COMPATIBLE_TLS, CLEARTEXT))
- .addInterceptor(
- new ExponentialHttpRetryInterceptor(
- httpClientOptions.maxRetries()));
- httpClientOptions
- .connectTimeout()
- .ifPresent(
- timeoutDuration -> {
- builder.connectTimeout(timeoutDuration);
- builder.readTimeout(timeoutDuration);
- });
-
- return builder.build();
- }
-
private String getRequestUrl(String path) {
return StringUtils.isNullOrWhitespaceOnly(path) ? uri : uri + path;
}
@@ -274,4 +229,7 @@ public class HttpClient implements RESTClient {
));
return Pair.of(resourcePath, parameters);
}
+
+ @Override
+ public void close() {}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java
b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java
deleted file mode 100644
index 548a989568..0000000000
--- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.rest;
-
-import org.apache.paimon.options.Options;
-
-import javax.annotation.Nullable;
-
-import java.time.Duration;
-import java.util.Optional;
-
-/** Options for Http Client. */
-public class HttpClientOptions {
-
- private final String uri;
- @Nullable private final Duration connectTimeout;
- private final int threadPoolSize;
- private final int maxConnections;
- private final int maxRetries;
-
- public HttpClientOptions(
- String uri,
- @Nullable Duration connectTimeout,
- int threadPoolSize,
- int maxConnections,
- int maxRetries) {
- this.uri = uri;
- this.connectTimeout = connectTimeout;
- this.threadPoolSize = threadPoolSize;
- this.maxConnections = maxConnections;
- this.maxRetries = maxRetries;
- }
-
- public static HttpClientOptions create(Options options) {
- return new HttpClientOptions(
- options.get(RESTCatalogOptions.URI),
- options.get(RESTCatalogOptions.CONNECTION_TIMEOUT),
- options.get(RESTCatalogOptions.THREAD_POOL_SIZE),
- options.get(RESTCatalogOptions.MAX_CONNECTIONS),
- options.get(RESTCatalogOptions.MAX_RETIES));
- }
-
- public String uri() {
- return uri;
- }
-
- public Optional<Duration> connectTimeout() {
- return Optional.ofNullable(connectTimeout);
- }
-
- public int threadPoolSize() {
- return threadPoolSize;
- }
-
- public int maxConnections() {
- return maxConnections;
- }
-
- public int maxRetries() {
- return Math.max(maxRetries, 0);
- }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 5e43762ac8..fedaa7251d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -118,7 +118,7 @@ public class RESTCatalog implements Catalog {
}
public RESTCatalog(CatalogContext context, boolean configRequired) {
- this.client = new HttpClient(context.options());
+ this.client = new
HttpClient(context.options().get(RESTCatalogOptions.URI));
AuthSession catalogAuth = createAuthSession(context.options(),
tokenRefreshExecutor());
Options options = context.options();
Map<String, String> baseHeaders = Collections.emptyMap();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
index 310e3335ec..c9dd0fcfac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
@@ -32,30 +32,6 @@ public class RESTCatalogOptions {
.noDefaultValue()
.withDescription("REST Catalog server's uri.");
- public static final ConfigOption<Duration> CONNECTION_TIMEOUT =
- ConfigOptions.key("rest.client.connection-timeout")
- .durationType()
- .defaultValue(Duration.ofSeconds(180))
- .withDescription("REST Catalog http client connect
timeout.");
-
- public static final ConfigOption<Integer> MAX_CONNECTIONS =
- ConfigOptions.key("rest.client.max-connections")
- .intType()
- .defaultValue(100)
- .withDescription("REST Catalog http client's max
connections.");
-
- public static final ConfigOption<Integer> MAX_RETIES =
- ConfigOptions.key("rest.client.max-retries")
- .intType()
- .defaultValue(5)
- .withDescription("REST Catalog http client's max retry
times.");
-
- public static final ConfigOption<Integer> THREAD_POOL_SIZE =
- ConfigOptions.key("rest.client.num-threads")
- .intType()
- .defaultValue(1)
- .withDescription("REST Catalog http client thread num.");
-
public static final ConfigOption<String> TOKEN =
ConfigOptions.key("token")
.stringType()
@@ -98,12 +74,6 @@ public class RESTCatalogOptions {
.noDefaultValue()
.withDescription("REST Catalog auth DLF security token");
- public static final ConfigOption<String> DLF_ROLE_SESSION_NAME =
- ConfigOptions.key("dlf.roleSessionName")
- .stringType()
- .noDefaultValue()
- .withDescription("REST Catalog auth DLF role session
name");
-
public static final ConfigOption<Boolean> DATA_TOKEN_ENABLED =
ConfigOptions.key("data-token.enabled")
.booleanType()
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
index 8600637c88..9fdbd529ea 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
@@ -34,7 +34,6 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
-import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -63,14 +62,12 @@ public class HttpClientTest {
server = new TestHttpWebServer(MOCK_PATH);
server.start();
errorHandler = DefaultErrorHandler.getInstance();
- HttpClientOptions httpClientOptions =
- new HttpClientOptions(server.getBaseUrl(),
Duration.ofSeconds(3), 1, 10, 2);
mockResponseData = new MockRESTData(MOCK_PATH);
mockResponseDataStr = server.createResponseBody(mockResponseData);
errorResponseStr =
server.createResponseBody(
new ErrorResponse(ErrorResponseResourceType.DATABASE,
"test", "test", 400));
- httpClient = new HttpClient(httpClientOptions);
+ httpClient = new HttpClient(server.getBaseUrl());
httpClient.setErrorHandler(errorHandler);
AuthProvider authProvider = new BearTokenAuthProvider(TOKEN);
AuthSession authSession = new AuthSession(authProvider);
@@ -134,10 +131,7 @@ public class HttpClientTest {
@Test
public void testRetry() {
- HttpClient httpClient =
- new HttpClient(
- new HttpClientOptions(
- server.getBaseUrl(), Duration.ofSeconds(30),
1, 10, 2));
+ HttpClient httpClient = new HttpClient(server.getBaseUrl());
server.enqueueResponse(mockResponseDataStr, 429);
server.enqueueResponse(mockResponseDataStr, 200);
assertDoesNotThrow(() -> httpClient.get(MOCK_PATH, MockRESTData.class,
restAuthFunction));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 64389378bf..a0d17057a2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -66,7 +66,6 @@ class RESTCatalogTest extends CatalogTestBase {
options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
options.set(RESTCatalogOptions.TOKEN, initToken);
options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.BEAR.identifier());
- options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
this.catalog = new RESTCatalog(CatalogContext.create(options));
}
@@ -90,7 +89,6 @@ class RESTCatalogTest extends CatalogTestBase {
options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
options.set(RESTCatalogOptions.TOKEN, "aaaaa");
options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.BEAR.identifier());
- options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
options.set(CatalogOptions.METASTORE, RESTCatalogFactory.IDENTIFIER);
assertThatThrownBy(() -> new
RESTCatalog(CatalogContext.create(options)))
.isInstanceOf(NotAuthorizedException.class);
@@ -134,7 +132,6 @@ class RESTCatalogTest extends CatalogTestBase {
Options options = new Options();
options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
options.set(RESTCatalogOptions.TOKEN, initToken);
- options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
options.set(RESTCatalogOptions.DATA_TOKEN_ENABLED, true);
options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.BEAR.identifier());
this.catalog = new RESTCatalog(CatalogContext.create(options));
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index 145fcd0ba3..dc202ec872 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -101,7 +101,6 @@ class RESTCatalogITCase extends CatalogITCaseBase {
options.put("metastore", "rest");
options.put(RESTCatalogOptions.URI.key(), serverUrl);
options.put(RESTCatalogOptions.TOKEN.key(), initToken);
- options.put(RESTCatalogOptions.THREAD_POOL_SIZE.key(), "" + 1);
options.put(RESTCatalogOptions.TOKEN_PROVIDER.key(),
AuthProviderEnum.BEAR.identifier());
return options;
}