From Murtadha Hubail <[email protected]>:

Murtadha Hubail has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21235?usp=email )

Change subject: [ASTERIXDB-3774][CLOUD] Adding HTTP idle/lifetime config in S3 and GCS cloud clients
......................................................................

[ASTERIXDB-3774][CLOUD] Adding HTTP idle/lifetime config in S3 and GCS cloud clients

- user model changes: no
- storage format changes: no
- interface changes: no

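Apply an idle timeout and a lifetime cap to the HTTP connection pools
backing the S3 and GCS clients (Apache HttpClient for the sync S3 and
GCS clients, Netty for the S3 parallel downloader). A pooled connection
idle longer than cloudRequestsHttpConnectionMaxIdleSeconds is evicted.
A connection older than cloudRequestsHttpConnectionMaxLifetimeSeconds
is retired on its next lease regardless of activity. A non-positive
value leaves the underlying HTTP client's default in place.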
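The two settings act at different points in a connection's life. Idle eviction looks at how long a connection has sat unused in the pool. The lifetime cap looks at wall-clock age since the connection was opened and is enforced when the connection is next leased.

The toy Java model below illustrates that difference. It is a hypothetical sketch: the `PooledConnectionPolicy` class is not part of AsterixDB, Apache HttpClient, or Netty. It treats a non-positive value as "not configured" and skips that check. In the patch, such a value instead leaves the HTTP client's own default in place.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical toy model (not AsterixDB, Apache HttpClient, or Netty code) of the two checks
 * applied to one pooled connection. A non-positive setting means "not configured", mirroring
 * the "> 0" guards in the patch; here the corresponding check is simply skipped.
 */
final class PooledConnectionPolicy {
    private final Duration maxIdle;     // models cloudRequestsHttpConnectionMaxIdleSeconds
    private final Duration maxLifetime; // models cloudRequestsHttpConnectionMaxLifetimeSeconds

    PooledConnectionPolicy(int maxIdleSeconds, int maxLifetimeSeconds) {
        this.maxIdle = maxIdleSeconds > 0 ? Duration.ofSeconds(maxIdleSeconds) : null;
        this.maxLifetime = maxLifetimeSeconds > 0 ? Duration.ofSeconds(maxLifetimeSeconds) : null;
    }

    /** Idle eviction: true if the connection has sat unused in the pool longer than maxIdle. */
    boolean shouldEvictIdle(Instant lastReleased, Instant now) {
        return maxIdle != null && Duration.between(lastReleased, now).compareTo(maxIdle) > 0;
    }

    /** Lifetime cap: true if the connection is older than maxLifetime, however busy it has been. */
    boolean isExpiredOnLease(Instant created, Instant now) {
        return maxLifetime != null && Duration.between(created, now).compareTo(maxLifetime) > 0;
    }

    public static void main(String[] args) {
        PooledConnectionPolicy policy = new PooledConnectionPolicy(2, 3);
        Instant opened = Instant.EPOCH;
        Instant now = opened.plusSeconds(5);
        // Busy connection: returned to the pool 1s ago, so idle eviction does not fire...
        System.out.println(policy.shouldEvictIdle(opened.plusSeconds(4), now)); // false
        // ...but it was opened 5s ago, so the 3s lifetime cap retires it on its next lease.
        System.out.println(policy.isExpiredOnLease(opened, now)); // true
        // With both settings at 0, neither check applies.
        PooledConnectionPolicy unset = new PooledConnectionPolicy(0, 0);
        System.out.println(unset.isExpiredOnLease(opened, now)); // false
    }
}
```

The model leaves out when eviction actually runs. As the S3 test in this change notes, the AWS SDK v2 IdleConnectionReaper wakes on its own schedule (often around 60s). In practice, an idle connection can therefore outlive the configured idle time until the reaper next runs.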

Ex-ref: MB-71767
Change-Id: I52bfe3e855b654b48f4294a9cd52e59406555aeb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21235
Tested-by: Murtadha Hubail <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Integration-Tests: Jenkins <[email protected]>
---
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
A asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/clients/aws/s3/S3ConnectionLifecycleTest.java
A asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/clients/google/gcs/GCSConnectionLifecycleTest.java
7 files changed, 705 insertions(+), 8 deletions(-)

Approvals:
  Murtadha Hubail: Looks good to me, approved; Verified
  Jenkins: Verified




diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index 73237ce..6b98436 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -50,6 +50,8 @@
     private final int requestsMaxHttpConnections;
     private final int requestsMaxPendingHttpConnections;
     private final int requestsHttpConnectionAcquireTimeout;
+    private final int maxIdleSeconds;
+    private final int maxLifetimeSeconds;
     private final boolean forcePathStyle;
     private final boolean disableSslVerify;
     private final int s3ReadTimeoutInSeconds;
@@ -63,7 +65,7 @@
             Collection<String> certificates, long profilerLogInterval, int writeBufferSize,
             S3ParallelDownloaderClientType parallelDownloaderClientType, boolean roundRobinDnsResolver) {
         this(region, endpoint, prefix, anonymousAuth, certificates, profilerLogInterval, writeBufferSize, 1, 0, 0, 0,
-                false, false, 0, 0, -1, parallelDownloaderClientType, roundRobinDnsResolver, "", "",
+                false, false, 0, 0, 0, 0, -1, parallelDownloaderClientType, roundRobinDnsResolver, "", "",
                 S3ChecksumBehavior.defaultForEndpoint(endpoint));
     }

@@ -71,9 +73,10 @@
             Collection<String> certificates, long profilerLogInterval, int writeBufferSize, long tokenAcquireTimeout,
             int writeMaxRequestsPerSeconds, int readMaxRequestsPerSeconds, int requestsMaxHttpConnections,
             boolean forcePathStyle, boolean disableSslVerify, int requestsMaxPendingHttpConnections,
-            int requestsHttpConnectionAcquireTimeout, int s3ReadTimeoutInSeconds,
-            S3ParallelDownloaderClientType parallelDownloaderClientType, boolean roundRobinDnsResolver,
-            String accessKeyId, String secretAccessKey, S3ChecksumBehavior checksumBehavior) {
+            int requestsHttpConnectionAcquireTimeout, int maxIdleSeconds, int maxLifetimeSeconds,
+            int s3ReadTimeoutInSeconds, S3ParallelDownloaderClientType parallelDownloaderClientType,
+            boolean roundRobinDnsResolver, String accessKeyId, String secretAccessKey,
+            S3ChecksumBehavior checksumBehavior) {
         this.region = Objects.requireNonNull(region, "region");
         this.endpoint = endpoint;
         this.prefix = Objects.requireNonNull(prefix, "prefix");
@@ -87,6 +90,8 @@
         this.requestsMaxHttpConnections = requestsMaxHttpConnections;
         this.requestsMaxPendingHttpConnections = requestsMaxPendingHttpConnections;
         this.requestsHttpConnectionAcquireTimeout = requestsHttpConnectionAcquireTimeout;
+        this.maxIdleSeconds = maxIdleSeconds;
+        this.maxLifetimeSeconds = maxLifetimeSeconds;
         this.forcePathStyle = forcePathStyle;
         this.disableSslVerify = disableSslVerify;
         this.s3ReadTimeoutInSeconds = s3ReadTimeoutInSeconds;
@@ -105,7 +110,10 @@
                 cloudProperties.getWriteMaxRequestsPerSecond(), cloudProperties.getReadMaxRequestsPerSecond(),
                 cloudProperties.getRequestsMaxHttpConnections(), cloudProperties.isStorageForcePathStyle(),
                 cloudProperties.isStorageDisableSSLVerify(), cloudProperties.getRequestsMaxPendingHttpConnections(),
-                cloudProperties.getRequestsHttpConnectionAcquireTimeout(), cloudProperties.getS3ReadTimeoutInSeconds(),
+                cloudProperties.getRequestsHttpConnectionAcquireTimeout(),
+                cloudProperties.getRequestsHttpConnectionMaxIdleSeconds(),
+                cloudProperties.getRequestsHttpConnectionMaxLifetimeSeconds(),
+                cloudProperties.getS3ReadTimeoutInSeconds(),
                 S3ParallelDownloaderClientType.valueOf(cloudProperties.getS3ParallelDownloaderClientType()),
                 cloudProperties.useRoundRobinDnsResolver(), cloudProperties.getS3AccessKeyId(),
                 cloudProperties.getS3SecretAccessKey(), cloudProperties.getS3ChecksumBehavior());
@@ -203,6 +211,14 @@
         return requestsHttpConnectionAcquireTimeout;
     }

+    public int getMaxIdleSeconds() {
+        return maxIdleSeconds;
+    }
+
+    public int getMaxLifetimeSeconds() {
+        return maxLifetimeSeconds;
+    }
+
     public boolean isDisableSslVerify() {
         return disableSslVerify;
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index e1fa4e9..e22579d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -406,6 +406,12 @@
             builder.endpointOverride(URI.create(config.getEndpoint()));
         }
         ApacheHttpClient.Builder apacheBuilder = ApacheHttpClient.builder();
+        if (config.getMaxIdleSeconds() > 0) {
+            apacheBuilder.connectionMaxIdleTime(Duration.ofSeconds(config.getMaxIdleSeconds()));
+        }
+        if (config.getMaxLifetimeSeconds() > 0) {
+            apacheBuilder.connectionTimeToLive(Duration.ofSeconds(config.getMaxLifetimeSeconds()));
+        }
         if (config.isDisableSslVerify()) {
             customHttpConfigBuilder.put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true);
         } else if (!config.getCertificates().isEmpty()) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
index a64e07b..39b7cb2 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -226,6 +226,13 @@
         NettyNioAsyncHttpClient.Builder nettyBuilder =
                 NettyNioAsyncHttpClient.builder().eventLoopGroup(SHARED_EVENT_LOOP);

+        if (config.getMaxIdleSeconds() > 0) {
+            nettyBuilder.connectionMaxIdleTime(Duration.ofSeconds(config.getMaxIdleSeconds()));
+        }
+        if (config.getMaxLifetimeSeconds() > 0) {
+            nettyBuilder.connectionTimeToLive(Duration.ofSeconds(config.getMaxLifetimeSeconds()));
+        }
+
         if (!config.isDisableSslVerify() && !config.getCertificates().isEmpty()) {
             nettyBuilder.tlsTrustManagersProvider(S3TrustManagerProvider.create(config.getCertificates()));
         }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
index 5df3cfd..7e2216d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
@@ -43,11 +43,13 @@
     private final int readMaxRequestsPerSeconds;
     private final int writeMaxRequestsPerSeconds;
     private final int writeBufferSize;
+    private final int maxIdleSeconds;
+    private final int maxLifetimeSeconds;
     private final String prefix;

     private GCSClientConfig(String region, String endpoint, boolean anonymousAuth, long profilerLogInterval,
             long tokenAcquireTimeout, int writeMaxRequestsPerSeconds, int readMaxRequestsPerSeconds,
-            int writeBufferSize, String prefix) {
+            int writeBufferSize, int maxIdleSeconds, int maxLifetimeSeconds, String prefix) {
         this.region = region;
         this.endpoint = endpoint;
         this.anonymousAuth = anonymousAuth;
@@ -56,12 +58,14 @@
         this.writeMaxRequestsPerSeconds = writeMaxRequestsPerSeconds;
         this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds;
         this.writeBufferSize = writeBufferSize;
+        this.maxIdleSeconds = maxIdleSeconds;
+        this.maxLifetimeSeconds = maxLifetimeSeconds;
         this.prefix = prefix;
     }

     public GCSClientConfig(String region, String endpoint, boolean anonymousAuth, long profilerLogInterval,
             int writeBufferSize, String prefix) {
-        this(region, endpoint, anonymousAuth, profilerLogInterval, 1, 0, 0, writeBufferSize, prefix);
+        this(region, endpoint, anonymousAuth, profilerLogInterval, 1, 0, 0, writeBufferSize, 0, 0, prefix);
     }

     public static GCSClientConfig of(ICloudProperties cloudProperties) {
@@ -69,7 +73,8 @@
                 cloudProperties.isStorageAnonymousAuth(), cloudProperties.getProfilerLogInterval(),
                 cloudProperties.getTokenAcquireTimeout(), cloudProperties.getWriteMaxRequestsPerSecond(),
                 cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getWriteBufferSize(),
-                cloudProperties.getStoragePrefix());
+                cloudProperties.getRequestsHttpConnectionMaxIdleSeconds(),
+                cloudProperties.getRequestsHttpConnectionMaxLifetimeSeconds(), cloudProperties.getStoragePrefix());
     }

     public static GCSClientConfig of(Map<String, String> configuration, int writeBufferSize) {
@@ -123,6 +128,14 @@
         return writeBufferSize;
     }

+    public int getMaxIdleSeconds() {
+        return maxIdleSeconds;
+    }
+
+    public int getMaxLifetimeSeconds() {
+        return maxLifetimeSeconds;
+    }
+
     public String getPrefix() {
         return prefix;
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 79b30e5..b00439c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -32,6 +32,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;

 import org.apache.asterix.cloud.IWriteBufferProvider;
@@ -45,6 +46,7 @@
 import org.apache.asterix.cloud.clients.profiler.RequestLimiterNoOpProfiler;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.util.CleanupUtils;
@@ -58,12 +60,15 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.api.client.http.apache.v2.ApacheHttpTransport;
 import com.google.api.gax.paging.Page;
 import com.google.cloud.BaseServiceException;
 import com.google.cloud.ReadChannel;
+import com.google.cloud.http.HttpTransportOptions;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.HttpStorageOptions;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.Storage.BlobListOption;
 import com.google.cloud.storage.Storage.CopyRequest;
@@ -347,6 +352,19 @@
         if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
             builder.setHost(config.getEndpoint());
         }
+        if (config.getMaxIdleSeconds() > 0 || config.getMaxLifetimeSeconds() > 0) {
+            HttpClientBuilder hcb = ApacheHttpTransport.newDefaultHttpClientBuilder();
+            if (config.getMaxIdleSeconds() > 0) {
+                hcb.evictIdleConnections(config.getMaxIdleSeconds(), TimeUnit.SECONDS);
+            }
+            if (config.getMaxLifetimeSeconds() > 0) {
+                hcb.setConnectionTimeToLive(config.getMaxLifetimeSeconds(), TimeUnit.SECONDS);
+            }
+            ApacheHttpTransport transport = new ApacheHttpTransport(hcb.build());
+            HttpTransportOptions transportOptions = HttpStorageOptions.defaults().getDefaultTransportOptions()
+                    .toBuilder().setHttpTransportFactory(() -> transport).build();
+            builder.setTransportOptions(transportOptions);
+        }
         return builder.build().getService();
     }

diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/clients/aws/s3/S3ConnectionLifecycleTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/clients/aws/s3/S3ConnectionLifecycleTest.java
new file mode 100644
index 0000000..3e07a9b
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/clients/aws/s3/S3ConnectionLifecycleTest.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.aws.s3;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.cloud.WriterSingleBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudGuardian;
+import org.apache.asterix.cloud.clients.ICloudWriter;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.annotations.AiProvenance;
+import org.apache.hyracks.util.annotations.AiProvenance.Agent;
+import org.apache.hyracks.util.annotations.AiProvenance.ContributionKind;
+import org.apache.hyracks.util.annotations.AiProvenance.Tool;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.findify.s3mock.S3Mock;
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+
+/**
+ * Runtime test for MB-71767: verify that the {@code cloudRequestsHttpConnectionMaxIdleSeconds} and
+ * {@code cloudRequestsHttpConnectionMaxLifetimeSeconds} settings actually drive the underlying
+ * Apache HttpClient pool's eviction + TTL behaviour for the sync S3 path.
+ *
+ * Approach: spin up an in-process S3Mock, build a {@link S3CloudClient} with small idle/lifetime
+ * values, drive concurrent uploads to populate the connection pool, then observe established
+ * TCP sockets via {@code lsof} to confirm connections are torn down at the configured times.
+ */
+@AiProvenance(agent = Agent.CLAUDE_OPUS_4_7, tool = Tool.ANTHROPIC_CLI, contributionKind = ContributionKind.TEST_GENERATED, notes = "MB-71767: runtime verification of HTTP idle/lifetime knobs on the sync S3 path; generated via Claude Code")
+public class S3ConnectionLifecycleTest {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private static final int MOCK_SERVER_PORT = 8002;
+    private static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
+    private static final String MOCK_SERVER_REGION = "us-west-2";
+    private static final String PLAYGROUND_BUCKET = "lifecycle-playground";
+
+    private static final int CONCURRENCY = 8;
+    private static final int OBJ_BYTES = 256;
+
+    private static S3Mock s3MockServer;
+    private static S3Client setupClient;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        System.out.println("S3ConnectionLifecycleTest setup");
+        s3MockServer = new S3Mock.Builder().withPort(MOCK_SERVER_PORT).withInMemoryBackend().build();
+        try {
+            s3MockServer.start();
+        } catch (Exception alreadyStarted) {
+            // ignore
+        }
+
+        setupClient = S3Client.builder().region(Region.of(MOCK_SERVER_REGION))
+                .credentialsProvider(AnonymousCredentialsProvider.create())
+                .endpointOverride(URI.create(MOCK_SERVER_HOSTNAME)).forcePathStyle(true).build();
+        try {
+            setupClient.deleteBucket(DeleteBucketRequest.builder().bucket(PLAYGROUND_BUCKET).build());
+        } catch (Exception ignore) {
+        }
+        setupClient.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_BUCKET).build());
+
+        // lsof must be on PATH for this test to observe socket state. Skip with a warning on other OS.
+        Assume.assumeTrue("lsof not on PATH; skipping socket observation tests", lsofAvailable());
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        if (setupClient != null) {
+            setupClient.close();
+        }
+        if (s3MockServer != null) {
+            s3MockServer.shutdown();
+        }
+    }
+
+    /**
+     * idle=2s, lifetime=large: after a burst of concurrent uploads goes quiet for >idle the AWS
+     * SDK v2 IdleConnectionReaper should close the pooled sockets. The reaper has its own
+     * ~60s wake interval (independent of connectionMaxIdleTime), so we poll for up to 70s
+     * and short-circuit as soon as we observe the drop.
+     */
+    @Test
+    public void a_idleEvictionClosesQuietConnections() throws Exception {
+        int idleSec = 2;
+        int lifetimeSec = 600;
+        S3CloudClient client = buildPatchedClient(idleSec, lifetimeSec);
+
+        // Populate the pool with a concurrent burst.
+        runConcurrent(CONCURRENCY, threadIdx -> writeOnce(client, "burst-" + threadIdx));
+
+        int beforeIdle = countEstablishedToMock();
+        System.out.println("[idle test] after burst: established conns to mock = " + beforeIdle);
+        assertGreaterThan("pool should be populated after burst", beforeIdle, 0);
+
+        // Go quiet for a long stretch — the AWS SDK v2 IdleConnectionReaper wakes on its own
+        // schedule (often ~60s, independent of connectionMaxIdleTime). Sample every 5s for
+        // up to 70s so we observe eviction whenever the reaper finally fires.
+        int totalSleepSec = 70;
+        int afterIdle = beforeIdle;
+        for (int waited = 0; waited < totalSleepSec; waited += 5) {
+            Thread.sleep(5_000L);
+            afterIdle = countEstablishedToMock();
+            System.out.println("[idle test] waited " + (waited + 5) + "s: established conns to mock = " + afterIdle);
+            if (afterIdle < beforeIdle) {
+                break;
+            }
+        }
+        assertLessThan("idle eviction should have closed connections within " + totalSleepSec + "s. before="
+                + beforeIdle + " after=" + afterIdle, afterIdle, beforeIdle);
+    }
+
+    /**
+     * idle=large, lifetime=3s: keep firing requests so idle never trips, but TTL must still retire
+     * the pooled connections. Verifies wall-clock-age semantics, not idle-time semantics.
+     */
+    @Test
+    public void b_ttlRotatesConnectionsDespiteActivity() throws Exception {
+        int idleSec = 600;
+        int lifetimeSec = 3;
+        S3CloudClient client = buildPatchedClient(idleSec, lifetimeSec);
+
+        // Initial burst, snapshot source ports.
+        runConcurrent(CONCURRENCY, threadIdx -> writeOnce(client, "ttl-init-" + threadIdx));
+        Set<String> beforePorts = sourcePortsToMock();
+        System.out.println("[ttl test] after initial burst: source ports = " + beforePorts);
+        assertGreaterThan("pool should be populated after burst", beforePorts.size(), 0);
+
+        // Keep busy for > TTL — fire small bursts every 500ms.
+        long deadline = System.currentTimeMillis() + (lifetimeSec + 4) * 1000L;
+        int iters = 0;
+        while (System.currentTimeMillis() < deadline) {
+            final int iterSnapshot = iters;
+            runConcurrent(CONCURRENCY, threadIdx -> writeOnce(client, "ttl-busy-" + iterSnapshot + "-" + threadIdx));
+            Thread.sleep(500);
+            iters++;
+        }
+
+        // Final burst to ensure new connections are open right now.
+        runConcurrent(CONCURRENCY, threadIdx -> writeOnce(client, "ttl-final-" + threadIdx));
+        Set<String> afterPorts = sourcePortsToMock();
+        System.out.println("[ttl test] after " + (lifetimeSec + 4) + "s of activity: source ports = " + afterPorts);
+
+        Set<String> stillPresent = new HashSet<>(beforePorts);
+        stillPresent.retainAll(afterPorts);
+        System.out.println("[ttl test] source ports surviving TTL: " + stillPresent);
+        assertLessThan("TTL should have retired all initial connections. before=" + beforePorts + " after=" + afterPorts
+                + " survivors=" + stillPresent, stillPresent.size(), beforePorts.size());
+    }
+
+    // --------------------------- helpers ---------------------------
+
+    /**
+     * Builds an {@link S3CloudClient} via the existing public constructor (which defaults idle and
+     * lifetime to 0), then reflectively patches the two private fields on the {@link S3ClientConfig}
+     * to the desired values. We then rebuild the underlying SDK client by reaching into the cloud
+     * client. This is test-only — production code never patches these fields after construction.
+     */
+    private static S3CloudClient buildPatchedClient(int idleSec, int lifetimeSec) throws Exception {
+        int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
+        S3ClientConfig config =
+                new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "", true, Collections.emptyList(), 0,
+                        writeBufferSize, S3ClientConfig.S3ParallelDownloaderClientType.ASYNC, false);
+
+        // Patch the idle/lifetime fields on the config.
+        setIntField(config, "maxIdleSeconds", idleSec);
+        setIntField(config, "maxLifetimeSeconds", lifetimeSec);
+
+        // Build the cloud client. buildClient is invoked from the ctor and will pick up the patched values.
+        return new S3CloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE);
+    }
+
+    private static void setIntField(Object target, String name, int value) throws Exception {
+        Field f = target.getClass().getDeclaredField(name);
+        f.setAccessible(true);
+        f.setInt(target, value);
+    }
+
+    private static void writeOnce(S3CloudClient client, String key) {
+        try {
+            IWriteBufferProvider bufferProvider = new WriterSingleBufferProvider(client.getWriteBufferSize());
+            ICloudWriter writer = client.createWriter(PLAYGROUND_BUCKET, "objects/" + key, bufferProvider);
+            ByteBuffer buf = ByteBuffer.allocate(OBJ_BYTES);
+            for (int i = 0; i < OBJ_BYTES; i++) {
+                buf.put((byte) (i & 0xff));
+            }
+            buf.flip();
+            writer.write(buf);
+            writer.finish();
+        } catch (Exception e) {
+            throw new RuntimeException("write failed for key=" + key, e);
+        }
+    }
+
+    @FunctionalInterface
+    private interface IntTask {
+        void run(int threadIdx) throws Exception;
+    }
+
+    private static void runConcurrent(int n, IntTask task) throws Exception {
+        ExecutorService pool = Executors.newFixedThreadPool(n);
+        try {
+            CountDownLatch start = new CountDownLatch(1);
+            CountDownLatch done = new CountDownLatch(n);
+            AtomicInteger errors = new AtomicInteger();
+            for (int i = 0; i < n; i++) {
+                final int idx = i;
+                pool.submit(() -> {
+                    try {
+                        start.await();
+                        task.run(idx);
+                    } catch (Exception e) {
+                        errors.incrementAndGet();
+                        LOGGER.warn("concurrent task " + idx + " failed", e);
+                    } finally {
+                        done.countDown();
+                    }
+                });
+            }
+            start.countDown();
+            if (!done.await(30, TimeUnit.SECONDS)) {
+                throw new RuntimeException("concurrent tasks timed out");
+            }
+            if (errors.get() > 0) {
+                throw new RuntimeException(errors.get() + " concurrent tasks failed");
+            }
+        } finally {
+            pool.shutdownNow();
+        }
+    }
+
+    private static boolean lsofAvailable() {
+        try {
+            Process p = new ProcessBuilder("lsof", "-v").redirectErrorStream(true).start();
+            return p.waitFor(2, TimeUnit.SECONDS) && p.exitValue() == 0;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    private static int countEstablishedToMock() throws Exception {
+        return sourcePortsToMock().size();
+    }
+
+    /**
+     * Returns the set of local source ports for ESTABLISHED TCP sockets in this JVM that have the
+     * S3Mock port as their destination. Each pooled connection corresponds to one source port; a
+     * rotation of connections shows up as a new set of ports.
+     */
+    private static Set<String> sourcePortsToMock() throws Exception {
+        long pid = ProcessHandle.current().pid();
+        Process p = new ProcessBuilder("lsof", "-nP", "-iTCP", "-p", String.valueOf(pid), "-sTCP:ESTABLISHED")
+                .redirectErrorStream(true).start();
+        Set<String> ports = new HashSet<>();
+        Pattern destPattern = Pattern.compile(":(\\d+)->(?:127\\.0\\.0\\.1|\\[?::1]?):" + MOCK_SERVER_PORT);
+        try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
+            String line;
+            while ((line = r.readLine()) != null) {
+                Matcher m = destPattern.matcher(line);
+                if (m.find()) {
+                    ports.add(m.group(1));
+                }
+            }
+        }
+        p.waitFor(3, TimeUnit.SECONDS);
+        return ports;
+    }
+
+    private static void assertGreaterThan(String msg, int actual, int floor) {
+        if (actual <= floor) {
+            throw new AssertionError(msg + " (actual=" + actual + ", expected > " + floor + ")");
+        }
+    }
+
+    private static void assertLessThan(String msg, int actual, int ceiling) {
+        if (actual >= ceiling) {
+            throw new AssertionError(msg + " (actual=" + actual + ", expected < " + ceiling + ")");
+        }
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/clients/google/gcs/GCSConnectionLifecycleTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/clients/google/gcs/GCSConnectionLifecycleTest.java
new file mode 100644
index 0000000..e48c4ce
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/clients/google/gcs/GCSConnectionLifecycleTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.google.gcs;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.asterix.cloud.clients.ICloudGuardian;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.annotations.AiProvenance;
+import org.apache.hyracks.util.annotations.AiProvenance.Agent;
+import org.apache.hyracks.util.annotations.AiProvenance.ContributionKind;
+import org.apache.hyracks.util.annotations.AiProvenance.Tool;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+
+/**
+ * Runtime test for MB-71767: verify that the {@code cloudRequestsHttpConnectionMaxIdleSeconds} and
+ * {@code cloudRequestsHttpConnectionMaxLifetimeSeconds} settings actually drive the underlying
+ * Apache HttpClient pool's eviction + TTL behaviour for the GCS path.
+ *
+ * Uses a JDK built-in {@link HttpServer} that returns a GCS-shaped 404 for every request. The test
+ * then exercises {@link GCSCloudClient#exists(String, String)}, which maps a 404 to {@code false},
+ * so each call issues one HTTP request without throwing. That is enough to populate the Apache
+ * HttpClient connection pool; {@code lsof} then shows whether {@code evictIdleConnections} /
+ * {@code setConnectionTimeToLive} close the pooled sockets on the configured schedule. No external
+ * services are needed: the test runs on any OS with {@code lsof} on PATH and is skipped otherwise.
+ *
+ * Mirror of {@code S3ConnectionLifecycleTest} for the GCS code path.
+ */
+@AiProvenance(agent = Agent.CLAUDE_OPUS_4_7, tool = Tool.ANTHROPIC_CLI, contributionKind = ContributionKind.TEST_GENERATED, notes = "MB-71767: runtime verification of HTTP idle/lifetime knobs on the GCS path; generated via Claude Code")
+public class GCSConnectionLifecycleTest {
+
+    private static final String MOCK_SERVER_REGION = "us-west2";
+    private static final String PLAYGROUND_BUCKET = "lifecycle-playground-gcs";
+
+    private static final int CONCURRENCY = 8;
+
+    private static HttpServer mockServer;
+    private static int mockServerPort;
+    private static String mockServerHostname;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        System.out.println("GCSConnectionLifecycleTest setup");
+        Assume.assumeTrue("lsof not on PATH; skipping socket observation tests", lsofAvailable());
+
+        // Bind to an ephemeral port on loopback. The handler always responds with a GCS-shaped
+        // 404 so GCSCloudClient.exists() returns false without throwing, which keeps the test
+        // focused on TCP socket lifecycle.
+        mockServer = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 64);
+        mockServer.createContext("/", new GcsStubHandler());
+        mockServer.setExecutor(Executors.newFixedThreadPool(16));
+        mockServer.start();
+        mockServerPort = mockServer.getAddress().getPort();
+        mockServerHostname = "http://127.0.0.1:" + mockServerPort;
+        System.out.println("stub server up at " + mockServerHostname);
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        if (mockServer != null) {
+            mockServer.stop(0);
+        }
+    }
+
+    /**
+     * idle=2s, lifetime=large: once a burst of concurrent exists() calls has been quiet for longer
+     * than the idle limit, the Apache HttpClient eviction thread should close the pooled sockets.
+     * Sample every 2s for up to 20s as a safety net.
+     */
+    @Test
+    public void a_idleEvictionClosesQuietConnections() throws Exception {
+        int idleSec = 2;
+        int lifetimeSec = 600;
+        GCSCloudClient client = buildPatchedClient(idleSec, lifetimeSec);
+
+        runConcurrent(CONCURRENCY, threadIdx -> existsOnce(client, "burst-" + threadIdx));
+
+        int beforeIdle = countEstablishedToMock();
+        System.out.println("[idle test] after burst: established conns to mock = " + beforeIdle);
+        assertGreaterThan("pool should be populated after burst", beforeIdle, 0);
+
+        int totalSleepSec = 20;
+        int afterIdle = beforeIdle;
+        for (int waited = 0; waited < totalSleepSec; waited += 2) {
+            Thread.sleep(2_000L);
+            afterIdle = countEstablishedToMock();
+            System.out.println("[idle test] waited " + (waited + 2) + "s: established conns to mock = " + afterIdle);
+            if (afterIdle < beforeIdle) {
+                break;
+            }
+        }
+        assertLessThan("idle eviction should have closed connections within " + totalSleepSec + "s. before="
+                + beforeIdle + " after=" + afterIdle, afterIdle, beforeIdle);
+    }
+
+    /**
+     * idle=large, lifetime=3s: keep firing requests so idle eviction never trips; TTL must still retire
+     * the pooled connections. Verifies wall-clock-age semantics.
+     */
+    @Test
+    public void b_ttlRotatesConnectionsDespiteActivity() throws Exception {
+        int idleSec = 600;
+        int lifetimeSec = 3;
+        GCSCloudClient client = buildPatchedClient(idleSec, lifetimeSec);
+
+        runConcurrent(CONCURRENCY, threadIdx -> existsOnce(client, "ttl-init-" + threadIdx));
+        Set<String> beforePorts = sourcePortsToMock();
+        System.out.println("[ttl test] after initial burst: source ports = " + beforePorts);
+        assertGreaterThan("pool should be populated after burst", beforePorts.size(), 0);
+
+        long deadline = System.currentTimeMillis() + (lifetimeSec + 4) * 1000L;
+        int iters = 0;
+        while (System.currentTimeMillis() < deadline) {
+            final int iterSnapshot = iters;
+            runConcurrent(CONCURRENCY, threadIdx -> existsOnce(client, "ttl-busy-" + iterSnapshot + "-" + threadIdx));
+            Thread.sleep(500);
+            iters++;
+        }
+
+        runConcurrent(CONCURRENCY, threadIdx -> existsOnce(client, "ttl-final-" + threadIdx));
+        Set<String> afterPorts = sourcePortsToMock();
+        System.out.println("[ttl test] after " + (lifetimeSec + 4) + "s of activity: source ports = " + afterPorts);
+
+        Set<String> stillPresent = new HashSet<>(beforePorts);
+        stillPresent.retainAll(afterPorts);
+        System.out.println("[ttl test] source ports surviving TTL: " + stillPresent);
+        assertLessThan("TTL should have retired at least one initial connection. before=" + beforePorts + " after=" + afterPorts
+                + " survivors=" + stillPresent, stillPresent.size(), beforePorts.size());
+    }
+
+    // --------------------------- helpers ---------------------------
+
+    /**
+     * Builds a {@link GCSClientConfig} via its existing public constructor (which defaults idle and
+     * lifetime to 0), reflectively patches the config's two private fields to the desired values, and
+     * only then constructs the {@link GCSCloudClient}. Because the cloud client's constructor invokes
+     * {@code buildClient(config)}, the patched values reach the {@code ApacheHttpTransport} that backs
+     * the google-cloud-storage transport.
+     */
+    private static GCSCloudClient buildPatchedClient(int idleSec, int lifetimeSec) throws Exception {
+        int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
+        GCSClientConfig config =
+                new GCSClientConfig(MOCK_SERVER_REGION, mockServerHostname, true, 0, writeBufferSize, "");
+
+        setIntField(config, "maxIdleSeconds", idleSec);
+        setIntField(config, "maxLifetimeSeconds", lifetimeSec);
+
+        return new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE);
+    }
+
+    private static void setIntField(Object target, String name, int value) throws Exception {
+        Field f = target.getClass().getDeclaredField(name);
+        f.setAccessible(true);
+        f.setInt(target, value);
+    }
+
+    /**
+     * Drives one cheap GCS API call through the cloud client. The stub returns 404 for every path,
+     * and {@link GCSCloudClient#exists} maps that to {@code false} without throwing, so the only
+     * observable side effect is one HTTP request issued through the pooled HTTP client.
+     */
+    private static void existsOnce(GCSCloudClient client, String key) {
+        try {
+            client.exists(PLAYGROUND_BUCKET, "objects/" + key);
+        } catch (Exception e) {
+            // Response correctness doesn't matter here; only that a TCP connection was made.
+        }
+    }
+
+    @FunctionalInterface
+    private interface IntTask {
+        void run(int threadIdx) throws Exception;
+    }
+
+    private static void runConcurrent(int n, IntTask task) throws Exception {
+        ExecutorService pool = Executors.newFixedThreadPool(n);
+        try {
+            CountDownLatch start = new CountDownLatch(1);
+            CountDownLatch done = new CountDownLatch(n);
+            AtomicInteger errors = new AtomicInteger();
+            for (int i = 0; i < n; i++) {
+                final int idx = i;
+                pool.submit(() -> {
+                    try {
+                        start.await();
+                        task.run(idx);
+                    } catch (Exception e) {
+                        errors.incrementAndGet();
+                        System.err.println("concurrent task " + idx + " failed: " + e);
+                        e.printStackTrace(System.err);
+                    } finally {
+                        done.countDown();
+                    }
+                });
+            }
+            start.countDown();
+            if (!done.await(30, TimeUnit.SECONDS)) {
+                throw new RuntimeException("concurrent tasks timed out");
+            }
+            // Task failures are logged and counted above but deliberately not rethrown: the test only
+            // cares about TCP socket lifecycle, so a 404 the SDK didn't love must not fail it.
+        } finally {
+            pool.shutdownNow();
+        }
+    }
+
+    private static boolean lsofAvailable() {
+        try {
+            Process p = new ProcessBuilder("lsof", "-v").redirectErrorStream(true).start();
+            return p.waitFor(2, TimeUnit.SECONDS) && p.exitValue() == 0;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    private static int countEstablishedToMock() throws Exception {
+        return sourcePortsToMock().size();
+    }
+
+    /**
+     * Returns the set of local source ports for ESTABLISHED TCP sockets in this JVM whose destination
+     * is the stub server port. Each pooled connection corresponds to one source port, so a rotation of
+     * connections shows up as a new set of ports.
+     */
+    private static Set<String> sourcePortsToMock() throws Exception {
+        long pid = ProcessHandle.current().pid();
+        Process p = new ProcessBuilder("lsof", "-a", "-nP", "-iTCP", "-p", String.valueOf(pid), "-sTCP:ESTABLISHED")
+                .redirectErrorStream(true).start();
+        Set<String> ports = new HashSet<>();
+        Pattern destPattern = Pattern.compile(":(\\d+)->(?:127\\.0\\.0\\.1|\\[?::1]?):" + mockServerPort);
+        try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
+            String line;
+            while ((line = r.readLine()) != null) {
+                Matcher m = destPattern.matcher(line);
+                if (m.find()) {
+                    ports.add(m.group(1));
+                }
+            }
+        }
+        p.waitFor(3, TimeUnit.SECONDS);
+        return ports;
+    }
+
+    private static void assertGreaterThan(String msg, int actual, int floor) {
+        if (actual <= floor) {
+            throw new AssertionError(msg + " (actual=" + actual + ", expected > " + floor + ")");
+        }
+    }
+
+    private static void assertLessThan(String msg, int actual, int ceiling) {
+        if (actual >= ceiling) {
+            throw new AssertionError(msg + " (actual=" + actual + ", expected < " + ceiling + ")");
+        }
+    }
+
+    /**
+     * Minimal HTTP handler that returns a GCS-shaped 404 response for every path. The body is the
+     * standard GCS JSON error envelope, so the SDK classifies the error as code=404 and short-circuits
+     * to a clean "not found" outcome (no throw, no retry storm).
+     */
+    private static final class GcsStubHandler implements HttpHandler {
+        private static final byte[] BODY = ("{\"error\":{\"code\":404,\"message\":\"Not Found\","
+                + "\"errors\":[{\"reason\":\"notFound\",\"message\":\"Not Found\"}]}}").getBytes();
+
+        @Override
+        public void handle(HttpExchange exchange) throws IOException {
+            try {
+                exchange.getResponseHeaders().add("Content-Type", "application/json; charset=UTF-8");
+                exchange.sendResponseHeaders(404, BODY.length);
+                try (OutputStream os = exchange.getResponseBody()) {
+                    os.write(BODY);
+                }
+            } finally {
+                exchange.close();
+            }
+        }
+    }
+}
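The two knobs exercised by these tests watch different clocks: idle eviction measures time since a pooled connection was last used, while the lifetime cap measures time since it was opened and is enforced when the connection is next leased, which is why the TTL test can observe rotation under constant traffic. The stdlib-only Java sketch below is a simplified, hypothetical model of those two policies (the class and method names are illustrative, not Apache HttpClient's API or its actual pool implementation), with time passed in explicitly so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of idle eviction vs. lifetime (TTL) retirement.
// NOT Apache HttpClient's implementation; "now" is an explicit time in seconds.
public class PoolPolicySketch {

    /** A pooled connection that remembers when it was opened and when it was last used. */
    static final class PooledConn {
        final int id;
        final long createdAt;
        long lastUsedAt;

        PooledConn(int id, long now) {
            this.id = id;
            this.createdAt = now;
            this.lastUsedAt = now;
        }
    }

    final long maxIdleSec;     // plays the role of cloudRequestsHttpConnectionMaxIdleSeconds
    final long maxLifetimeSec; // plays the role of cloudRequestsHttpConnectionMaxLifetimeSeconds
    final List<PooledConn> pool = new ArrayList<>();
    private int nextId = 0;

    PoolPolicySketch(long maxIdleSec, long maxLifetimeSec) {
        this.maxIdleSec = maxIdleSec;
        this.maxLifetimeSec = maxLifetimeSec;
    }

    /** Background sweep: close connections that have sat unused for longer than maxIdleSec. */
    void evictIdle(long now) {
        pool.removeIf(e -> now - e.lastUsedAt > maxIdleSec);
    }

    /** Lease: retire connections older than maxLifetimeSec (however busy), then reuse one or open a new one. */
    PooledConn lease(long now) {
        pool.removeIf(e -> now - e.createdAt > maxLifetimeSec);
        PooledConn c = pool.isEmpty() ? new PooledConn(nextId++, now) : pool.remove(0);
        c.lastUsedAt = now;
        return c;
    }

    /** Return a connection to the pool once its request completes. */
    void release(PooledConn c, long now) {
        c.lastUsedAt = now;
        pool.add(c);
    }

    public static void main(String[] args) {
        // idle=600s, lifetime=3s: one request per second, yet connection 0 is retired at t=4.
        PoolPolicySketch ttl = new PoolPolicySketch(600, 3);
        int firstId = -1;
        int lastId = -1;
        for (long t = 0; t <= 7; t++) {
            PooledConn c = ttl.lease(t);
            if (t == 0) {
                firstId = c.id;
            }
            lastId = c.id;
            ttl.release(c, t);
        }
        System.out.println("ttl: first=" + firstId + " last=" + lastId); // prints "ttl: first=0 last=1"

        // idle=2s, lifetime=600s: one request at t=0, then silence.
        PoolPolicySketch idle = new PoolPolicySketch(2, 600);
        idle.release(idle.lease(0), 0);
        idle.evictIdle(1);
        System.out.println("idle after 1s: " + idle.pool.size()); // prints "idle after 1s: 1"
        idle.evictIdle(3);
        System.out.println("idle after 3s: " + idle.pool.size()); // prints "idle after 3s: 0"
    }
}
```

In Apache HttpClient 4.x the background evictor may also sweep expired connections, so a TTL-expired socket can close before its next lease; the sketch omits that to keep the two policies distinct.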

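The `sourcePortsToMock` helper identifies each pooled connection by its local source port. The sketch below runs the same destination-port pattern over a few hand-written lines in the shape of `lsof -nP` output (the sample lines, ports, and class name are illustrative assumptions, not captured output) to show that both IPv4 and IPv6 loopback connections are picked up, while the stub server's own accepted sockets, which live in the same JVM, are ignored because their arrow points the other way.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative parser; the sample lines in main() are hand-written, not real lsof output.
public class LsofPortParseSketch {

    /** Local source ports of sockets whose remote end is loopback:serverPort (IPv4 or IPv6). */
    static Set<String> sourcePorts(List<String> lsofLines, int serverPort) {
        // Same destination pattern as sourcePortsToMock: "<local>:<srcPort>-><loopback>:<serverPort>".
        Pattern dest = Pattern.compile(":(\\d+)->(?:127\\.0\\.0\\.1|\\[?::1]?):" + serverPort);
        Set<String> ports = new LinkedHashSet<>();
        for (String line : lsofLines) {
            Matcher m = dest.matcher(line);
            if (m.find()) {
                ports.add(m.group(1));
            }
        }
        return ports;
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
                "COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME",
                // client side of two pooled connections: counted
                "java 4242 dev 41u IPv4 0x1 0t0 TCP 127.0.0.1:52001->127.0.0.1:51000 (ESTABLISHED)",
                "java 4242 dev 42u IPv6 0x2 0t0 TCP [::1]:52002->[::1]:51000 (ESTABLISHED)",
                // stub server's accepted socket (same JVM, reverse direction): ignored
                "java 4242 dev 43u IPv4 0x3 0t0 TCP 127.0.0.1:51000->127.0.0.1:52001 (ESTABLISHED)");
        System.out.println(sourcePorts(sample, 51000)); // prints [52001, 52002]
    }
}
```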
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21235?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I52bfe3e855b654b48f4294a9cd52e59406555aeb
Gerrit-Change-Number: 21235
Gerrit-PatchSet: 11
Gerrit-Owner: Rithwik Koul <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
