This is an automated email from the ASF dual-hosted git repository. daim pushed a commit to branch OAK-12009 in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
commit b0ee5d96432c5336b639ce0a6f149f33b4a6b03b Author: rishabhdaim <[email protected]> AuthorDate: Fri Nov 21 13:29:13 2025 +0530 OAK-12009 : provided support for GCP for new AWS sdk 2.x --- .../jackrabbit/oak/blob/cloud/s3/S3Backend.java | 19 +-- .../apache/jackrabbit/oak/blob/cloud/s3/Utils.java | 129 ++++++++++++++++++--- .../jackrabbit/oak/blob/cloud/s3/UtilsTest.java | 17 ++- 3 files changed, 133 insertions(+), 32 deletions(-) diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java index 3ff34375de..8fdeb8e072 100644 --- a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java +++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java @@ -362,7 +362,7 @@ public class S3Backend extends AbstractSharedBackend { uploadReq.source(file). putObjectRequest( s3ReqDecorator.decorate( - PutObjectRequest.builder().bucket(bucket).key(key) + PutObjectRequest.builder().bucket(bucket).key(key).contentLength(file.length()) .build())) .build()); @@ -514,11 +514,13 @@ public class S3Backend extends AbstractSharedBackend { try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); // Specify `null` for the content length when you don't know the content length. - final AsyncRequestBody body = AsyncRequestBody.fromInputStream(input, null, executor); + byte[] bytes = input.readAllBytes(); + InputStream is = new ByteArrayInputStream(bytes); + final AsyncRequestBody body = AsyncRequestBody.fromInputStream(is, (long) bytes.length, executor); final Upload upload = tmx.upload(uploadReq -> uploadReq.requestBody(body). putObjectRequest( - s3ReqDecorator.decorate(PutObjectRequest.builder().bucket(bucket).key(addMetaKeyPrefix(name)).build())) + s3ReqDecorator.decorate(PutObjectRequest.builder().bucket(bucket).contentType("application/octet-stream").contentLength((long) bytes.length).key(addMetaKeyPrefix(name)).build())) .build()); upload.completionFuture().join(); } catch (Exception e) { @@ -528,15 +530,6 @@ public class S3Backend extends AbstractSharedBackend { if (contextClassLoader != null) { Thread.currentThread().setContextClassLoader(contextClassLoader); } - executor.shutdown(); - try { - if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException e) { - executor.shutdownNow(); - Thread.currentThread().interrupt(); - } } } @@ -553,7 +546,7 @@ public class S3Backend extends AbstractSharedBackend { uploadReq.source(input). putObjectRequest( s3ReqDecorator.decorate( - PutObjectRequest.builder().bucket(bucket).key(addMetaKeyPrefix(name)).build())) + PutObjectRequest.builder().bucket(bucket).contentLength(input.length()).key(addMetaKeyPrefix(name)).build())) .build()); upload.completionFuture().join(); diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/Utils.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/Utils.java index d193689684..16eb451afa 100644 --- a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/Utils.java +++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/Utils.java @@ -31,6 +31,7 @@ import java.util.regex.Pattern; import org.apache.jackrabbit.oak.blob.cloud.s3.S3Backend.RemoteStorageMode; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,8 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.auth.signer.AwsS3V4Signer; +import software.amazon.awssdk.core.checksums.RequestChecksumCalculation; +import software.amazon.awssdk.core.checksums.ResponseChecksumValidation; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; import software.amazon.awssdk.core.exception.SdkClientException; @@ -47,6 +50,7 @@ import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.apache.ProxyConfiguration; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -54,6 +58,7 @@ import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.S3BaseClientBuilder; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.S3Configuration; import software.amazon.awssdk.services.s3.presigner.S3Presigner; import software.amazon.awssdk.utils.BinaryUtils; import software.amazon.awssdk.utils.Md5Utils; @@ -129,11 +134,21 @@ public final class Utils { */ public static S3AsyncClient openAsyncService(final Properties prop) { S3AsyncClientBuilder builder = S3AsyncClient.builder(); + boolean isS3 = Objects.equals(RemoteStorageMode.S3, prop.get(S3Constants.MODE)); configureBuilder(builder, prop, false); // async http client builder.httpClient(getSdkAsyncHttpClient(prop)); - builder.multipartEnabled(true); + + // AWS-specific optimizations + if (isS3) { + builder.multipartEnabled(true); + builder.multipartConfiguration(c -> c + .minimumPartSizeInBytes(5L * 1024 * 1024) // 5MB minimum + .thresholdInBytes(10L * 1024 * 1024)); // 10MB threshold + } else { + builder.multipartEnabled(false); // GCP doesn't support S3 multipart + } return builder.build(); } @@ -150,9 +165,14 @@ public final class Utils { * @return a configured {@link S3Presigner} instance */ public static S3Presigner createPresigner(final S3Client s3Client, final Properties props) { - return S3Presigner.builder().s3Client(s3Client). - credentialsProvider(Utils.getAwsCredentials(props)) + final boolean isGCP = Objects.equals(RemoteStorageMode.GCP, props.get(S3Constants.MODE)); + return S3Presigner.builder().s3Client(s3Client) + .credentialsProvider(Utils.getAwsCredentials(props)) .region(Region.of(Utils.getRegion(props))) + .serviceConfiguration(S3Configuration.builder() + .pathStyleAccessEnabled(isGCP) + .chunkedEncodingEnabled(!isGCP) + .build()) .build(); } @@ -256,7 +276,7 @@ public final class Utils { String region = null; if (Objects.nonNull(prop.getProperty(S3Constants.S3_END_POINT))) { - region = getRegionFromEndpoint(prop.getProperty(S3Constants.S3_END_POINT)); + region = getRegionFromEndpoint(prop.getProperty(S3Constants.S3_END_POINT), prop.getProperty(S3Constants.S3_CONN_PROTOCOL)); } if (Objects.nonNull(region)) { @@ -282,6 +302,7 @@ public final class Utils { * <ul> * <li>https://s3.eu-west-1.amazonaws.com</li> * <li>https://bucket.s3.eu-west-1.amazonaws.com</li> + * <li>s3.eu-west-1.amazonaws.com</li> * <li>https://s3.amazonaws.com (returns us-east-1)</li> * </ul> * If the region cannot be determined, returns null. @@ -289,9 +310,9 @@ public final class Utils { * @param endpoint the S3 endpoint URL as a string * @return the AWS region string, or null if not found */ - static String getRegionFromEndpoint(final String endpoint) { + static String getRegionFromEndpoint(final String endpoint, String protocol) { try { - URI uri = URI.create(endpoint); + URI uri = getEndPointUri(endpoint, protocol); String host = uri.getHost(); // Pattern for standard S3 endpoints: s3.region.amazonaws.com or bucket.s3.region.amazonaws.com @@ -352,12 +373,27 @@ public final class Utils { */ // Check if endpoint already contains protocol + return getEndPointUri(endPoint, prop.getProperty(S3Constants.S3_CONN_PROTOCOL)); + } + + /** + * Constructs a URI for the S3 endpoint using the provided endpoint string and protocol. + * <p> + * If the endpoint string already contains a protocol (`http://` or `https://`), it is used directly. + * Otherwise, the specified protocol (or "https" if null/empty) is prepended to the endpoint. + * </p> + * + * @param endPoint the S3 endpoint string (may or may not include protocol) + * @param protocol the protocol to use ("http" or "https"); defaults to "https" if null or empty + * @return the constructed {@link URI} for the S3 endpoint + */ + @NotNull + private static URI getEndPointUri(final String endPoint, @Nullable String protocol) { if (endPoint.startsWith("http://") || endPoint.startsWith("https://")) { LOG.info("S3 service endpoint [{}] ", endPoint); return URI.create(endPoint); } - String protocol = prop.getProperty(S3Constants.S3_CONN_PROTOCOL); if (protocol == null || protocol.isEmpty()) { protocol = HTTPS; // default protocol } @@ -372,16 +408,24 @@ public final class Utils { } private static ClientOverrideConfiguration getClientConfiguration(Properties prop) { + final boolean isS3 = Objects.equals(RemoteStorageMode.S3, prop.get(S3Constants.MODE)); + int maxErrorRetry = Integer.parseInt(prop.getProperty(S3Constants.S3_MAX_ERR_RETRY)); int connectionTimeOut = Integer.parseInt(prop.getProperty(S3Constants.S3_CONN_TIMEOUT)); String encryptionType = prop.getProperty(S3Constants.S3_ENCRYPTION); + // API timeout should be much longer than connection timeout for large file uploads + // Use at least 5 minutes, or 10x connection timeout, whichever is larger + int apiTimeout = Math.max(connectionTimeOut * 10, 300000); // At least 5 minutes + ClientOverrideConfiguration.Builder builder = ClientOverrideConfiguration.builder(); builder.retryStrategy(b -> b.maxAttempts(maxErrorRetry)); - builder.apiCallTimeout(Duration.ofMillis(connectionTimeOut)); + builder.apiCallTimeout(Duration.ofMillis(apiTimeout)); // Long timeout for large uploads + builder.apiCallAttemptTimeout(Duration.ofMillis(connectionTimeOut)); // Per-attempt timeout - if (S3Constants.S3_ENCRYPTION_SSE_KMS.equals(encryptionType)) { + // Only use KMS signer for AWS S3, not for GCP + if (isS3 && S3Constants.S3_ENCRYPTION_SSE_KMS.equals(encryptionType)) { builder.putAdvancedOption(SdkAdvancedClientOption.SIGNER, AwsS3V4Signer.create()); } return builder.build(); @@ -390,10 +434,29 @@ public final class Utils { private static SdkHttpClient getSdkHttpClient(Properties prop) { HttpClientConfig config = new HttpClientConfig(prop); final ApacheHttpClient.Builder builder = ApacheHttpClient.builder(); + final boolean isGCP = Objects.equals(RemoteStorageMode.GCP, prop.get(S3Constants.MODE)); - builder.connectionTimeout(Duration.ofMillis(config.connectionTimeout)) + // Calculate connection lifecycle based on socket timeout (all in SECONDS) + long socketTimeoutSeconds = config.socketTimeout / 1000; + + // Idle time: 2x socket timeout (min 30s, max 120s) + long idleTimeSeconds = Math.min(120, Math.max(30, socketTimeoutSeconds * 2)); + + // TTL: 5x socket timeout (min 60s, max 600s = 10min) + long ttlSeconds = Math.min(600, Math.max(60, socketTimeoutSeconds * 5)); + + // GCP needs higher max connections (no HTTP/2) + int maxConnections = isGCP ? Math.max(100, config.maxConnections) : Math.max(50, config.maxConnections); + + builder.connectionTimeout(Duration.ofMillis(config.connectionTimeout)) // Connection timeouts .socketTimeout(Duration.ofMillis(config.socketTimeout)) - .maxConnections(config.maxConnections); + .maxConnections(maxConnections) // Connection pool + .connectionMaxIdleTime(Duration.ofSeconds(idleTimeSeconds)) + .connectionTimeToLive(Duration.ofSeconds(ttlSeconds)) + .useIdleConnectionReaper(true) + .connectionAcquisitionTimeout(Duration.ofSeconds(10)) // Connection acquisition timeout + .tcpKeepAlive(true) // TCP keepalive + .expectContinueEnabled(true); // Expect-continue handshake (reduces overhead for large uploads) if (config.proxyHost != null && !config.proxyHost.isEmpty() && config.proxyPort != null && !config.proxyPort.isEmpty()) { String protocol = "http".equalsIgnoreCase(config.protocol) ? "http" : config.protocol; @@ -410,12 +473,39 @@ public final class Utils { private static SdkAsyncHttpClient getSdkAsyncHttpClient(Properties prop) { HttpClientConfig config = new HttpClientConfig(prop); + final boolean isGCP = Objects.equals(RemoteStorageMode.GCP, prop.get(S3Constants.MODE)); final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder(); - builder.connectionTimeout(Duration.ofMillis(config.connectionTimeout)) + // Calculate connection lifecycle based on socket timeout (all in SECONDS) + long socketTimeoutSeconds = config.socketTimeout / 1000; + + // Idle time: 2x socket timeout (min 30s, max 120s) + long idleTimeSeconds = Math.min(120, Math.max(30, socketTimeoutSeconds * 2)); + + // TTL: 5x socket timeout (min 60s, max 600s = 10min) + long ttlSeconds = Math.min(600, Math.max(60, socketTimeoutSeconds * 5)); + + // GCP needs higher concurrency (no HTTP/2, so more connections needed) + final int concurrency = isGCP ? Math.max(100, config.maxConnections) : Math.max(50, config.maxConnections); + + // More threads for GCP + final int threads = isGCP ? Math.max(16, Runtime.getRuntime().availableProcessors() * 2) + : Math.max(4, Runtime.getRuntime().availableProcessors()); + + + builder.connectionTimeout(Duration.ofMillis(config.connectionTimeout)) // Connection timeouts .readTimeout(Duration.ofMillis(config.socketTimeout)) .writeTimeout(Duration.ofMillis(config.socketTimeout)) - .maxConcurrency(config.maxConnections); + .maxConcurrency(concurrency) // Connection pool - increased for better concurrency + .connectionMaxIdleTime(Duration.ofSeconds(idleTimeSeconds)) + .connectionTimeToLive(Duration.ofSeconds(ttlSeconds)) + .useIdleConnectionReaper(true) + .connectionAcquisitionTimeout(Duration.ofSeconds(10)) // Don't wait too long for a connection from pool + .tcpKeepAlive(true) // TCP optimizations + .eventLoopGroup( + SdkEventLoopGroup.builder() + .numberOfThreads(threads) // Thread pool for Netty + .build()); if (config.proxyHost != null && !config.proxyHost.isEmpty() && config.proxyPort != null && !config.proxyPort.isEmpty()) { String protocol = HTTPS.equalsIgnoreCase(config.protocol) ? HTTPS : config.protocol; @@ -473,6 +563,8 @@ public final class Utils { } private static void configureBuilder(final S3BaseClientBuilder builder, final Properties prop, final boolean accReq) { + final boolean isGCP = Objects.equals(RemoteStorageMode.GCP, prop.get(S3Constants.MODE)); + builder.credentialsProvider(getAwsCredentials(prop)); builder.overrideConfiguration(getClientConfiguration(prop)); @@ -482,6 +574,17 @@ public final class Utils { builder.endpointOverride(getEndPointUri(prop, accReq, region)); builder.crossRegionAccessEnabled(Boolean.parseBoolean(prop.getProperty(S3Constants.S3_CROSS_REGION_ACCESS))); + + // Disable checksums (replaces deprecated checksumValidationEnabled) + builder.requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED); + builder.responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED); + + builder.serviceConfiguration( + S3Configuration.builder() + .pathStyleAccessEnabled(isGCP) // enable for GCP + .chunkedEncodingEnabled(!isGCP) // Disable for GCP + .useArnRegionEnabled(!isGCP) // Disable for GCP + .build()); } // Helper class to hold common Http config diff --git a/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/UtilsTest.java b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/UtilsTest.java index e47fe2af52..580c111e3c 100644 --- a/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/UtilsTest.java +++ b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/UtilsTest.java @@ -111,32 +111,37 @@ public class UtilsTest { @Test public void testGetRegionFromStandardEndpoint() { - Assert.assertEquals("eu-west-1", Utils.getRegionFromEndpoint("https://s3.eu-west-1.amazonaws.com")); + Assert.assertEquals("eu-west-1", Utils.getRegionFromEndpoint("https://s3.eu-west-1.amazonaws.com", null)); } @Test public void testGetRegionFromVirtualHostedEndpoint() { - Assert.assertEquals("ap-south-1", Utils.getRegionFromEndpoint("https://bucket.s3.ap-south-1.amazonaws.com")); + Assert.assertEquals("ap-south-1", Utils.getRegionFromEndpoint("https://bucket.s3.ap-south-1.amazonaws.com", null)); } @Test public void testGetRegionFromUsEast1Endpoint() { - Assert.assertEquals("us-east-1", Utils.getRegionFromEndpoint("https://s3.amazonaws.com")); + Assert.assertEquals("us-east-1", Utils.getRegionFromEndpoint("https://s3.amazonaws.com", null)); } @Test public void testGetRegionFromVirtualHostedUsEast1() { - Assert.assertEquals("us-east-1", Utils.getRegionFromEndpoint("https://bucket.s3.amazonaws.com")); + Assert.assertEquals("us-east-1", Utils.getRegionFromEndpoint("https://bucket.s3.amazonaws.com", null)); } @Test public void testGetRegionFromInvalidEndpoint() { - Assert.assertNull(Utils.getRegionFromEndpoint("https://example.com")); + Assert.assertNull(Utils.getRegionFromEndpoint("https://example.com", null)); } @Test public void testGetRegionFromMalformedEndpoint() { - Assert.assertNull(Utils.getRegionFromEndpoint("not-a-valid-uri")); + Assert.assertNull(Utils.getRegionFromEndpoint("not-a-valid-uri", "https")); + } + + @Test + public void testGetRegionFromEndpointWithoutProtocol() { + Assert.assertEquals("us-east-1", Utils.getRegionFromEndpoint("s3.us-east-1.amazonaws.com", "https")); } @Test
