>From Murtadha Hubail <[email protected]>:
Murtadha Hubail has submitted this change. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21235?usp=email )
Change subject: [ASTERIXDB-3774][CLOUD] Adding HTTP idle/lifetime config in S3
and GCS cloud clients
......................................................................
[ASTERIXDB-3774][CLOUD] Adding HTTP idle/lifetime config in S3 and GCS cloud clients
- user model changes: no
- storage format changes: no
- interface changes: no
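Apply an idle timeout and a lifetime cap to the HTTP connection pools
backing the S3 and GCS clients (the Apache HttpClient pools of the sync
S3 and GCS clients, and the Netty pool used by the S3 parallel
downloader): a pooled connection that has been idle longer than
cloudRequestsHttpConnectionMaxIdleSeconds is evicted, and a connection
older than cloudRequestsHttpConnectionMaxLifetimeSeconds is retired on
its next lease regardless of activity. A value of 0 (or less) leaves the
corresponding limit unset.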
Ex-ref: MB-71767
Change-Id: I52bfe3e855b654b48f4294a9cd52e59406555aeb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21235
Tested-by: Murtadha Hubail <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Integration-Tests: Jenkins <[email protected]>
---
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
A
asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/clients/aws/s3/S3ConnectionLifecycleTest.java
A
asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/clients/google/gcs/GCSConnectionLifecycleTest.java
7 files changed, 705 insertions(+), 8 deletions(-)
Approvals:
Murtadha Hubail: Looks good to me, approved; Verified
Jenkins: Verified
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index 73237ce..6b98436 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -50,6 +50,8 @@
private final int requestsMaxHttpConnections;
private final int requestsMaxPendingHttpConnections;
private final int requestsHttpConnectionAcquireTimeout;
+ private final int maxIdleSeconds;
+ private final int maxLifetimeSeconds;
private final boolean forcePathStyle;
private final boolean disableSslVerify;
private final int s3ReadTimeoutInSeconds;
@@ -63,7 +65,7 @@
Collection<String> certificates, long profilerLogInterval, int
writeBufferSize,
S3ParallelDownloaderClientType parallelDownloaderClientType,
boolean roundRobinDnsResolver) {
this(region, endpoint, prefix, anonymousAuth, certificates,
profilerLogInterval, writeBufferSize, 1, 0, 0, 0,
- false, false, 0, 0, -1, parallelDownloaderClientType,
roundRobinDnsResolver, "", "",
+ false, false, 0, 0, 0, 0, -1, parallelDownloaderClientType,
roundRobinDnsResolver, "", "",
S3ChecksumBehavior.defaultForEndpoint(endpoint));
}
@@ -71,9 +73,10 @@
Collection<String> certificates, long profilerLogInterval, int
writeBufferSize, long tokenAcquireTimeout,
int writeMaxRequestsPerSeconds, int readMaxRequestsPerSeconds, int
requestsMaxHttpConnections,
boolean forcePathStyle, boolean disableSslVerify, int
requestsMaxPendingHttpConnections,
- int requestsHttpConnectionAcquireTimeout, int
s3ReadTimeoutInSeconds,
- S3ParallelDownloaderClientType parallelDownloaderClientType,
boolean roundRobinDnsResolver,
- String accessKeyId, String secretAccessKey, S3ChecksumBehavior
checksumBehavior) {
+ int requestsHttpConnectionAcquireTimeout, int maxIdleSeconds, int
maxLifetimeSeconds,
+ int s3ReadTimeoutInSeconds, S3ParallelDownloaderClientType
parallelDownloaderClientType,
+ boolean roundRobinDnsResolver, String accessKeyId, String
secretAccessKey,
+ S3ChecksumBehavior checksumBehavior) {
this.region = Objects.requireNonNull(region, "region");
this.endpoint = endpoint;
this.prefix = Objects.requireNonNull(prefix, "prefix");
@@ -87,6 +90,8 @@
this.requestsMaxHttpConnections = requestsMaxHttpConnections;
this.requestsMaxPendingHttpConnections =
requestsMaxPendingHttpConnections;
this.requestsHttpConnectionAcquireTimeout =
requestsHttpConnectionAcquireTimeout;
+ this.maxIdleSeconds = maxIdleSeconds;
+ this.maxLifetimeSeconds = maxLifetimeSeconds;
this.forcePathStyle = forcePathStyle;
this.disableSslVerify = disableSslVerify;
this.s3ReadTimeoutInSeconds = s3ReadTimeoutInSeconds;
@@ -105,7 +110,10 @@
cloudProperties.getWriteMaxRequestsPerSecond(),
cloudProperties.getReadMaxRequestsPerSecond(),
cloudProperties.getRequestsMaxHttpConnections(),
cloudProperties.isStorageForcePathStyle(),
cloudProperties.isStorageDisableSSLVerify(),
cloudProperties.getRequestsMaxPendingHttpConnections(),
- cloudProperties.getRequestsHttpConnectionAcquireTimeout(),
cloudProperties.getS3ReadTimeoutInSeconds(),
+ cloudProperties.getRequestsHttpConnectionAcquireTimeout(),
+ cloudProperties.getRequestsHttpConnectionMaxIdleSeconds(),
+ cloudProperties.getRequestsHttpConnectionMaxLifetimeSeconds(),
+ cloudProperties.getS3ReadTimeoutInSeconds(),
S3ParallelDownloaderClientType.valueOf(cloudProperties.getS3ParallelDownloaderClientType()),
cloudProperties.useRoundRobinDnsResolver(),
cloudProperties.getS3AccessKeyId(),
cloudProperties.getS3SecretAccessKey(),
cloudProperties.getS3ChecksumBehavior());
@@ -203,6 +211,14 @@
return requestsHttpConnectionAcquireTimeout;
}
+ public int getMaxIdleSeconds() {
+ return maxIdleSeconds;
+ }
+
+ public int getMaxLifetimeSeconds() {
+ return maxLifetimeSeconds;
+ }
+
public boolean isDisableSslVerify() {
return disableSslVerify;
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index e1fa4e9..e22579d 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -406,6 +406,12 @@
builder.endpointOverride(URI.create(config.getEndpoint()));
}
ApacheHttpClient.Builder apacheBuilder = ApacheHttpClient.builder();
+ if (config.getMaxIdleSeconds() > 0) {
+
apacheBuilder.connectionMaxIdleTime(Duration.ofSeconds(config.getMaxIdleSeconds()));
+ }
+ if (config.getMaxLifetimeSeconds() > 0) {
+
apacheBuilder.connectionTimeToLive(Duration.ofSeconds(config.getMaxLifetimeSeconds()));
+ }
if (config.isDisableSslVerify()) {
customHttpConfigBuilder.put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
true);
} else if (!config.getCertificates().isEmpty()) {
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
index a64e07b..39b7cb2 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -226,6 +226,13 @@
NettyNioAsyncHttpClient.Builder nettyBuilder =
NettyNioAsyncHttpClient.builder().eventLoopGroup(SHARED_EVENT_LOOP);
+ if (config.getMaxIdleSeconds() > 0) {
+
nettyBuilder.connectionMaxIdleTime(Duration.ofSeconds(config.getMaxIdleSeconds()));
+ }
+ if (config.getMaxLifetimeSeconds() > 0) {
+
nettyBuilder.connectionTimeToLive(Duration.ofSeconds(config.getMaxLifetimeSeconds()));
+ }
+
if (!config.isDisableSslVerify() &&
!config.getCertificates().isEmpty()) {
nettyBuilder.tlsTrustManagersProvider(S3TrustManagerProvider.create(config.getCertificates()));
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
index 5df3cfd..7e2216d 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
@@ -43,11 +43,13 @@
private final int readMaxRequestsPerSeconds;
private final int writeMaxRequestsPerSeconds;
private final int writeBufferSize;
+ private final int maxIdleSeconds;
+ private final int maxLifetimeSeconds;
private final String prefix;
private GCSClientConfig(String region, String endpoint, boolean
anonymousAuth, long profilerLogInterval,
long tokenAcquireTimeout, int writeMaxRequestsPerSeconds, int
readMaxRequestsPerSeconds,
- int writeBufferSize, String prefix) {
+ int writeBufferSize, int maxIdleSeconds, int maxLifetimeSeconds,
String prefix) {
this.region = region;
this.endpoint = endpoint;
this.anonymousAuth = anonymousAuth;
@@ -56,12 +58,14 @@
this.writeMaxRequestsPerSeconds = writeMaxRequestsPerSeconds;
this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds;
this.writeBufferSize = writeBufferSize;
+ this.maxIdleSeconds = maxIdleSeconds;
+ this.maxLifetimeSeconds = maxLifetimeSeconds;
this.prefix = prefix;
}
public GCSClientConfig(String region, String endpoint, boolean
anonymousAuth, long profilerLogInterval,
int writeBufferSize, String prefix) {
- this(region, endpoint, anonymousAuth, profilerLogInterval, 1, 0, 0,
writeBufferSize, prefix);
+ this(region, endpoint, anonymousAuth, profilerLogInterval, 1, 0, 0,
writeBufferSize, 0, 0, prefix);
}
public static GCSClientConfig of(ICloudProperties cloudProperties) {
@@ -69,7 +73,8 @@
cloudProperties.isStorageAnonymousAuth(),
cloudProperties.getProfilerLogInterval(),
cloudProperties.getTokenAcquireTimeout(),
cloudProperties.getWriteMaxRequestsPerSecond(),
cloudProperties.getReadMaxRequestsPerSecond(),
cloudProperties.getWriteBufferSize(),
- cloudProperties.getStoragePrefix());
+ cloudProperties.getRequestsHttpConnectionMaxIdleSeconds(),
+ cloudProperties.getRequestsHttpConnectionMaxLifetimeSeconds(),
cloudProperties.getStoragePrefix());
}
public static GCSClientConfig of(Map<String, String> configuration, int
writeBufferSize) {
@@ -123,6 +128,14 @@
return writeBufferSize;
}
+ public int getMaxIdleSeconds() {
+ return maxIdleSeconds;
+ }
+
+ public int getMaxLifetimeSeconds() {
+ return maxLifetimeSeconds;
+ }
+
public String getPrefix() {
return prefix;
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 79b30e5..b00439c 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -32,6 +32,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.asterix.cloud.IWriteBufferProvider;
@@ -45,6 +46,7 @@
import org.apache.asterix.cloud.clients.profiler.RequestLimiterNoOpProfiler;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.util.CleanupUtils;
@@ -58,12 +60,15 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.api.client.http.apache.v2.ApacheHttpTransport;
import com.google.api.gax.paging.Page;
import com.google.cloud.BaseServiceException;
import com.google.cloud.ReadChannel;
+import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.HttpStorageOptions;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.cloud.storage.Storage.CopyRequest;
@@ -347,6 +352,19 @@
if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
builder.setHost(config.getEndpoint());
}
+ if (config.getMaxIdleSeconds() > 0 || config.getMaxLifetimeSeconds() >
0) {
+ HttpClientBuilder hcb =
ApacheHttpTransport.newDefaultHttpClientBuilder();
+ if (config.getMaxIdleSeconds() > 0) {
+ hcb.evictIdleConnections(config.getMaxIdleSeconds(),
TimeUnit.SECONDS);
+ }
+ if (config.getMaxLifetimeSeconds() > 0) {
+ hcb.setConnectionTimeToLive(config.getMaxLifetimeSeconds(),
TimeUnit.SECONDS);
+ }
+ ApacheHttpTransport transport = new
ApacheHttpTransport(hcb.build());
+ HttpTransportOptions transportOptions =
HttpStorageOptions.defaults().getDefaultTransportOptions()
+ .toBuilder().setHttpTransportFactory(() ->
transport).build();
+ builder.setTransportOptions(transportOptions);
+ }
return builder.build().getService();
}
diff --git
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/clients/aws/s3/S3ConnectionLifecycleTest.java
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/clients/aws/s3/S3ConnectionLifecycleTest.java
new file mode 100644
index 0000000..3e07a9b
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/clients/aws/s3/S3ConnectionLifecycleTest.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.aws.s3;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.cloud.WriterSingleBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudGuardian;
+import org.apache.asterix.cloud.clients.ICloudWriter;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.annotations.AiProvenance;
+import org.apache.hyracks.util.annotations.AiProvenance.Agent;
+import org.apache.hyracks.util.annotations.AiProvenance.ContributionKind;
+import org.apache.hyracks.util.annotations.AiProvenance.Tool;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.findify.s3mock.S3Mock;
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+
+/**
+ * Runtime test for MB-71767: verifies that the {@code cloudRequestsHttpConnectionMaxIdleSeconds}
+ * and {@code cloudRequestsHttpConnectionMaxLifetimeSeconds} settings actually drive the
+ * underlying Apache HttpClient pool's idle-eviction and TTL behaviour for the sync S3 path.
+ *
+ * Approach: spin up an in-process S3Mock, build a {@link S3CloudClient} with small idle/lifetime
+ * values, drive concurrent uploads to populate the connection pool, then observe established
+ * TCP sockets via {@code lsof} to confirm connections are torn down at the configured times.
+ *
+ * Notes for maintainers:
+ * <ul>
+ * <li>{@link #setUp()} starts the mock server and (re)creates the bucket before checking for
+ * {@code lsof}; when {@code lsof} is missing the whole class is skipped via {@link Assume}.</li>
+ * <li>{@code a_idleEvictionClosesQuietConnections} polls for up to 70 seconds.</li>
+ * <li>The clients created by {@code buildPatchedClient} are never closed by these tests, and
+ * socket counts are taken over every ESTABLISHED connection to the mock port, so connections
+ * opened by one test can still be observed by the next.</li>
+ * <li>NOTE(review): the {@code a_}/{@code b_} prefixes suggest an intended order, but no
+ * {@code @FixMethodOrder} is visible here, so JUnit 4 does not guarantee that order.</li>
+ * </ul>
+ */
+@AiProvenance(agent = Agent.CLAUDE_OPUS_4_7, tool = Tool.ANTHROPIC_CLI,
contributionKind = ContributionKind.TEST_GENERATED, notes = "MB-71767: runtime
verification of HTTP idle/lifetime knobs on the sync S3 path; generated via
Claude Code")
+public class S3ConnectionLifecycleTest {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ private static final int MOCK_SERVER_PORT = 8002;
+ private static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" +
MOCK_SERVER_PORT;
+ private static final String MOCK_SERVER_REGION = "us-west-2";
+ private static final String PLAYGROUND_BUCKET = "lifecycle-playground";
+
+ private static final int CONCURRENCY = 8;
+ private static final int OBJ_BYTES = 256;
+
+ private static S3Mock s3MockServer;
+ private static S3Client setupClient;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ System.out.println("S3ConnectionLifecycleTest setup");
+ s3MockServer = new
S3Mock.Builder().withPort(MOCK_SERVER_PORT).withInMemoryBackend().build();
+ try {
+ s3MockServer.start();
+ } catch (Exception alreadyStarted) {
+ // ignore (NOTE(review): this swallows every start failure, not only an already-running server)
+ }
+
+ setupClient = S3Client.builder().region(Region.of(MOCK_SERVER_REGION))
+ .credentialsProvider(AnonymousCredentialsProvider.create())
+
.endpointOverride(URI.create(MOCK_SERVER_HOSTNAME)).forcePathStyle(true).build();
+ try {
+
setupClient.deleteBucket(DeleteBucketRequest.builder().bucket(PLAYGROUND_BUCKET).build());
+ } catch (Exception ignore) { // best-effort cleanup; any deleteBucket failure is ignored
+ }
+
setupClient.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_BUCKET).build());
+
+ // lsof must be on PATH for this test to observe socket state. Skip
with a warning on other OS.
+ Assume.assumeTrue("lsof not on PATH; skipping socket observation
tests", lsofAvailable());
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (setupClient != null) {
+ setupClient.close();
+ }
+ if (s3MockServer != null) {
+ s3MockServer.shutdown();
+ }
+ }
+
+ /**
+ * idle=2s, lifetime=large: after a burst of concurrent uploads goes quiet for longer than the
+ * idle limit, the AWS SDK v2 IdleConnectionReaper should close the pooled sockets. The reaper
+ * has its own ~60s wake interval (independent of connectionMaxIdleTime), so we sample every 5s
+ * for up to 70s and short-circuit as soon as we observe the drop.
+ *
+ * The assertion only requires the observed socket count to fall below its post-burst value;
+ * it does not require the pool to be fully drained.
+ */
+ @Test
+ public void a_idleEvictionClosesQuietConnections() throws Exception {
+ int idleSec = 2;
+ int lifetimeSec = 600;
+ S3CloudClient client = buildPatchedClient(idleSec, lifetimeSec);
+
+ // Populate the pool with a concurrent burst.
+ runConcurrent(CONCURRENCY, threadIdx -> writeOnce(client, "burst-" +
threadIdx));
+
+ int beforeIdle = countEstablishedToMock();
+ System.out.println("[idle test] after burst: established conns to mock
= " + beforeIdle);
+ assertGreaterThan("pool should be populated after burst", beforeIdle,
0);
+
+ // Go quiet for a long stretch — the AWS SDK v2 IdleConnectionReaper
wakes on its own
+ // schedule (often ~60s, independent of connectionMaxIdleTime). Sample
every 5s for
+ // up to 70s so we observe eviction whenever the reaper finally fires.
+ int totalSleepSec = 70;
+ int afterIdle = beforeIdle;
+ for (int waited = 0; waited < totalSleepSec; waited += 5) {
+ Thread.sleep(5_000L);
+ afterIdle = countEstablishedToMock();
+ System.out.println("[idle test] waited " + (waited + 5) + "s:
established conns to mock = " + afterIdle);
+ if (afterIdle < beforeIdle) {
+ break;
+ }
+ }
+ assertLessThan("idle eviction should have closed connections within "
+ totalSleepSec + "s. before="
+ + beforeIdle + " after=" + afterIdle, afterIdle, beforeIdle);
+ }
+
+ /**
+ * idle=large, lifetime=3s: keep firing requests in bursts separated by 500ms sleeps so the
+ * idle limit never trips, but the TTL must still retire the pooled connections. Verifies
+ * wall-clock-age semantics, not idle-time semantics.
+ *
+ * Source ports seen after the initial burst are compared with those seen after
+ * {@code lifetimeSec + 4} seconds of activity plus a final burst. NOTE(review): the failure
+ * message says "all initial connections", but the assertion only requires that at least one
+ * initial port has disappeared ({@code survivors < before}). Tightening it to "no survivors"
+ * would also have to account for sockets held by clients from other tests, which are not closed.
+ */
+ @Test
+ public void b_ttlRotatesConnectionsDespiteActivity() throws Exception {
+ int idleSec = 600;
+ int lifetimeSec = 3;
+ S3CloudClient client = buildPatchedClient(idleSec, lifetimeSec);
+
+ // Initial burst, snapshot source ports.
+ runConcurrent(CONCURRENCY, threadIdx -> writeOnce(client, "ttl-init-"
+ threadIdx));
+ Set<String> beforePorts = sourcePortsToMock();
+ System.out.println("[ttl test] after initial burst: source ports = " +
beforePorts);
+ assertGreaterThan("pool should be populated after burst",
beforePorts.size(), 0);
+
+ // Keep busy for > TTL — fire small bursts every 500ms.
+ long deadline = System.currentTimeMillis() + (lifetimeSec + 4) * 1000L;
+ int iters = 0;
+ while (System.currentTimeMillis() < deadline) {
+ final int iterSnapshot = iters;
+ runConcurrent(CONCURRENCY, threadIdx -> writeOnce(client,
"ttl-busy-" + iterSnapshot + "-" + threadIdx));
+ Thread.sleep(500);
+ iters++;
+ }
+
+ // Final burst to ensure new connections are open right now.
+ runConcurrent(CONCURRENCY, threadIdx -> writeOnce(client, "ttl-final-"
+ threadIdx));
+ Set<String> afterPorts = sourcePortsToMock();
+ System.out.println("[ttl test] after " + (lifetimeSec + 4) + "s of
activity: source ports = " + afterPorts);
+
+ Set<String> stillPresent = new HashSet<>(beforePorts);
+ stillPresent.retainAll(afterPorts);
+ System.out.println("[ttl test] source ports surviving TTL: " +
stillPresent);
+ assertLessThan("TTL should have retired all initial connections.
before=" + beforePorts + " after=" + afterPorts
+ + " survivors=" + stillPresent, stillPresent.size(),
beforePorts.size());
+ }
+
+ // --------------------------- helpers ---------------------------
+
+ /**
+ * Builds an {@link S3CloudClient} whose configuration carries the requested idle and lifetime
+ * limits. The {@link S3ClientConfig} is first created through the existing short constructor
+ * (which passes 0 for both limits). Its private {@code maxIdleSeconds} and
+ * {@code maxLifetimeSeconds} fields are then overwritten reflectively. Only after that is the
+ * {@link S3CloudClient} constructed, so the SDK client it builds sees the patched values.
+ * Nothing is rebuilt after construction. This is test-only; production code never patches
+ * these fields after construction.
+ *
+ * NOTE(review): callers in this class never close the returned client.
+ */
+ private static S3CloudClient buildPatchedClient(int idleSec, int
lifetimeSec) throws Exception {
+ int writeBufferSize = StorageUtil.getIntSizeInBytes(5,
StorageUtil.StorageUnit.MEGABYTE);
+ S3ClientConfig config =
+ new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME,
"", true, Collections.emptyList(), 0,
+ writeBufferSize,
S3ClientConfig.S3ParallelDownloaderClientType.ASYNC, false);
+
+ // Patch the idle/lifetime fields on the config.
+ setIntField(config, "maxIdleSeconds", idleSec);
+ setIntField(config, "maxLifetimeSeconds", lifetimeSec);
+
+ // Build the cloud client. buildClient is invoked from the ctor and
will pick up the patched values.
+ return new S3CloudClient(config,
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
+ }
+
+ private static void setIntField(Object target, String name, int value)
throws Exception { // reflectively overwrites a declared int field on target
+ Field f = target.getClass().getDeclaredField(name);
+ f.setAccessible(true);
+ f.setInt(target, value);
+ }
+
+ private static void writeOnce(S3CloudClient client, String key) { // uploads one OBJ_BYTES-byte object under objects/<key>
+ try {
+ IWriteBufferProvider bufferProvider = new
WriterSingleBufferProvider(client.getWriteBufferSize());
+ ICloudWriter writer = client.createWriter(PLAYGROUND_BUCKET,
"objects/" + key, bufferProvider);
+ ByteBuffer buf = ByteBuffer.allocate(OBJ_BYTES);
+ for (int i = 0; i < OBJ_BYTES; i++) {
+ buf.put((byte) (i & 0xff));
+ }
+ buf.flip();
+ writer.write(buf);
+ writer.finish();
+ } catch (Exception e) {
+ throw new RuntimeException("write failed for key=" + key, e);
+ }
+ }
+
+ @FunctionalInterface
+ private interface IntTask { // like Runnable, but receives the thread index and may throw
+ void run(int threadIdx) throws Exception;
+ }
+
+ private static void runConcurrent(int n, IntTask task) throws Exception { // n threads released together; fails on 30s timeout or any task error
+ ExecutorService pool = Executors.newFixedThreadPool(n);
+ try {
+ CountDownLatch start = new CountDownLatch(1);
+ CountDownLatch done = new CountDownLatch(n);
+ AtomicInteger errors = new AtomicInteger();
+ for (int i = 0; i < n; i++) {
+ final int idx = i;
+ pool.submit(() -> {
+ try {
+ start.await();
+ task.run(idx);
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ LOGGER.warn("concurrent task " + idx + " failed", e);
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+ start.countDown();
+ if (!done.await(30, TimeUnit.SECONDS)) {
+ throw new RuntimeException("concurrent tasks timed out");
+ }
+ if (errors.get() > 0) {
+ throw new RuntimeException(errors.get() + " concurrent tasks
failed");
+ }
+ } finally {
+ pool.shutdownNow();
+ }
+ }
+
+ private static boolean lsofAvailable() { // true iff lsof -v exits with status 0 within 2 seconds
+ try {
+ Process p = new ProcessBuilder("lsof",
"-v").redirectErrorStream(true).start();
+ return p.waitFor(2, TimeUnit.SECONDS) && p.exitValue() == 0;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ private static int countEstablishedToMock() throws Exception { // number of distinct source ports to the mock port
+ return sourcePortsToMock().size();
+ }
+
+ /**
+ * Returns the set of local source ports of ESTABLISHED TCP sockets whose destination is the
+ * S3Mock port on 127.0.0.1 or ::1. The ports are parsed from the output of
+ * {@code lsof -nP -iTCP -p <pid> -sTCP:ESTABLISHED}, where {@code <pid>} is the pid of this JVM.
+ * Each pooled connection corresponds to one source port, so a rotation of connections shows up
+ * as a new set of ports.
+ *
+ * NOTE(review): lsof ORs selection options unless {@code -a} is given, so {@code -iTCP} and
+ * {@code -p <pid>} together do not restrict the listing to this JVM. Sockets of other processes
+ * connected to the same port would be counted too. The destination port is also not anchored:
+ * a port that merely starts with the mock port (e.g. 80021) would match. Finally, the pattern
+ * is recompiled on every call.
+ */
+ private static Set<String> sourcePortsToMock() throws Exception {
+ long pid = ProcessHandle.current().pid();
+ Process p = new ProcessBuilder("lsof", "-nP", "-iTCP", "-p",
String.valueOf(pid), "-sTCP:ESTABLISHED")
+ .redirectErrorStream(true).start();
+ Set<String> ports = new HashSet<>();
+ Pattern destPattern =
Pattern.compile(":(\\d+)->(?:127\\.0\\.0\\.1|\\[?::1]?):" + MOCK_SERVER_PORT);
+ try (BufferedReader r = new BufferedReader(new
InputStreamReader(p.getInputStream()))) {
+ String line;
+ while ((line = r.readLine()) != null) {
+ Matcher m = destPattern.matcher(line);
+ if (m.find()) {
+ ports.add(m.group(1));
+ }
+ }
+ }
+ p.waitFor(3, TimeUnit.SECONDS);
+ return ports;
+ }
+
+ private static void assertGreaterThan(String msg, int actual, int floor) { // requires actual > floor
+ if (actual <= floor) {
+ throw new AssertionError(msg + " (actual=" + actual + ", expected
> " + floor + ")");
+ }
+ }
+
+ private static void assertLessThan(String msg, int actual, int ceiling) { // requires actual < ceiling
+ if (actual >= ceiling) {
+ throw new AssertionError(msg + " (actual=" + actual + ", expected
< " + ceiling + ")");
+ }
+ }
+}
diff --git
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/clients/google/gcs/GCSConnectionLifecycleTest.java
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/clients/google/gcs/GCSConnectionLifecycleTest.java
new file mode 100644
index 0000000..e48c4ce
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/clients/google/gcs/GCSConnectionLifecycleTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.google.gcs;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.asterix.cloud.clients.ICloudGuardian;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.annotations.AiProvenance;
+import org.apache.hyracks.util.annotations.AiProvenance.Agent;
+import org.apache.hyracks.util.annotations.AiProvenance.ContributionKind;
+import org.apache.hyracks.util.annotations.AiProvenance.Tool;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+
+/**
+ * Runtime test for MB-71767: verify that the {@code
cloudRequestsHttpConnectionMaxIdleSeconds} and
+ * {@code cloudRequestsHttpConnectionMaxLifetimeSeconds} settings actually
drive the underlying
+ * Apache HttpClient pool's eviction + TTL behaviour for the GCS path.
+ *
+ * Uses a JDK built-in {@link HttpServer} that returns a GCS-shaped 404 for
every request. We then
+ * exercise {@link GCSCloudClient#exists(String, String)} which gracefully
maps 404 to {@code
+ * false}, so the test path produces one HTTP request per call without
throwing. That's enough to
+ * populate the Apache HttpClient connection pool with pooled sockets; {@code
lsof} then lets us
+ * observe whether {@code evictIdleConnections} / {@code
setConnectionTimeToLive} kill them on the
+ * configured schedule. Zero external dependencies — runs on any OS with
{@code lsof} on PATH.
+ *
+ * Mirror of {@code S3ConnectionLifecycleTest} for the GCS code path.
+ */
+@AiProvenance(agent = Agent.CLAUDE_OPUS_4_7, tool = Tool.ANTHROPIC_CLI,
contributionKind = ContributionKind.TEST_GENERATED, notes = "MB-71767: runtime
verification of HTTP idle/lifetime knobs on the GCS path; generated via Claude
Code")
+public class GCSConnectionLifecycleTest {
+
+ private static final String MOCK_SERVER_REGION = "us-west2";
+ private static final String PLAYGROUND_BUCKET = "lifecycle-playground-gcs";
+
+ private static final int CONCURRENCY = 8;
+
+ private static HttpServer mockServer;
+ private static int mockServerPort;
+ private static String mockServerHostname;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ System.out.println("GCSConnectionLifecycleTest setup");
+ Assume.assumeTrue("lsof not on PATH; skipping socket observation
tests", lsofAvailable());
+
+ // Bind to an ephemeral port on loopback. The handler always responds
with a GCS-shaped
+ // 404 so GCSCloudClient.exists() returns false without throwing,
which keeps the test
+ // focused on TCP socket lifecycle.
+ mockServer = HttpServer.create(new InetSocketAddress("127.0.0.1", 0),
64);
+ mockServer.createContext("/", new GcsStubHandler());
+ mockServer.setExecutor(Executors.newFixedThreadPool(16));
+ mockServer.start();
+ mockServerPort = mockServer.getAddress().getPort();
+ mockServerHostname = "http://127.0.0.1:" + mockServerPort;
+ System.out.println("stub server up at " + mockServerHostname);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (mockServer != null) {
+ mockServer.stop(0);
+ }
+ }
+
+ /**
+ * idle=2s, lifetime=large: after a burst of concurrent exists() goes
quiet for >idle the
+ * Apache HttpClient eviction thread should close the pooled sockets.
Sample every 2s for
+ * up to 20s as a safety net.
+ */
+ @Test
+ public void a_idleEvictionClosesQuietConnections() throws Exception {
+ int idleSec = 2;
+ int lifetimeSec = 600;
+ GCSCloudClient client = buildPatchedClient(idleSec, lifetimeSec);
+
+ runConcurrent(CONCURRENCY, threadIdx -> existsOnce(client, "burst-" +
threadIdx));
+
+ int beforeIdle = countEstablishedToMock();
+ System.out.println("[idle test] after burst: established conns to mock
= " + beforeIdle);
+ assertGreaterThan("pool should be populated after burst", beforeIdle,
0);
+
+ int totalSleepSec = 20;
+ int afterIdle = beforeIdle;
+ for (int waited = 0; waited < totalSleepSec; waited += 2) {
+ Thread.sleep(2_000L);
+ afterIdle = countEstablishedToMock();
+ System.out.println("[idle test] waited " + (waited + 2) + "s:
established conns to mock = " + afterIdle);
+ if (afterIdle < beforeIdle) {
+ break;
+ }
+ }
+ assertLessThan("idle eviction should have closed connections within "
+ totalSleepSec + "s. before="
+ + beforeIdle + " after=" + afterIdle, afterIdle, beforeIdle);
+ }
+
+ /**
+ * idle=large, lifetime=3s: keep firing requests so idle never trips, but
TTL must still retire
+ * the pooled connections. Verifies wall-clock-age semantics.
+ */
+ @Test
+ public void b_ttlRotatesConnectionsDespiteActivity() throws Exception {
+ int idleSec = 600;
+ int lifetimeSec = 3;
+ GCSCloudClient client = buildPatchedClient(idleSec, lifetimeSec);
+
+ runConcurrent(CONCURRENCY, threadIdx -> existsOnce(client, "ttl-init-"
+ threadIdx));
+ Set<String> beforePorts = sourcePortsToMock();
+ System.out.println("[ttl test] after initial burst: source ports = " +
beforePorts);
+ assertGreaterThan("pool should be populated after burst",
beforePorts.size(), 0);
+
+ long deadline = System.currentTimeMillis() + (lifetimeSec + 4) * 1000L;
+ int iters = 0;
+ while (System.currentTimeMillis() < deadline) {
+ final int iterSnapshot = iters;
+ runConcurrent(CONCURRENCY, threadIdx -> existsOnce(client,
"ttl-busy-" + iterSnapshot + "-" + threadIdx));
+ Thread.sleep(500);
+ iters++;
+ }
+
+ runConcurrent(CONCURRENCY, threadIdx -> existsOnce(client,
"ttl-final-" + threadIdx));
+ Set<String> afterPorts = sourcePortsToMock();
+ System.out.println("[ttl test] after " + (lifetimeSec + 4) + "s of
activity: source ports = " + afterPorts);
+
+ Set<String> stillPresent = new HashSet<>(beforePorts);
+ stillPresent.retainAll(afterPorts);
+ System.out.println("[ttl test] source ports surviving TTL: " +
stillPresent);
+ assertLessThan("TTL should have retired all initial connections.
before=" + beforePorts + " after=" + afterPorts
+ + " survivors=" + stillPresent, stillPresent.size(),
beforePorts.size());
+ }
+
+ // --------------------------- helpers ---------------------------
+
+ /**
+ * Builds a {@link GCSCloudClient} via the existing public constructor
(which defaults idle and
+ * lifetime to 0), then reflectively patches the two private fields on the
{@link GCSClientConfig}
+ * to the desired values. The cloud client's constructor invokes {@code
buildClient(config)}
+ * after we patch, so the patched values reach the {@code
ApacheHttpTransport} that backs the
+ * google-cloud-storage transport.
+ */
+ private static GCSCloudClient buildPatchedClient(int idleSec, int
lifetimeSec) throws Exception {
+ int writeBufferSize = StorageUtil.getIntSizeInBytes(5,
StorageUtil.StorageUnit.MEGABYTE);
+ GCSClientConfig config =
+ new GCSClientConfig(MOCK_SERVER_REGION, mockServerHostname,
true, 0, writeBufferSize, "");
+
+ setIntField(config, "maxIdleSeconds", idleSec);
+ setIntField(config, "maxLifetimeSeconds", lifetimeSec);
+
+ return new GCSCloudClient(config,
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
+ }
+
+ private static void setIntField(Object target, String name, int value)
throws Exception {
+ Field f = target.getClass().getDeclaredField(name);
+ f.setAccessible(true);
+ f.setInt(target, value);
+ }
+
+ /**
+ * Drives one cheap GCS API call through the cloud client. Our stub
returns 404 for every
+ * path; {@link GCSCloudClient#exists} maps that to {@code false} without
throwing, so the
+ * only observable side effect is one HTTP request being issued through
the pooled HTTP
+ * client.
+ */
+ private static void existsOnce(GCSCloudClient client, String key) {
+ try {
+ client.exists(PLAYGROUND_BUCKET, "objects/" + key);
+ } catch (Exception e) {
+ // We don't care about response correctness — only that a TCP
connection was made.
+ }
+ }
+
+ @FunctionalInterface
+ private interface IntTask {
+ void run(int threadIdx) throws Exception;
+ }
+
+ private static void runConcurrent(int n, IntTask task) throws Exception {
+ ExecutorService pool = Executors.newFixedThreadPool(n);
+ try {
+ CountDownLatch start = new CountDownLatch(1);
+ CountDownLatch done = new CountDownLatch(n);
+ AtomicInteger errors = new AtomicInteger();
+ for (int i = 0; i < n; i++) {
+ final int idx = i;
+ pool.submit(() -> {
+ try {
+ start.await();
+ task.run(idx);
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ System.err.println("concurrent task " + idx + "
failed: " + e);
+ e.printStackTrace(System.err);
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+ start.countDown();
+ if (!done.await(30, TimeUnit.SECONDS)) {
+ throw new RuntimeException("concurrent tasks timed out");
+ }
+ // The test only cares about TCP socket lifecycle, not response
correctness — don't
+ // fail the test just because the stub gave a 404 the SDK didn't
love.
+ } finally {
+ pool.shutdownNow();
+ }
+ }
+
+ private static boolean lsofAvailable() {
+ try {
+ Process p = new ProcessBuilder("lsof",
"-v").redirectErrorStream(true).start();
+ return p.waitFor(2, TimeUnit.SECONDS) && p.exitValue() == 0;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ private static int countEstablishedToMock() throws Exception {
+ return sourcePortsToMock().size();
+ }
+
+ /**
+ * Returns the set of local source ports for ESTABLISHED TCP sockets in
this JVM that have the
+ * stub server port as their destination. Each pooled connection
corresponds to one source
+ * port; a rotation of connections shows up as a new set of ports.
+ */
+ private static Set<String> sourcePortsToMock() throws Exception {
+ long pid = ProcessHandle.current().pid();
+ Process p = new ProcessBuilder("lsof", "-nP", "-iTCP", "-p",
String.valueOf(pid), "-sTCP:ESTABLISHED")
+ .redirectErrorStream(true).start();
+ Set<String> ports = new HashSet<>();
+ Pattern destPattern =
Pattern.compile(":(\\d+)->(?:127\\.0\\.0\\.1|\\[?::1]?):" + mockServerPort);
+ try (BufferedReader r = new BufferedReader(new
InputStreamReader(p.getInputStream()))) {
+ String line;
+ while ((line = r.readLine()) != null) {
+ Matcher m = destPattern.matcher(line);
+ if (m.find()) {
+ ports.add(m.group(1));
+ }
+ }
+ }
+ p.waitFor(3, TimeUnit.SECONDS);
+ return ports;
+ }
+
+ private static void assertGreaterThan(String msg, int actual, int floor) {
+ if (actual <= floor) {
+ throw new AssertionError(msg + " (actual=" + actual + ", expected
> " + floor + ")");
+ }
+ }
+
+ private static void assertLessThan(String msg, int actual, int ceiling) {
+ if (actual >= ceiling) {
+ throw new AssertionError(msg + " (actual=" + actual + ", expected
< " + ceiling + ")");
+ }
+ }
+
+ /**
+ * Minimal HTTP handler that returns a GCS-shaped 404 response for every
path. The body is the
+ * standard GCS error envelope so the SDK classifies the error as code=404
and short-circuits
+ * to a clean "not found" outcome (no throw, no retry storm).
+ */
+ private static final class GcsStubHandler implements HttpHandler {
+ private static final byte[] BODY =
("{\"error\":{\"code\":404,\"message\":\"Not Found\","
+ + "\"errors\":[{\"reason\":\"notFound\",\"message\":\"Not
Found\"}]}}").getBytes();
+
+ @Override
+ public void handle(HttpExchange exchange) throws IOException {
+ try {
+ exchange.getResponseHeaders().add("Content-Type",
"application/json; charset=UTF-8");
+ exchange.sendResponseHeaders(404, BODY.length);
+ try (OutputStream os = exchange.getResponseBody()) {
+ os.write(BODY);
+ }
+ } finally {
+ exchange.close();
+ }
+ }
+ }
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21235?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I52bfe3e855b654b48f4294a9cd52e59406555aeb
Gerrit-Change-Number: 21235
Gerrit-PatchSet: 11
Gerrit-Owner: Rithwik Koul <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>