This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.3 by this push:
     new 2cf4c361f5f [FLINK-39530][s3] Replace S3 compatible auto-detection with explicit chunked-encoding and checksum-validation config options
2cf4c361f5f is described below

commit 2cf4c361f5fb230ada46b1706eb38f58b5d4c399
Author: Gabor Somogyi <[email protected]>
AuthorDate: Fri Apr 24 07:07:48 2026 +0200

    [FLINK-39530][s3] Replace S3 compatible auto-detection with explicit chunked-encoding and checksum-validation config options
---
 .../fs/s3native/NativeS3FileSystemFactory.java     |  28 +-
 .../apache/flink/fs/s3native/S3ClientProvider.java |  87 ++-
 .../fs/s3native/NativeS3FileSystemFactoryTest.java | 686 ++++++++-------------
 3 files changed, 330 insertions(+), 471 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
index 81d92dc762b..e8cc953fc1e 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
@@ -88,7 +88,27 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
                     .booleanType()
                     .defaultValue(false)
                     .withFallbackKeys("s3.path.style.access")
-                    .withDescription("Use path-style access for S3 (for 
S3-compatible storage)");
+                    .withDescription(
+                            "Use path-style access for S3. Required for most 
S3-compatible servers "
+                                    + "(e.g. MinIO, Ceph) which do not support 
virtual-hosted style.");
+
+    public static final ConfigOption<Boolean> CHUNKED_ENCODING_ENABLED =
+            ConfigOptions.key("s3.chunked-encoding.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Enable AWS chunked transfer encoding for S3 
requests. "
+                                    + "Most S3-compatible servers do not 
support this AWS-specific "
+                                    + "extension and require it to be 
disabled.");
+
+    public static final ConfigOption<Boolean> CHECKSUM_VALIDATION_ENABLED =
+            ConfigOptions.key("s3.checksum-validation.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Enable checksum validation for S3 requests. "
+                                    + "Most S3-compatible servers do not 
support AWS checksum "
+                                    + "headers and require it to be 
disabled.");
 
     public static final ConfigOption<Long> PART_UPLOAD_MIN_SIZE =
             ConfigOptions.key("s3.upload.min.part.size")
@@ -295,10 +315,6 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
         String endpoint = config.get(ENDPOINT);
         boolean pathStyleAccess = config.get(PATH_STYLE_ACCESS);
 
-        if (endpoint != null && !pathStyleAccess) {
-            pathStyleAccess = true;
-        }
-
         S3EncryptionConfig encryptionConfig =
                 S3EncryptionConfig.fromConfig(config.get(SSE_TYPE), 
config.get(SSE_KMS_KEY_ID));
         String entropyInjectionKey = config.get(ENTROPY_INJECT_KEY_OPTION);
@@ -371,6 +387,8 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
                         .region(region)
                         .endpoint(endpoint)
                         .pathStyleAccess(pathStyleAccess)
+                        .chunkedEncoding(config.get(CHUNKED_ENCODING_ENABLED))
+                        
.checksumValidation(config.get(CHECKSUM_VALIDATION_ENABLED))
                         .maxConnections(maxConnections)
                         .connectionTimeout(config.get(CONNECTION_TIMEOUT))
                         .socketTimeout(config.get(SOCKET_TIMEOUT))
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
index daccd6247cd..656a3f567da 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
@@ -80,6 +80,11 @@ class S3ClientProvider implements AutoCloseableAsync {
     private final Duration connectionTimeout;
     private final Duration socketTimeout;
     private final Duration connectionMaxIdleTime;
+    private final boolean pathStyleAccess;
+    private final boolean chunkedEncoding;
+    private final boolean checksumValidation;
+    private final int maxConnections;
+    private final int maxRetries;
     private final AtomicBoolean closed = new AtomicBoolean(false);
 
     private S3ClientProvider(
@@ -91,7 +96,12 @@ class S3ClientProvider implements AutoCloseableAsync {
             Duration clientCloseTimeout,
             Duration connectionTimeout,
             Duration socketTimeout,
-            Duration connectionMaxIdleTime) {
+            Duration connectionMaxIdleTime,
+            boolean pathStyleAccess,
+            boolean chunkedEncoding,
+            boolean checksumValidation,
+            int maxConnections,
+            int maxRetries) {
         this.s3Client = s3Client;
         this.transferManager = transferManager;
         this.encryptionConfig =
@@ -102,6 +112,11 @@ class S3ClientProvider implements AutoCloseableAsync {
         this.connectionTimeout = connectionTimeout;
         this.socketTimeout = socketTimeout;
         this.connectionMaxIdleTime = connectionMaxIdleTime;
+        this.pathStyleAccess = pathStyleAccess;
+        this.chunkedEncoding = chunkedEncoding;
+        this.checksumValidation = checksumValidation;
+        this.maxConnections = maxConnections;
+        this.maxRetries = maxRetries;
     }
 
     public S3Client getS3Client() {
@@ -144,6 +159,31 @@ class S3ClientProvider implements AutoCloseableAsync {
         return connectionMaxIdleTime;
     }
 
+    @VisibleForTesting
+    boolean isPathStyleAccess() {
+        return pathStyleAccess;
+    }
+
+    @VisibleForTesting
+    boolean isChunkedEncoding() {
+        return chunkedEncoding;
+    }
+
+    @VisibleForTesting
+    boolean isChecksumValidation() {
+        return checksumValidation;
+    }
+
+    @VisibleForTesting
+    int getMaxConnections() {
+        return maxConnections;
+    }
+
+    @VisibleForTesting
+    int getMaxRetries() {
+        return maxRetries;
+    }
+
     @Override
     public CompletableFuture<Void> closeAsync() {
         if (!closed.compareAndSet(false, true)) {
@@ -204,11 +244,12 @@ class S3ClientProvider implements AutoCloseableAsync {
         private String region;
         private String endpoint;
         private boolean pathStyleAccess = false;
+        private boolean chunkedEncoding = true;
+        private boolean checksumValidation = true;
         private int maxConnections = 50;
         private Duration connectionTimeout = Duration.ofSeconds(60);
         private Duration socketTimeout = Duration.ofSeconds(60);
         private Duration connectionMaxIdleTime = Duration.ofSeconds(60);
-        private boolean disableCertCheck = false;
         private int maxRetries = 3;
         private Duration clientCloseTimeout = Duration.ofSeconds(30);
 
@@ -249,6 +290,16 @@ class S3ClientProvider implements AutoCloseableAsync {
             return this;
         }
 
+        public Builder chunkedEncoding(boolean chunkedEncoding) {
+            this.chunkedEncoding = chunkedEncoding;
+            return this;
+        }
+
+        public Builder checksumValidation(boolean checksumValidation) {
+            this.checksumValidation = checksumValidation;
+            return this;
+        }
+
         public Builder maxConnections(int maxConnections) {
             this.maxConnections = maxConnections;
             return this;
@@ -274,11 +325,6 @@ class S3ClientProvider implements AutoCloseableAsync {
             return this;
         }
 
-        public Builder disableCertCheck(boolean disableCertCheck) {
-            this.disableCertCheck = disableCertCheck;
-            return this;
-        }
-
         public Builder maxRetries(int maxRetries) {
             this.maxRetries = maxRetries;
             return this;
@@ -326,14 +372,6 @@ class S3ClientProvider implements AutoCloseableAsync {
             }
 
             URI endpointUri = (endpoint != null) ? URI.create(endpoint) : null;
-            boolean isS3Compatible = endpointUri != null;
-
-            if (isS3Compatible && !pathStyleAccess) {
-                pathStyleAccess = true;
-            }
-            if (isS3Compatible && 
"http".equalsIgnoreCase(endpointUri.getScheme())) {
-                disableCertCheck = true;
-            }
 
             Region awsRegion = resolveRegion(region);
             StsClient stsClient = null;
@@ -347,12 +385,12 @@ class S3ClientProvider implements AutoCloseableAsync {
                 credentialsProvider = baseProvider;
             }
 
-            S3Configuration.Builder s3ConfigBuilder =
-                    
S3Configuration.builder().pathStyleAccessEnabled(pathStyleAccess);
-            if (isS3Compatible) {
-                
s3ConfigBuilder.chunkedEncodingEnabled(false).checksumValidationEnabled(false);
-            }
-            S3Configuration s3Config = s3ConfigBuilder.build();
+            S3Configuration s3Config =
+                    S3Configuration.builder()
+                            .pathStyleAccessEnabled(pathStyleAccess)
+                            .chunkedEncodingEnabled(chunkedEncoding)
+                            .checksumValidationEnabled(checksumValidation)
+                            .build();
 
             ClientOverrideConfiguration overrideConfig =
                     ClientOverrideConfiguration.builder()
@@ -406,7 +444,12 @@ class S3ClientProvider implements AutoCloseableAsync {
                     clientCloseTimeout,
                     connectionTimeout,
                     socketTimeout,
-                    connectionMaxIdleTime);
+                    connectionMaxIdleTime,
+                    pathStyleAccess,
+                    chunkedEncoding,
+                    checksumValidation,
+                    maxConnections,
+                    maxRetries);
         }
 
         private AwsCredentialsProvider buildBaseCredentialsProvider() {
diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
index f9e57b55858..21d86301233 100644
--- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
+++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.fs.s3native;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.FileSystem;
 
 import org.junit.jupiter.api.Test;
 
@@ -33,565 +32,364 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 /** Tests for {@link NativeS3FileSystemFactory}. */
 class NativeS3FileSystemFactoryTest {
 
-    @Test
-    void testSchemeReturnsS3() {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        assertThat(factory.getScheme()).isEqualTo("s3");
-    }
-
-    @Test
-    void testConfigureAcceptsConfiguration() {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-key");
-        config.setString("s3.secret-key", "test-secret");
-
-        // Should not throw
-        factory.configure(config);
-    }
-
-    @Test
-    void testCreateFileSystemWithMinimalConfiguration() throws Exception {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+    private static Configuration baseConfig() {
         Configuration config = new Configuration();
         config.setString("s3.access-key", "test-access-key");
         config.setString("s3.secret-key", "test-secret-key");
         config.setString("s3.region", "us-east-1");
         config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
-
-        URI fsUri = new URI("s3://test-bucket/");
-        FileSystem fs = factory.create(fsUri);
-
-        assertThat(fs).isNotNull();
-        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
-        assertThat(fs.getUri()).isEqualTo(fsUri);
+        return config;
     }
 
-    @Test
-    void testCreateFileSystemWithCustomEndpoint() throws Exception {
+    private static NativeS3FileSystem createFs(Configuration config) throws 
Exception {
         NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.endpoint", "http://localhost:9000");
-        config.setString("s3.region", "us-east-1");
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
         factory.configure(config);
-
-        URI fsUri = new URI("s3://test-bucket/");
-        FileSystem fs = factory.create(fsUri);
-
-        assertThat(fs).isNotNull();
-        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+        return (NativeS3FileSystem) 
factory.create(URI.create("s3://test-bucket/"));
     }
 
-    @Test
-    void testPartSizeTooSmallThrowsException() {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "us-east-1");
-        config.set(NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE, 1024L); // 
Too small (< 5MB)
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
+    private static NativeS3FileSystem createS3aFs(Configuration config) throws 
Exception {
+        NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory();
         factory.configure(config);
-
-        URI fsUri = URI.create("s3://test-bucket/");
-        assertThatThrownBy(() -> factory.create(fsUri))
-                .isInstanceOf(IllegalArgumentException.class)
-                .hasMessageContaining("must be at least");
+        return (NativeS3FileSystem) 
factory.create(URI.create("s3a://test-bucket/"));
     }
 
     @Test
-    void testPartSizeTooLargeThrowsException() {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "us-east-1");
-        config.set(
-                NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE, 6L * 1024 * 
1024 * 1024); // > 5GB
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
-
-        URI fsUri = URI.create("s3://test-bucket/");
-        assertThatThrownBy(() -> factory.create(fsUri))
-                .isInstanceOf(IllegalArgumentException.class)
-                .hasMessageContaining("must not exceed 5GB");
+    void testSchemeReturnsS3() {
+        assertThat(new 
NativeS3FileSystemFactory().getScheme()).isEqualTo("s3");
     }
 
     @Test
-    void testInvalidMaxConcurrentUploadsThrowsException() {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "us-east-1");
-        config.set(NativeS3FileSystemFactory.MAX_CONCURRENT_UPLOADS, 0);
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
-
-        URI fsUri = URI.create("s3://test-bucket/");
-        assertThatThrownBy(() -> factory.create(fsUri))
-                .isInstanceOf(IllegalArgumentException.class)
-                .hasMessageContaining("must be positive");
+    void testS3ASchemeReturnsS3A() {
+        assertThat(new 
NativeS3AFileSystemFactory().getScheme()).isEqualTo("s3a");
     }
 
     @Test
-    void testInvalidEntropyKeyThrowsException() {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "us-east-1");
-        config.setString("s3.entropy.key", "__INVALID#KEY__"); // Contains #
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
-
-        URI fsUri = URI.create("s3://test-bucket/");
-        assertThatThrownBy(() -> factory.create(fsUri))
-                .isInstanceOf(IllegalConfigurationException.class)
-                .hasMessageContaining("Invalid character");
+    void testCreateFileSystemWithMinimalConfiguration() throws Exception {
+        NativeS3FileSystem fs = createFs(baseConfig());
+        assertThat(fs.getUri()).isEqualTo(URI.create("s3://test-bucket/"));
     }
 
     @Test
-    void testInvalidEntropyLengthThrowsException() {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "us-east-1");
-        config.setString("s3.entropy.key", "__ENTROPY__");
-        config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 0); 
// Invalid
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
+    void testCustomEndpointDoesNotTriggerS3CompatibleMode() throws Exception {
+        Configuration config = baseConfig();
+        config.setString("s3.endpoint", "http://localhost:9000");
+        S3ClientProvider provider = createFs(config).getClientProvider();
+        assertThat(provider.isPathStyleAccess()).isFalse();
+        assertThat(provider.isChunkedEncoding()).isTrue();
+        assertThat(provider.isChecksumValidation()).isTrue();
+    }
 
-        URI fsUri = URI.create("s3://test-bucket/");
-        assertThatThrownBy(() -> factory.create(fsUri))
-                .isInstanceOf(IllegalConfigurationException.class)
-                .hasMessageContaining("must be > 0");
+    @Test
+    void testS3CompatibleServerExplicitConfiguration() throws Exception {
+        Configuration config = baseConfig();
+        config.setString("s3.endpoint", "http://localhost:9000");
+        config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, true);
+        config.set(NativeS3FileSystemFactory.CHUNKED_ENCODING_ENABLED, false);
+        config.set(NativeS3FileSystemFactory.CHECKSUM_VALIDATION_ENABLED, 
false);
+        S3ClientProvider provider = createFs(config).getClientProvider();
+        assertThat(provider.isPathStyleAccess()).isTrue();
+        assertThat(provider.isChunkedEncoding()).isFalse();
+        assertThat(provider.isChecksumValidation()).isFalse();
     }
 
     @Test
-    void testEntropyInjectionWithValidConfiguration() throws Exception {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "us-east-1");
-        config.setString("s3.entropy.key", "__ENTROPY__");
-        config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 4);
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+    void testS3ACreateFileSystemWithMinimalConfiguration() throws Exception {
+        NativeS3FileSystem fs = createS3aFs(baseConfig());
+        assertThat(fs.getUri()).isEqualTo(URI.create("s3a://test-bucket/"));
+    }
 
-        factory.configure(config);
+    @Test
+    void testS3ACreateFileSystemWithCustomEndpoint() throws Exception {
+        Configuration config = baseConfig();
+        config.setString("s3.endpoint", "http://localhost:9000");
+        assertThat(createS3aFs(config)).isNotNull();
+    }
 
-        URI fsUri = URI.create("s3://test-bucket/");
-        FileSystem fs = factory.create(fsUri);
+    // --- Path-style access ---
 
-        assertThat(fs).isNotNull();
-        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+    @Test
+    void testPathStyleAccessDefaultIsFalse() throws Exception {
+        
assertThat(createFs(baseConfig()).getClientProvider().isPathStyleAccess()).isFalse();
     }
 
     @Test
-    void testPathStyleAccessAutoEnabledForCustomEndpoint() throws Exception {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.endpoint", "http://minio:9000");
-        config.setString("s3.region", "us-east-1");
-        config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, false); // 
Explicitly set to false
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
+    void testPathStyleAccessExplicitlyEnabled() throws Exception {
+        Configuration config = baseConfig();
+        config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, true);
+        
assertThat(createFs(config).getClientProvider().isPathStyleAccess()).isTrue();
+    }
 
-        URI fsUri = URI.create("s3://test-bucket/");
-        FileSystem fs = factory.create(fsUri);
+    // --- Chunked encoding ---
 
-        // Should succeed - path-style access is auto-enabled
-        assertThat(fs).isNotNull();
+    @Test
+    void testChunkedEncodingDefaultIsTrue() throws Exception {
+        
assertThat(createFs(baseConfig()).getClientProvider().isChunkedEncoding()).isTrue();
     }
 
     @Test
-    void testBulkCopyConfiguration() throws Exception {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "us-east-1");
-        config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true);
-        config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 32);
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
+    void testChunkedEncodingExplicitlyDisabled() throws Exception {
+        Configuration config = baseConfig();
+        config.set(NativeS3FileSystemFactory.CHUNKED_ENCODING_ENABLED, false);
+        
assertThat(createFs(config).getClientProvider().isChunkedEncoding()).isFalse();
+    }
 
-        URI fsUri = URI.create("s3://test-bucket/");
-        FileSystem fs = factory.create(fsUri);
+    // --- Checksum validation ---
 
-        assertThat(fs).isNotNull();
-        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+    @Test
+    void testChecksumValidationDefaultIsTrue() throws Exception {
+        
assertThat(createFs(baseConfig()).getClientProvider().isChecksumValidation()).isTrue();
     }
 
     @Test
-    void testExplicitRegionConfiguration() throws Exception {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "eu-west-1"); // Explicit non-default 
region
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
+    void testChecksumValidationExplicitlyDisabled() throws Exception {
+        Configuration config = baseConfig();
+        config.set(NativeS3FileSystemFactory.CHECKSUM_VALIDATION_ENABLED, 
false);
+        
assertThat(createFs(config).getClientProvider().isChecksumValidation()).isFalse();
+    }
 
-        URI fsUri = URI.create("s3://test-bucket/");
-        FileSystem fs = factory.create(fsUri);
+    // --- Max connections ---
 
-        assertThat(fs).isNotNull();
-        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+    @Test
+    void testMaxConnectionsDefault() throws Exception {
+        
assertThat(createFs(baseConfig()).getClientProvider().getMaxConnections()).isEqualTo(50);
     }
 
     @Test
-    void testExplicitRegionTakesPriorityOverAutodiscovery() throws Exception {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "ap-southeast-1"); // Explicit region
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+    void testMaxConnectionsExplicitlyConfigured() throws Exception {
+        Configuration config = baseConfig();
+        config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 100);
+        
assertThat(createFs(config).getClientProvider().getMaxConnections()).isEqualTo(100);
+    }
 
-        factory.configure(config);
+    @Test
+    void testInvalidMaxConnectionsThrowsException() {
+        Configuration config = baseConfig();
+        config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 0);
+        assertThatThrownBy(() -> createFs(config))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("s3.connection.max")
+                .hasMessageContaining("must be a positive integer");
+    }
 
-        URI fsUri = URI.create("s3://test-bucket/");
-        FileSystem fs = factory.create(fsUri);
+    // --- Max retries ---
 
-        // Should succeed with explicit region regardless of environment
-        assertThat(fs).isNotNull();
-        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+    @Test
+    void testMaxRetriesDefault() throws Exception {
+        
assertThat(createFs(baseConfig()).getClientProvider().getMaxRetries()).isEqualTo(3);
     }
 
     @Test
-    void testRegionAutodiscoveryWithoutExplicitConfig() throws Exception {
-        // This test verifies that region autodiscovery works when no explicit region is set.
-        // The test will either succeed (if AWS region can be auto-detected from environment)
-        // or fail with a helpful error message.
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        // Intentionally not setting s3.region to test autodiscovery
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+    void testMaxRetriesExplicitlyConfigured() throws Exception {
+        Configuration config = baseConfig();
+        config.set(NativeS3FileSystemFactory.MAX_RETRIES, 5);
+        assertThat(createFs(config).getClientProvider().getMaxRetries()).isEqualTo(5);
+    }
 
-        factory.configure(config);
+    // --- Timeouts ---
 
-        URI fsUri = URI.create("s3://test-bucket/");
+    @Test
+    void testCustomTimeoutConfiguration() throws Exception {
+        Configuration config = baseConfig();
+        config.set(NativeS3FileSystemFactory.CONNECTION_TIMEOUT, Duration.ofSeconds(30));
+        config.set(NativeS3FileSystemFactory.SOCKET_TIMEOUT, Duration.ofSeconds(45));
+        config.set(NativeS3FileSystemFactory.CONNECTION_MAX_IDLE_TIME, Duration.ofMinutes(2));
+        config.set(NativeS3FileSystemFactory.FS_CLOSE_TIMEOUT, Duration.ofSeconds(90));
+        config.set(NativeS3FileSystemFactory.CLIENT_CLOSE_TIMEOUT, Duration.ofSeconds(15));
 
-        try {
-            FileSystem fs = factory.create(fsUri);
-            assertThat(fs).isNotNull();
-            assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
-        } catch (IllegalArgumentException e) {
-            // If no region can be auto-detected, verify the error message is helpful
-            assertThat(e.getMessage()).contains("AWS region could not be determined");
-            assertThat(e.getMessage()).contains("s3.region");
-            assertThat(e.getMessage()).contains("AWS_REGION");
-        }
+        NativeS3FileSystem fs = createFs(config);
+        assertThat(fs.getFsCloseTimeout()).isEqualTo(Duration.ofSeconds(90));
+
+        S3ClientProvider clientProvider = fs.getClientProvider();
+        assertThat(clientProvider.getConnectionTimeout()).isEqualTo(Duration.ofSeconds(30));
+        assertThat(clientProvider.getSocketTimeout()).isEqualTo(Duration.ofSeconds(45));
+        assertThat(clientProvider.getConnectionMaxIdleTime()).isEqualTo(Duration.ofMinutes(2));
+        assertThat(clientProvider.getClientCloseTimeout()).isEqualTo(Duration.ofSeconds(15));
     }
 
     @Test
-    void testRegionAutodiscoveryWithCustomEndpoint() throws Exception {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.endpoint", "http://localhost:9000");
-        // Intentionally not setting s3.region with custom endpoint
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
+    void testTimeoutConfigurationWithStringDuration() throws Exception {
+        Configuration config = baseConfig();
+        config.setString("s3.connection.timeout", "30 s");
+        config.setString("s3.socket.timeout", "2 min");
+        config.setString("s3.close.timeout", "1 min");
 
-        URI fsUri = URI.create("s3://test-bucket/");
+        NativeS3FileSystem fs = createFs(config);
+        assertThat(fs.getFsCloseTimeout()).isEqualTo(Duration.ofMinutes(1));
 
-        try {
-            FileSystem fs = factory.create(fsUri);
-            assertThat(fs).isNotNull();
-            assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
-        } catch (IllegalArgumentException e) {
-            assertThat(e.getMessage()).contains("AWS region could not be determined");
-            assertThat(e.getMessage()).contains("AWS_REGION");
-            assertThat(e.getMessage()).contains("~/.aws/config");
-        }
+        S3ClientProvider clientProvider = fs.getClientProvider();
+        assertThat(clientProvider.getConnectionTimeout()).isEqualTo(Duration.ofSeconds(30));
+        assertThat(clientProvider.getSocketTimeout()).isEqualTo(Duration.ofMinutes(2));
     }
 
     @Test
-    void testEmptyRegionFallsBackToAutodiscovery() throws Exception {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", ""); // Empty region should trigger autodiscovery
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
+    void testInvalidTimeoutConfigurationThrowsException() {
+        Configuration config = baseConfig();
+        config.setString("s3.connection.timeout", "not-a-duration");
+        assertThatThrownBy(() -> createFs(config)).isInstanceOf(IllegalArgumentException.class);
+    }
 
-        URI fsUri = URI.create("s3://test-bucket/");
+    // --- Part upload size ---
 
-        try {
-            FileSystem fs = factory.create(fsUri);
-            assertThat(fs).isNotNull();
-            assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
-        } catch (IllegalArgumentException e) {
-            // If no region can be auto-detected, verify the error message is helpful
-            assertThat(e.getMessage()).contains("AWS region could not be determined");
-        }
+    @Test
+    void testPartSizeTooSmallThrowsException() {
+        Configuration config = baseConfig();
+        config.set(NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE, 1024L);
+        assertThatThrownBy(() -> createFs(config))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("must be at least");
     }
 
     @Test
-    void testInvalidMaxConnectionsThrowsException() {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "us-east-1");
-        config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 0);
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+    void testPartSizeTooLargeThrowsException() {
+        Configuration config = baseConfig();
+        config.set(NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE, 6L * 1024 * 1024 * 1024);
+        assertThatThrownBy(() -> createFs(config))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("must not exceed 5GB");
+    }
 
-        factory.configure(config);
+    // --- Max concurrent uploads ---
 
-        URI fsUri = URI.create("s3://test-bucket/");
-        assertThatThrownBy(() -> factory.create(fsUri))
+    @Test
+    void testInvalidMaxConcurrentUploadsThrowsException() {
+        Configuration config = baseConfig();
+        config.set(NativeS3FileSystemFactory.MAX_CONCURRENT_UPLOADS, 0);
+        assertThatThrownBy(() -> createFs(config))
                 .isInstanceOf(IllegalArgumentException.class)
-                .hasMessageContaining("s3.connection.max")
-                .hasMessageContaining("must be a positive integer");
+                .hasMessageContaining("must be positive");
     }
 
+    // --- Entropy injection ---
+
     @Test
-    void testInvalidBulkCopyMaxConcurrentThrowsException() {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "us-east-1");
-        config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 0);
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+    void testInvalidEntropyKeyThrowsException() {
+        Configuration config = baseConfig();
+        config.setString("s3.entropy.key", "__INVALID#KEY__");
+        assertThatThrownBy(() -> createFs(config))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining("Invalid character");
+    }
 
-        factory.configure(config);
+    @Test
+    void testInvalidEntropyLengthThrowsException() {
+        Configuration config = baseConfig();
+        config.setString("s3.entropy.key", "__ENTROPY__");
+        config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 0);
+        assertThatThrownBy(() -> createFs(config))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining("must be > 0");
+    }
 
-        URI fsUri = URI.create("s3://test-bucket/");
-        assertThatThrownBy(() -> factory.create(fsUri))
-                .isInstanceOf(IllegalArgumentException.class)
-                .hasMessageContaining("s3.bulk-copy.max-concurrent")
-                .hasMessageContaining("must be a positive integer");
+    @Test
+    void testEntropyInjectionWithValidConfiguration() throws Exception {
+        Configuration config = baseConfig();
+        config.setString("s3.entropy.key", "__ENTROPY__");
+        config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 4);
+        assertThat(createFs(config)).isNotNull();
     }
 
+    // --- Bulk copy ---
+
     @Test
     void testBulkCopyMaxConcurrentClampedToMaxConnections() throws Exception {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "us-east-1");
+        Configuration config = baseConfig();
         config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true);
         config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 32);
         config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 10);
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
 
-        URI fsUri = URI.create("s3://test-bucket/");
-        FileSystem fs = factory.create(fsUri);
-
-        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
-        NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs;
-        assertThat(nativeFs.getBulkCopyHelper()).isNotNull();
-        assertThat(nativeFs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10);
+        NativeS3FileSystem fs = createFs(config);
+        assertThat(fs.getBulkCopyHelper()).isNotNull();
+        assertThat(fs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10);
     }
 
     @Test
     void testBulkCopyMaxConcurrentPreservedWithinMaxConnections() throws Exception {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "us-east-1");
+        Configuration config = baseConfig();
         config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true);
         config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 10);
         config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 50);
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
 
-        URI fsUri = URI.create("s3://test-bucket/");
-        FileSystem fs = factory.create(fsUri);
-
-        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
-        NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs;
-        assertThat(nativeFs.getBulkCopyHelper()).isNotNull();
-        assertThat(nativeFs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10);
+        NativeS3FileSystem fs = createFs(config);
+        assertThat(fs.getBulkCopyHelper()).isNotNull();
+        assertThat(fs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10);
     }
 
     @Test
-    void testS3ASchemeReturnsS3A() {
-        NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory();
-        assertThat(factory.getScheme()).isEqualTo("s3a");
+    void testInvalidBulkCopyMaxConcurrentThrowsException() {
+        Configuration config = baseConfig();
+        config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 0);
+        assertThatThrownBy(() -> createFs(config))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("s3.bulk-copy.max-concurrent")
+                .hasMessageContaining("must be a positive integer");
     }
 
-    @Test
-    void testS3ACreateFileSystemWithMinimalConfiguration() throws Exception {
-        NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "us-east-1");
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
-
-        URI fsUri = URI.create("s3a://test-bucket/");
-        FileSystem fs = factory.create(fsUri);
-
-        assertThat(fs).isNotNull();
-        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
-    }
+    // --- Region ---
 
     @Test
-    void testS3ACreateFileSystemWithCustomEndpoint() throws Exception {
-        NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.endpoint", "http://localhost:9000");
-        config.setString("s3.region", "us-east-1");
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
-
-        URI fsUri = URI.create("s3a://test-bucket/path/to/file");
-        FileSystem fs = factory.create(fsUri);
-
-        assertThat(fs).isNotNull();
-        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
-    }
-
-    @Test
-    void testS3AInheritsAllS3Configuration() throws Exception {
-        NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
+    void testExplicitRegionConfiguration() throws Exception {
+        Configuration config = baseConfig();
         config.setString("s3.region", "eu-west-1");
-        config.setString("s3.entropy.key", "__ENTROPY__");
-        config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 6);
-        config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true);
-        config.set(NativeS3FileSystemFactory.USE_ASYNC_OPERATIONS, true);
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
-
-        URI fsUri = URI.create("s3a://test-bucket/");
-        FileSystem fs = factory.create(fsUri);
-
-        assertThat(fs).isNotNull();
-        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+        assertThat(createFs(config)).isNotNull();
     }
 
     @Test
-    void testS3AWithSSEConfiguration() throws Exception {
-        NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "us-east-1");
-        config.setString("s3.sse.type", "sse-s3");
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
-
-        URI fsUri = URI.create("s3a://test-bucket/");
-        FileSystem fs = factory.create(fsUri);
-
-        assertThat(fs).isNotNull();
-        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+    void testRegionAutodiscoveryWithoutExplicitConfig() throws Exception {
+        Configuration config = baseConfig();
+        config.removeConfig(NativeS3FileSystemFactory.REGION);
+        try {
+            assertThat(createFs(config)).isNotNull();
+        } catch (IllegalArgumentException e) {
+            assertThat(e.getMessage()).contains("AWS region could not be determined");
+            assertThat(e.getMessage()).contains("s3.region");
+            assertThat(e.getMessage()).contains("AWS_REGION");
+        }
     }
 
     @Test
-    void testCustomTimeoutConfiguration() throws Exception {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "us-east-1");
-        config.set(NativeS3FileSystemFactory.CONNECTION_TIMEOUT, Duration.ofSeconds(30));
-        config.set(NativeS3FileSystemFactory.SOCKET_TIMEOUT, Duration.ofSeconds(45));
-        config.set(NativeS3FileSystemFactory.CONNECTION_MAX_IDLE_TIME, Duration.ofMinutes(2));
-        config.set(NativeS3FileSystemFactory.FS_CLOSE_TIMEOUT, Duration.ofSeconds(90));
-        config.set(NativeS3FileSystemFactory.CLIENT_CLOSE_TIMEOUT, Duration.ofSeconds(15));
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
-
-        URI fsUri = URI.create("s3://test-bucket/");
-        FileSystem fs = factory.create(fsUri);
-
-        assertThat(fs).isNotNull();
-        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
-        NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs;
-        assertThat(nativeFs.getFsCloseTimeout()).isEqualTo(Duration.ofSeconds(90));
-
-        S3ClientProvider clientProvider = nativeFs.getClientProvider();
-        assertThat(clientProvider.getConnectionTimeout()).isEqualTo(Duration.ofSeconds(30));
-        assertThat(clientProvider.getSocketTimeout()).isEqualTo(Duration.ofSeconds(45));
-        assertThat(clientProvider.getConnectionMaxIdleTime()).isEqualTo(Duration.ofMinutes(2));
-        assertThat(clientProvider.getClientCloseTimeout()).isEqualTo(Duration.ofSeconds(15));
+    void testRegionAutodiscoveryWithCustomEndpoint() throws Exception {
+        Configuration config = baseConfig();
+        config.setString("s3.endpoint", "http://localhost:9000");
+        config.removeConfig(NativeS3FileSystemFactory.REGION);
+        try {
+            assertThat(createFs(config)).isNotNull();
+        } catch (IllegalArgumentException e) {
+            assertThat(e.getMessage()).contains("AWS region could not be determined");
+            assertThat(e.getMessage()).contains("AWS_REGION");
+            assertThat(e.getMessage()).contains("~/.aws/config");
+        }
     }
 
     @Test
-    void testTimeoutConfigurationWithStringDuration() throws Exception {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "us-east-1");
-        config.setString("s3.connection.timeout", "30 s");
-        config.setString("s3.socket.timeout", "2 min");
-        config.setString("s3.close.timeout", "1 min");
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
-
-        URI fsUri = URI.create("s3://test-bucket/");
-        FileSystem fs = factory.create(fsUri);
+    void testEmptyRegionFallsBackToAutodiscovery() throws Exception {
+        Configuration config = baseConfig();
+        config.setString("s3.region", "");
+        try {
+            assertThat(createFs(config)).isNotNull();
+        } catch (IllegalArgumentException e) {
+            assertThat(e.getMessage()).contains("AWS region could not be determined");
+        }
+    }
 
-        assertThat(fs).isNotNull();
-        NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs;
-        assertThat(nativeFs.getFsCloseTimeout()).isEqualTo(Duration.ofMinutes(1));
+    // --- s3a scheme ---
 
-        S3ClientProvider clientProvider = nativeFs.getClientProvider();
-        assertThat(clientProvider.getConnectionTimeout()).isEqualTo(Duration.ofSeconds(30));
-        assertThat(clientProvider.getSocketTimeout()).isEqualTo(Duration.ofMinutes(2));
+    @Test
+    void testS3AWithSSEConfiguration() throws Exception {
+        Configuration config = baseConfig();
+        config.setString("s3.sse.type", "sse-s3");
+        assertThat(createS3aFs(config)).isNotNull();
     }
 
     @Test
-    void testInvalidTimeoutConfigurationThrowsException() {
-        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
-        Configuration config = new Configuration();
-        config.setString("s3.access-key", "test-access-key");
-        config.setString("s3.secret-key", "test-secret-key");
-        config.setString("s3.region", "us-east-1");
-        config.setString("s3.connection.timeout", "not-a-duration");
-        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
-        factory.configure(config);
-
-        URI fsUri = URI.create("s3://test-bucket/");
-        assertThatThrownBy(() -> factory.create(fsUri))
-                .isInstanceOf(IllegalArgumentException.class);
+    void testS3AInheritsS3Configuration() throws Exception {
+        Configuration config = baseConfig();
+        config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, true);
+        config.set(NativeS3FileSystemFactory.CHUNKED_ENCODING_ENABLED, false);
+        config.set(NativeS3FileSystemFactory.CHECKSUM_VALIDATION_ENABLED, false);
+
+        NativeS3FileSystem fs = createS3aFs(config);
+        assertThat(fs.getClientProvider().isPathStyleAccess()).isTrue();
+        assertThat(fs.getClientProvider().isChunkedEncoding()).isFalse();
+        assertThat(fs.getClientProvider().isChecksumValidation()).isFalse();
     }
 }

Reply via email to