>From Michael Blow <[email protected]>:
Michael Blow has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21103?usp=email )
Change subject: [NO ISSUE][*DB][STO] Configure & honor max idle & max lifetime
for cloud connections
......................................................................
[NO ISSUE][*DB][STO] Configure & honor max idle & max lifetime for cloud
connections
Change-Id: Ibf1777b5d879af0d2e1b2a857c72d0654d6e055f
---
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
M
asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ICloudProperties.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java
6 files changed, 94 insertions(+), 46 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/03/21103/1
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
index b993e55..daea8d2 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
@@ -51,19 +51,26 @@
private final int requestsMaxPendingHttpConnections;
private final int requestsHttpConnectionAcquireTimeout;
private final AccessTier accessTier;
+ private final int maxIdleSeconds;
+ private final int maxLifetimeSeconds;
public AzBlobStorageClientConfig(String region, String endpoint, String
prefix, boolean anonymousAuth,
long profilerLogInterval, String bucket, int writeBufferSize) {
+ // TODO(mblow): using the same values as our defaults for blob storage
seems sus, refactor to configurable and
+ // and use reasonable defaults
this(region, endpoint, prefix, anonymousAuth, profilerLogInterval,
bucket, 1, 0, 0, writeBufferSize, false,
- null, CloudProperties.MAX_HTTP_CONNECTIONS,
CloudProperties.MAX_PENDING_HTTP_CONNECTIONS,
- CloudProperties.HTTP_CONNECTION_ACQUIRE_TIMEOUT);
+ null, CloudProperties.MAX_HTTP_CONNECTIONS_DEFAULT,
+ CloudProperties.MAX_PENDING_HTTP_CONNECTIONS_DEFAULT,
+ CloudProperties.HTTP_CONNECTION_ACQUIRE_TIMEOUT_DEFAULT,
+ CloudProperties.HTTP_CONNECTION_MAX_IDLE_SECONDS_DEFAULT,
+ CloudProperties.HTTP_CONNECTION_MAX_LIFETIME_SECONDS_DEFAULT);
}
public AzBlobStorageClientConfig(String region, String endpoint, String
prefix, boolean anonymousAuth,
long profilerLogInterval, String bucket, long tokenAcquireTimeout,
int writeMaxRequestsPerSeconds,
int readMaxRequestsPerSeconds, int writeBufferSize, boolean
storageDisableSSLVerify, AccessTier accessTier,
int requestsMaxHttpConnections, int
requestsMaxPendingHttpConnections,
- int requestsHttpConnectionAcquireTimeout) {
+ int requestsHttpConnectionAcquireTimeout, int maxIdleSeconds, int
maxLifetimeSeconds) {
this.region = Objects.requireNonNull(region, "region");
this.endpoint = endpoint;
this.prefix = Objects.requireNonNull(prefix, "prefix");
@@ -80,6 +87,8 @@
getRequestsMaxPendingHttpConnections(requestsMaxPendingHttpConnections);
this.requestsHttpConnectionAcquireTimeout =
requestsHttpConnectionAcquireTimeout;
this.accessTier = accessTier;
+ this.maxIdleSeconds = maxIdleSeconds;
+ this.maxLifetimeSeconds = maxLifetimeSeconds;
}
public static AzBlobStorageClientConfig of(ICloudProperties
cloudProperties) {
@@ -90,7 +99,9 @@
cloudProperties.getReadMaxRequestsPerSecond(),
cloudProperties.getWriteBufferSize(),
cloudProperties.isStorageDisableSSLVerify(),
INTERNAL_STORAGE_ACCESS_TIER,
cloudProperties.getRequestsMaxHttpConnections(),
cloudProperties.getRequestsMaxPendingHttpConnections(),
- cloudProperties.getRequestsHttpConnectionAcquireTimeout());
+ cloudProperties.getRequestsHttpConnectionAcquireTimeout(),
+ cloudProperties.getRequestsHttpConnectionMaxIdleSeconds(),
+ cloudProperties.getRequestsHttpConnectionMaxLifetimeSeconds());
}
public static AzBlobStorageClientConfig of(Map<String, String>
configuration, int writeBufferSize) {
@@ -179,4 +190,12 @@
}
return requestsMaxPendingHttpConnections;
}
+
+ public int getMaxIdleSeconds() {
+ return maxIdleSeconds;
+ }
+
+ public int getMaxLifetimeSeconds() {
+ return maxLifetimeSeconds;
+ }
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
index 2ee253d..49e911e 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
@@ -82,12 +82,10 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
+import reactor.netty.resources.ConnectionProvider;
public class AzBlobStorageCloudClient implements ICloudClient {
@@ -398,41 +396,41 @@
}
private static BlobServiceClient buildClient(AzBlobStorageClientConfig
config) {
- BlobServiceClientBuilder blobServiceClientBuilder =
getBlobServiceClientBuilder(config);
+ BlobServiceClientBuilder blobServiceClientBuilder =
getBlobServiceClientBuilder(config, false);
return blobServiceClientBuilder.buildClient();
}
private static BlobServiceAsyncClient
buildAsyncClient(AzBlobStorageClientConfig config) {
- BlobServiceClientBuilder blobServiceClientBuilder =
getBlobServiceClientBuilder(config);
+ BlobServiceClientBuilder blobServiceClientBuilder =
getBlobServiceClientBuilder(config, true);
return blobServiceClientBuilder.buildAsyncClient();
}
- private static BlobServiceClientBuilder
getBlobServiceClientBuilder(AzBlobStorageClientConfig config) {
+ private static BlobServiceClientBuilder
getBlobServiceClientBuilder(AzBlobStorageClientConfig config,
+ boolean async) {
BlobServiceClientBuilder blobServiceClientBuilder = new
BlobServiceClientBuilder();
blobServiceClientBuilder.endpoint(getEndpoint(config));
blobServiceClientBuilder.httpLogOptions(AzureConstants.HTTP_LOG_OPTIONS);
configCredentialsToAzClient(blobServiceClientBuilder, config);
- // Disable SSL verification if the config property is set
+ ConnectionProvider.Builder builder =
ConnectionProvider.builder("azblob-client-" + (async ? "async" : "sync"))
+ .maxConnections(config.getRequestsMaxHttpConnections())
+
.pendingAcquireMaxCount(config.getRequestsMaxPendingHttpConnections());
+ if (config.getMaxIdleSeconds() > 0) {
+
builder.maxIdleTime(Duration.ofSeconds(config.getMaxIdleSeconds()));
+ }
+ if (config.getMaxLifetimeSeconds() > 0) {
+
builder.maxLifeTime(Duration.ofSeconds(config.getMaxLifetimeSeconds()));
+ }
+ if (config.getRequestsHttpConnectionAcquireTimeout() > 0) {
+
builder.pendingAcquireTimeout(Duration.ofSeconds(config.getRequestsHttpConnectionAcquireTimeout()));
+ }
+ ConnectionProvider connectionProvider = builder.build();
+
+ HttpClient httpClient = HttpClient.create(connectionProvider);
if (config.isStorageDisableSSLVerify()) {
- try {
- // Create SSL context that trusts all certificates
- SslContext sslContext =
-
SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
-
- // Create a base Reactor Netty HttpClient with SSL
verification disabled
- HttpClient baseHttpClient = HttpClient.create().secure(sslSpec
-> sslSpec.sslContext(sslContext));
-
- // Configure the Azure HTTP client with the base client
- blobServiceClientBuilder.httpClient(new
NettyAsyncHttpClientBuilder(baseHttpClient).build());
- } catch (Exception e) {
- throw new RuntimeException("Failed to disable SSL
verification", e);
- }
+ httpClient = disableSslVerify(httpClient);
}
- boolean disableSslVerify = config.isStorageDisableSSLVerify();
- if (disableSslVerify) {
- disableSslVerify(blobServiceClientBuilder);
- }
+ blobServiceClientBuilder.httpClient(new
NettyAsyncHttpClientBuilder(httpClient).build());
return blobServiceClientBuilder;
}
diff --git
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
index d734773..e876834 100644
---
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
+++
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
@@ -35,7 +35,6 @@
import org.apache.asterix.cloud.clients.ICloudGuardian;
import
org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig;
import
org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageCloudClient;
-import org.apache.asterix.common.config.CloudProperties;
import org.apache.hyracks.util.StorageUtil;
import org.bouncycastle.asn1.x500.X500Name;
import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
@@ -118,8 +117,7 @@
URI blobStore = URI.create(blobServiceClient.getAccountUrl());
String endpoint = blobStore.getScheme() + "://" +
blobStore.getAuthority() + "/devstoreaccount1";
AzBlobStorageClientConfig config = new
AzBlobStorageClientConfig(MOCK_SERVER_REGION, endpoint, "", false, 0,
- PLAYGROUND_CONTAINER, 1, 0, 0, writeBufferSize, true, null,
CloudProperties.MAX_HTTP_CONNECTIONS,
- CloudProperties.MAX_PENDING_HTTP_CONNECTIONS,
CloudProperties.HTTP_CONNECTION_ACQUIRE_TIMEOUT);
+ PLAYGROUND_CONTAINER, 1, 0, 0, writeBufferSize, true, null,
1000, 10000, 120, 120, 0);
CLOUD_CLIENT = new AzBlobStorageCloudClient(config,
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index b1bafae..9dbdf9c 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -44,9 +44,13 @@
public class CloudProperties extends AbstractProperties implements
ICloudProperties {
- public static final int MAX_HTTP_CONNECTIONS = 1000;
- public static final int MAX_PENDING_HTTP_CONNECTIONS = 10000;
- public static final int HTTP_CONNECTION_ACQUIRE_TIMEOUT = 120;
+ // TODO(mblow): these should not be being used for external datasets-
extract separate properties
+ // for these and make these defaults private to this class
+ public static final int MAX_HTTP_CONNECTIONS_DEFAULT = 1000;
+ public static final int MAX_PENDING_HTTP_CONNECTIONS_DEFAULT = 10000;
+ public static final int HTTP_CONNECTION_ACQUIRE_TIMEOUT_DEFAULT = 120;
+ public static final int HTTP_CONNECTION_MAX_IDLE_SECONDS_DEFAULT = 120;
+ public static final int HTTP_CONNECTION_MAX_LIFETIME_SECONDS_DEFAULT = 0;
public CloudProperties(PropertiesAccessor accessor) {
super(accessor);
@@ -77,9 +81,13 @@
getRangedIntegerType(5, Integer.MAX_VALUE),
StorageUtil.getIntSizeInBytes(8,
StorageUtil.StorageUnit.MEGABYTE)),
CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD(POSITIVE_INTEGER, 50),
- CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS(POSITIVE_INTEGER,
MAX_HTTP_CONNECTIONS),
- CLOUD_REQUESTS_MAX_PENDING_HTTP_CONNECTIONS(POSITIVE_INTEGER,
MAX_PENDING_HTTP_CONNECTIONS),
- CLOUD_REQUESTS_HTTP_CONNECTION_ACQUIRE_TIMEOUT(POSITIVE_INTEGER,
HTTP_CONNECTION_ACQUIRE_TIMEOUT),
+ CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS(POSITIVE_INTEGER,
MAX_HTTP_CONNECTIONS_DEFAULT),
+ CLOUD_REQUESTS_MAX_PENDING_HTTP_CONNECTIONS(POSITIVE_INTEGER,
MAX_PENDING_HTTP_CONNECTIONS_DEFAULT),
+ CLOUD_REQUESTS_HTTP_CONNECTION_ACQUIRE_TIMEOUT(POSITIVE_INTEGER,
HTTP_CONNECTION_ACQUIRE_TIMEOUT_DEFAULT),
+ CLOUD_REQUESTS_HTTP_CONNECTION_MAX_IDLE_SECONDS(NONNEGATIVE_INTEGER,
HTTP_CONNECTION_MAX_IDLE_SECONDS_DEFAULT),
+ CLOUD_REQUESTS_HTTP_CONNECTION_MAX_LIFETIME_SECONDS(
+ NONNEGATIVE_INTEGER,
+ HTTP_CONNECTION_MAX_LIFETIME_SECONDS_DEFAULT),
CLOUD_STORAGE_FORCE_PATH_STYLE(BOOLEAN, false),
CLOUD_STORAGE_DISABLE_SSL_VERIFY(BOOLEAN, false),
CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT(BOOLEAN, false),
@@ -129,6 +137,8 @@
case CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS:
case CLOUD_REQUESTS_MAX_PENDING_HTTP_CONNECTIONS:
case CLOUD_REQUESTS_HTTP_CONNECTION_ACQUIRE_TIMEOUT:
+ case CLOUD_REQUESTS_HTTP_CONNECTION_MAX_IDLE_SECONDS:
+ case CLOUD_REQUESTS_HTTP_CONNECTION_MAX_LIFETIME_SECONDS:
case CLOUD_STORAGE_FORCE_PATH_STYLE:
case CLOUD_STORAGE_DISABLE_SSL_VERIFY:
case CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT:
@@ -200,6 +210,10 @@
return "The maximum number of HTTP connections allowed to
wait for a connection per node";
case CLOUD_REQUESTS_HTTP_CONNECTION_ACQUIRE_TIMEOUT:
return "The waiting time (in seconds) to acquire an HTTP
connection before failing the request";
+ case CLOUD_REQUESTS_HTTP_CONNECTION_MAX_IDLE_SECONDS:
+ return "The time (in seconds) after which an idle cloud
connection will be closed. (0 == unlimited idle)";
+ case CLOUD_REQUESTS_HTTP_CONNECTION_MAX_LIFETIME_SECONDS:
+ return "The time (in seconds) after which an cloud
connection will no longer be reused. (0 == unlimited lifetime)";
case CLOUD_STORAGE_FORCE_PATH_STYLE:
return "Indicates whether or not to force path style when
accessing the cloud storage";
case CLOUD_STORAGE_DISABLE_SSL_VERIFY:
@@ -334,6 +348,16 @@
return
accessor.getInt(Option.CLOUD_REQUESTS_HTTP_CONNECTION_ACQUIRE_TIMEOUT);
}
+ @Override
+ public int getRequestsHttpConnectionMaxIdleSeconds() {
+ return
accessor.getInt(Option.CLOUD_REQUESTS_HTTP_CONNECTION_MAX_IDLE_SECONDS);
+ }
+
+ @Override
+ public int getRequestsHttpConnectionMaxLifetimeSeconds() {
+ return
accessor.getInt(Option.CLOUD_REQUESTS_HTTP_CONNECTION_MAX_LIFETIME_SECONDS);
+ }
+
public boolean isStorageForcePathStyle() {
return accessor.getBoolean(Option.CLOUD_STORAGE_FORCE_PATH_STYLE);
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ICloudProperties.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ICloudProperties.java
index 0805bfa..dcda48b 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ICloudProperties.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ICloudProperties.java
@@ -79,6 +79,10 @@
int getRequestsHttpConnectionAcquireTimeout();
+ int getRequestsHttpConnectionMaxIdleSeconds();
+
+ int getRequestsHttpConnectionMaxLifetimeSeconds();
+
boolean isStorageForcePathStyle();
boolean isStorageDisableSSLVerify();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java
index a695320..8b6b99f 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java
@@ -75,6 +75,7 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import org.apache.hyracks.util.annotations.AiProvenance;
import reactor.netty.http.client.HttpClient;
public class BlobUtils {
@@ -130,7 +131,8 @@
}
if (disableSslVerify) {
- disableSslVerify(builder);
+ HttpClient httpClient = disableSslVerify(HttpClient.create());
+ builder.httpClient(new
NettyAsyncHttpClientBuilder(httpClient).build());
}
// Shared Key
@@ -221,18 +223,21 @@
}
}
- public static void disableSslVerify(BlobServiceClientBuilder builder) {
+ /**
+ * Returns a new {@link HttpClient} derived from {@code baseClient} with
SSL verification disabled.
+ * The caller is responsible for setting the resulting client on the
builder via
+ * {@code builder.httpClient(new
NettyAsyncHttpClientBuilder(result).build())}.
+ *
+ * @param baseClient the base Reactor Netty {@link HttpClient} to apply
SSL disabling on top of
+ * @return a new {@link HttpClient} with an insecure trust manager applied
+ */
+ @AiProvenance(agent = AiProvenance.Agent.CLAUDE_SONNET_4_6, tool =
AiProvenance.Tool.GITHUB_COPILOT, contributionKind =
AiProvenance.ContributionKind.DOC_GENERATED)
+ public static HttpClient disableSslVerify(HttpClient baseClient) {
try {
- // Create SSL context that trusts all certificates
SslContextBuilder sslContextBuilder =
SslContextBuilder.forClient();
sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
SslContext sslContext = sslContextBuilder.build();
-
- // Create a base Reactor Netty HttpClient with SSL verification
disabled
- HttpClient baseHttpClient = HttpClient.create().secure(sslSpec ->
sslSpec.sslContext(sslContext));
-
- // Configure the Azure HTTP client with the base client
- builder.httpClient(new
NettyAsyncHttpClientBuilder(baseHttpClient).build());
+ return baseClient.secure(sslSpec ->
sslSpec.sslContext(sslContext));
} catch (Exception e) {
throw new RuntimeException("Failed to disable SSL verification",
e);
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21103?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: Ibf1777b5d879af0d2e1b2a857c72d0654d6e055f
Gerrit-Change-Number: 21103
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Blow <[email protected]>