This is an automated email from the ASF dual-hosted git repository.
gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 76e194a836f [FLINK-39530][s3] Replace S3 compatible auto-detection
with explicit chunked-encoding and checksum-validation config options
76e194a836f is described below
commit 76e194a836f1a67f2605f9b546f8d78603decc0e
Author: Gabor Somogyi <[email protected]>
AuthorDate: Thu Apr 23 22:37:46 2026 +0200
[FLINK-39530][s3] Replace S3 compatible auto-detection with explicit
chunked-encoding and checksum-validation config options
---
.../fs/s3native/NativeS3FileSystemFactory.java | 22 +-
.../apache/flink/fs/s3native/S3ClientProvider.java | 87 ++-
.../fs/s3native/NativeS3FileSystemFactoryTest.java | 672 +++++++--------------
3 files changed, 310 insertions(+), 471 deletions(-)
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
index 81d92dc762b..21f0dc0319e 100644
---
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
@@ -90,6 +90,22 @@ public class NativeS3FileSystemFactory implements
FileSystemFactory {
.withFallbackKeys("s3.path.style.access")
.withDescription("Use path-style access for S3 (for S3-compatible storage)");
+ public static final ConfigOption<Boolean> CHUNKED_ENCODING_ENABLED =
+ ConfigOptions.key("s3.chunked-encoding.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Enable chunked encoding for S3 requests. "
+ + "Disable for S3-compatible servers that do not support chunked encoding.");
+
+ public static final ConfigOption<Boolean> CHECKSUM_VALIDATION_ENABLED =
+ ConfigOptions.key("s3.checksum-validation.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Enable checksum validation for S3 requests. "
+ + "Disable for S3-compatible servers that do not support checksum validation.");
+
public static final ConfigOption<Long> PART_UPLOAD_MIN_SIZE =
ConfigOptions.key("s3.upload.min.part.size")
.longType()
@@ -295,10 +311,6 @@ public class NativeS3FileSystemFactory implements
FileSystemFactory {
String endpoint = config.get(ENDPOINT);
boolean pathStyleAccess = config.get(PATH_STYLE_ACCESS);
- if (endpoint != null && !pathStyleAccess) {
- pathStyleAccess = true;
- }
-
S3EncryptionConfig encryptionConfig =
S3EncryptionConfig.fromConfig(config.get(SSE_TYPE),
config.get(SSE_KMS_KEY_ID));
String entropyInjectionKey = config.get(ENTROPY_INJECT_KEY_OPTION);
@@ -371,6 +383,8 @@ public class NativeS3FileSystemFactory implements
FileSystemFactory {
.region(region)
.endpoint(endpoint)
.pathStyleAccess(pathStyleAccess)
+ .chunkedEncoding(config.get(CHUNKED_ENCODING_ENABLED))
+
.checksumValidation(config.get(CHECKSUM_VALIDATION_ENABLED))
.maxConnections(maxConnections)
.connectionTimeout(config.get(CONNECTION_TIMEOUT))
.socketTimeout(config.get(SOCKET_TIMEOUT))
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
index daccd6247cd..656a3f567da 100644
---
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
@@ -80,6 +80,11 @@ class S3ClientProvider implements AutoCloseableAsync {
private final Duration connectionTimeout;
private final Duration socketTimeout;
private final Duration connectionMaxIdleTime;
+ private final boolean pathStyleAccess;
+ private final boolean chunkedEncoding;
+ private final boolean checksumValidation;
+ private final int maxConnections;
+ private final int maxRetries;
private final AtomicBoolean closed = new AtomicBoolean(false);
private S3ClientProvider(
@@ -91,7 +96,12 @@ class S3ClientProvider implements AutoCloseableAsync {
Duration clientCloseTimeout,
Duration connectionTimeout,
Duration socketTimeout,
- Duration connectionMaxIdleTime) {
+ Duration connectionMaxIdleTime,
+ boolean pathStyleAccess,
+ boolean chunkedEncoding,
+ boolean checksumValidation,
+ int maxConnections,
+ int maxRetries) {
this.s3Client = s3Client;
this.transferManager = transferManager;
this.encryptionConfig =
@@ -102,6 +112,11 @@ class S3ClientProvider implements AutoCloseableAsync {
this.connectionTimeout = connectionTimeout;
this.socketTimeout = socketTimeout;
this.connectionMaxIdleTime = connectionMaxIdleTime;
+ this.pathStyleAccess = pathStyleAccess;
+ this.chunkedEncoding = chunkedEncoding;
+ this.checksumValidation = checksumValidation;
+ this.maxConnections = maxConnections;
+ this.maxRetries = maxRetries;
}
public S3Client getS3Client() {
@@ -144,6 +159,31 @@ class S3ClientProvider implements AutoCloseableAsync {
return connectionMaxIdleTime;
}
+ @VisibleForTesting
+ boolean isPathStyleAccess() {
+ return pathStyleAccess;
+ }
+
+ @VisibleForTesting
+ boolean isChunkedEncoding() {
+ return chunkedEncoding;
+ }
+
+ @VisibleForTesting
+ boolean isChecksumValidation() {
+ return checksumValidation;
+ }
+
+ @VisibleForTesting
+ int getMaxConnections() {
+ return maxConnections;
+ }
+
+ @VisibleForTesting
+ int getMaxRetries() {
+ return maxRetries;
+ }
+
@Override
public CompletableFuture<Void> closeAsync() {
if (!closed.compareAndSet(false, true)) {
@@ -204,11 +244,12 @@ class S3ClientProvider implements AutoCloseableAsync {
private String region;
private String endpoint;
private boolean pathStyleAccess = false;
+ private boolean chunkedEncoding = true;
+ private boolean checksumValidation = true;
private int maxConnections = 50;
private Duration connectionTimeout = Duration.ofSeconds(60);
private Duration socketTimeout = Duration.ofSeconds(60);
private Duration connectionMaxIdleTime = Duration.ofSeconds(60);
- private boolean disableCertCheck = false;
private int maxRetries = 3;
private Duration clientCloseTimeout = Duration.ofSeconds(30);
@@ -249,6 +290,16 @@ class S3ClientProvider implements AutoCloseableAsync {
return this;
}
+ public Builder chunkedEncoding(boolean chunkedEncoding) {
+ this.chunkedEncoding = chunkedEncoding;
+ return this;
+ }
+
+ public Builder checksumValidation(boolean checksumValidation) {
+ this.checksumValidation = checksumValidation;
+ return this;
+ }
+
public Builder maxConnections(int maxConnections) {
this.maxConnections = maxConnections;
return this;
@@ -274,11 +325,6 @@ class S3ClientProvider implements AutoCloseableAsync {
return this;
}
- public Builder disableCertCheck(boolean disableCertCheck) {
- this.disableCertCheck = disableCertCheck;
- return this;
- }
-
public Builder maxRetries(int maxRetries) {
this.maxRetries = maxRetries;
return this;
@@ -326,14 +372,6 @@ class S3ClientProvider implements AutoCloseableAsync {
}
URI endpointUri = (endpoint != null) ? URI.create(endpoint) : null;
- boolean isS3Compatible = endpointUri != null;
-
- if (isS3Compatible && !pathStyleAccess) {
- pathStyleAccess = true;
- }
- if (isS3Compatible &&
"http".equalsIgnoreCase(endpointUri.getScheme())) {
- disableCertCheck = true;
- }
Region awsRegion = resolveRegion(region);
StsClient stsClient = null;
@@ -347,12 +385,12 @@ class S3ClientProvider implements AutoCloseableAsync {
credentialsProvider = baseProvider;
}
- S3Configuration.Builder s3ConfigBuilder =
-
S3Configuration.builder().pathStyleAccessEnabled(pathStyleAccess);
- if (isS3Compatible) {
-
s3ConfigBuilder.chunkedEncodingEnabled(false).checksumValidationEnabled(false);
- }
- S3Configuration s3Config = s3ConfigBuilder.build();
+ S3Configuration s3Config =
+ S3Configuration.builder()
+ .pathStyleAccessEnabled(pathStyleAccess)
+ .chunkedEncodingEnabled(chunkedEncoding)
+ .checksumValidationEnabled(checksumValidation)
+ .build();
ClientOverrideConfiguration overrideConfig =
ClientOverrideConfiguration.builder()
@@ -406,7 +444,12 @@ class S3ClientProvider implements AutoCloseableAsync {
clientCloseTimeout,
connectionTimeout,
socketTimeout,
- connectionMaxIdleTime);
+ connectionMaxIdleTime,
+ pathStyleAccess,
+ chunkedEncoding,
+ checksumValidation,
+ maxConnections,
+ maxRetries);
}
private AwsCredentialsProvider buildBaseCredentialsProvider() {
diff --git
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
index f9e57b55858..21d04bd09f0 100644
---
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
+++
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.fs.s3native;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.FileSystem;
import org.junit.jupiter.api.Test;
@@ -33,565 +32,348 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link NativeS3FileSystemFactory}. */
class NativeS3FileSystemFactoryTest {
- @Test
- void testSchemeReturnsS3() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- assertThat(factory.getScheme()).isEqualTo("s3");
- }
-
- @Test
- void testConfigureAcceptsConfiguration() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-key");
- config.setString("s3.secret-key", "test-secret");
-
- // Should not throw
- factory.configure(config);
- }
-
- @Test
- void testCreateFileSystemWithMinimalConfiguration() throws Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+ private static Configuration baseConfig() {
Configuration config = new Configuration();
config.setString("s3.access-key", "test-access-key");
config.setString("s3.secret-key", "test-secret-key");
config.setString("s3.region", "us-east-1");
config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = new URI("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
-
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
- assertThat(fs.getUri()).isEqualTo(fsUri);
+ return config;
}
- @Test
- void testCreateFileSystemWithCustomEndpoint() throws Exception {
+ private static NativeS3FileSystem createFs(Configuration config) throws
Exception {
NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.endpoint", "http://localhost:9000");
- config.setString("s3.region", "us-east-1");
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
factory.configure(config);
-
- URI fsUri = new URI("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
-
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+ return (NativeS3FileSystem)
factory.create(URI.create("s3://test-bucket/"));
}
- @Test
- void testPartSizeTooSmallThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.set(NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE, 1024L); //
Too small (< 5MB)
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
+ private static NativeS3FileSystem createS3aFs(Configuration config) throws
Exception {
+ NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory();
factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("must be at least");
+ return (NativeS3FileSystem)
factory.create(URI.create("s3a://test-bucket/"));
}
@Test
- void testPartSizeTooLargeThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.set(
- NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE, 6L * 1024 *
1024 * 1024); // > 5GB
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("must not exceed 5GB");
+ void testSchemeReturnsS3() {
+ assertThat(new
NativeS3FileSystemFactory().getScheme()).isEqualTo("s3");
}
@Test
- void testInvalidMaxConcurrentUploadsThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.set(NativeS3FileSystemFactory.MAX_CONCURRENT_UPLOADS, 0);
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("must be positive");
+ void testS3ASchemeReturnsS3A() {
+ assertThat(new
NativeS3AFileSystemFactory().getScheme()).isEqualTo("s3a");
}
@Test
- void testInvalidEntropyKeyThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.setString("s3.entropy.key", "__INVALID#KEY__"); // Contains #
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
- .isInstanceOf(IllegalConfigurationException.class)
- .hasMessageContaining("Invalid character");
+ void testCreateFileSystemWithMinimalConfiguration() throws Exception {
+ NativeS3FileSystem fs = createFs(baseConfig());
+ assertThat(fs.getUri()).isEqualTo(URI.create("s3://test-bucket/"));
}
@Test
- void testInvalidEntropyLengthThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.setString("s3.entropy.key", "__ENTROPY__");
- config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 0);
// Invalid
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
- .isInstanceOf(IllegalConfigurationException.class)
- .hasMessageContaining("must be > 0");
+ void testCreateFileSystemWithCustomEndpoint() throws Exception {
+ Configuration config = baseConfig();
+ config.setString("s3.endpoint", "http://localhost:9000");
+ assertThat(createFs(config)).isNotNull();
}
@Test
- void testEntropyInjectionWithValidConfiguration() throws Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.setString("s3.entropy.key", "__ENTROPY__");
- config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 4);
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+ void testS3ACreateFileSystemWithMinimalConfiguration() throws Exception {
+ NativeS3FileSystem fs = createS3aFs(baseConfig());
+ assertThat(fs.getUri()).isEqualTo(URI.create("s3a://test-bucket/"));
+ }
- factory.configure(config);
+ @Test
+ void testS3ACreateFileSystemWithCustomEndpoint() throws Exception {
+ Configuration config = baseConfig();
+ config.setString("s3.endpoint", "http://localhost:9000");
+ assertThat(createS3aFs(config)).isNotNull();
+ }
- URI fsUri = URI.create("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
+ // --- Path-style access ---
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+ @Test
+ void testPathStyleAccessDefaultIsFalse() throws Exception {
+
assertThat(createFs(baseConfig()).getClientProvider().isPathStyleAccess()).isFalse();
}
@Test
- void testPathStyleAccessAutoEnabledForCustomEndpoint() throws Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.endpoint", "http://minio:9000");
- config.setString("s3.region", "us-east-1");
- config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, false); //
Explicitly set to false
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
+ void testPathStyleAccessExplicitlyEnabled() throws Exception {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, true);
+
assertThat(createFs(config).getClientProvider().isPathStyleAccess()).isTrue();
+ }
- URI fsUri = URI.create("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
+ // --- Chunked encoding ---
- // Should succeed - path-style access is auto-enabled
- assertThat(fs).isNotNull();
+ @Test
+ void testChunkedEncodingDefaultIsTrue() throws Exception {
+
assertThat(createFs(baseConfig()).getClientProvider().isChunkedEncoding()).isTrue();
}
@Test
- void testBulkCopyConfiguration() throws Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true);
- config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 32);
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
+ void testChunkedEncodingExplicitlyDisabled() throws Exception {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.CHUNKED_ENCODING_ENABLED, false);
+
assertThat(createFs(config).getClientProvider().isChunkedEncoding()).isFalse();
+ }
- URI fsUri = URI.create("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
+ // --- Checksum validation ---
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+ @Test
+ void testChecksumValidationDefaultIsTrue() throws Exception {
+
assertThat(createFs(baseConfig()).getClientProvider().isChecksumValidation()).isTrue();
}
@Test
- void testExplicitRegionConfiguration() throws Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "eu-west-1"); // Explicit non-default
region
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
+ void testChecksumValidationExplicitlyDisabled() throws Exception {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.CHECKSUM_VALIDATION_ENABLED,
false);
+
assertThat(createFs(config).getClientProvider().isChecksumValidation()).isFalse();
+ }
- URI fsUri = URI.create("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
+ // --- Max connections ---
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+ @Test
+ void testMaxConnectionsDefault() throws Exception {
+
assertThat(createFs(baseConfig()).getClientProvider().getMaxConnections()).isEqualTo(50);
}
@Test
- void testExplicitRegionTakesPriorityOverAutodiscovery() throws Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "ap-southeast-1"); // Explicit region
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+ void testMaxConnectionsExplicitlyConfigured() throws Exception {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 100);
+
assertThat(createFs(config).getClientProvider().getMaxConnections()).isEqualTo(100);
+ }
- factory.configure(config);
+ @Test
+ void testInvalidMaxConnectionsThrowsException() {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 0);
+ assertThatThrownBy(() -> createFs(config))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("s3.connection.max")
+ .hasMessageContaining("must be a positive integer");
+ }
- URI fsUri = URI.create("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
+ // --- Max retries ---
- // Should succeed with explicit region regardless of environment
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+ @Test
+ void testMaxRetriesDefault() throws Exception {
+
assertThat(createFs(baseConfig()).getClientProvider().getMaxRetries()).isEqualTo(3);
}
@Test
- void testRegionAutodiscoveryWithoutExplicitConfig() throws Exception {
- // This test verifies that region autodiscovery works when no explicit
region is set.
- // The test will either succeed (if AWS region can be auto-detected
from environment)
- // or fail with a helpful error message.
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- // Intentionally not setting s3.region to test autodiscovery
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+ void testMaxRetriesExplicitlyConfigured() throws Exception {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.MAX_RETRIES, 5);
+
assertThat(createFs(config).getClientProvider().getMaxRetries()).isEqualTo(5);
+ }
- factory.configure(config);
+ // --- Timeouts ---
- URI fsUri = URI.create("s3://test-bucket/");
+ @Test
+ void testCustomTimeoutConfiguration() throws Exception {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.CONNECTION_TIMEOUT,
Duration.ofSeconds(30));
+ config.set(NativeS3FileSystemFactory.SOCKET_TIMEOUT,
Duration.ofSeconds(45));
+ config.set(NativeS3FileSystemFactory.CONNECTION_MAX_IDLE_TIME,
Duration.ofMinutes(2));
+ config.set(NativeS3FileSystemFactory.FS_CLOSE_TIMEOUT,
Duration.ofSeconds(90));
+ config.set(NativeS3FileSystemFactory.CLIENT_CLOSE_TIMEOUT,
Duration.ofSeconds(15));
- try {
- FileSystem fs = factory.create(fsUri);
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
- } catch (IllegalArgumentException e) {
- // If no region can be auto-detected, verify the error message is
helpful
- assertThat(e.getMessage()).contains("AWS region could not be determined");
- assertThat(e.getMessage()).contains("s3.region");
- assertThat(e.getMessage()).contains("AWS_REGION");
- }
+ NativeS3FileSystem fs = createFs(config);
+ assertThat(fs.getFsCloseTimeout()).isEqualTo(Duration.ofSeconds(90));
+
+ S3ClientProvider clientProvider = fs.getClientProvider();
+
assertThat(clientProvider.getConnectionTimeout()).isEqualTo(Duration.ofSeconds(30));
+
assertThat(clientProvider.getSocketTimeout()).isEqualTo(Duration.ofSeconds(45));
+
assertThat(clientProvider.getConnectionMaxIdleTime()).isEqualTo(Duration.ofMinutes(2));
+
assertThat(clientProvider.getClientCloseTimeout()).isEqualTo(Duration.ofSeconds(15));
}
@Test
- void testRegionAutodiscoveryWithCustomEndpoint() throws Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.endpoint", "http://localhost:9000");
- // Intentionally not setting s3.region with custom endpoint
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
+ void testTimeoutConfigurationWithStringDuration() throws Exception {
+ Configuration config = baseConfig();
+ config.setString("s3.connection.timeout", "30 s");
+ config.setString("s3.socket.timeout", "2 min");
+ config.setString("s3.close.timeout", "1 min");
- URI fsUri = URI.create("s3://test-bucket/");
+ NativeS3FileSystem fs = createFs(config);
+ assertThat(fs.getFsCloseTimeout()).isEqualTo(Duration.ofMinutes(1));
- try {
- FileSystem fs = factory.create(fsUri);
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
- } catch (IllegalArgumentException e) {
- assertThat(e.getMessage()).contains("AWS region could not be determined");
- assertThat(e.getMessage()).contains("AWS_REGION");
- assertThat(e.getMessage()).contains("~/.aws/config");
- }
+ S3ClientProvider clientProvider = fs.getClientProvider();
+
assertThat(clientProvider.getConnectionTimeout()).isEqualTo(Duration.ofSeconds(30));
+
assertThat(clientProvider.getSocketTimeout()).isEqualTo(Duration.ofMinutes(2));
}
@Test
- void testEmptyRegionFallsBackToAutodiscovery() throws Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", ""); // Empty region should trigger
autodiscovery
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
+ void testInvalidTimeoutConfigurationThrowsException() {
+ Configuration config = baseConfig();
+ config.setString("s3.connection.timeout", "not-a-duration");
+ assertThatThrownBy(() ->
createFs(config)).isInstanceOf(IllegalArgumentException.class);
+ }
- URI fsUri = URI.create("s3://test-bucket/");
+ // --- Part upload size ---
- try {
- FileSystem fs = factory.create(fsUri);
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
- } catch (IllegalArgumentException e) {
- // If no region can be auto-detected, verify the error message is
helpful
- assertThat(e.getMessage()).contains("AWS region could not be determined");
- }
+ @Test
+ void testPartSizeTooSmallThrowsException() {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE, 1024L);
+ assertThatThrownBy(() -> createFs(config))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("must be at least");
}
@Test
- void testInvalidMaxConnectionsThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 0);
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+ void testPartSizeTooLargeThrowsException() {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE, 6L * 1024 *
1024 * 1024);
+ assertThatThrownBy(() -> createFs(config))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("must not exceed 5GB");
+ }
- factory.configure(config);
+ // --- Max concurrent uploads ---
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
+ @Test
+ void testInvalidMaxConcurrentUploadsThrowsException() {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.MAX_CONCURRENT_UPLOADS, 0);
+ assertThatThrownBy(() -> createFs(config))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("s3.connection.max")
- .hasMessageContaining("must be a positive integer");
+ .hasMessageContaining("must be positive");
}
+ // --- Entropy injection ---
+
@Test
- void testInvalidBulkCopyMaxConcurrentThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 0);
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+ void testInvalidEntropyKeyThrowsException() {
+ Configuration config = baseConfig();
+ config.setString("s3.entropy.key", "__INVALID#KEY__");
+ assertThatThrownBy(() -> createFs(config))
+ .isInstanceOf(IllegalConfigurationException.class)
+ .hasMessageContaining("Invalid character");
+ }
- factory.configure(config);
+ @Test
+ void testInvalidEntropyLengthThrowsException() {
+ Configuration config = baseConfig();
+ config.setString("s3.entropy.key", "__ENTROPY__");
+ config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 0);
+ assertThatThrownBy(() -> createFs(config))
+ .isInstanceOf(IllegalConfigurationException.class)
+ .hasMessageContaining("must be > 0");
+ }
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("s3.bulk-copy.max-concurrent")
- .hasMessageContaining("must be a positive integer");
+ @Test
+ void testEntropyInjectionWithValidConfiguration() throws Exception {
+ Configuration config = baseConfig();
+ config.setString("s3.entropy.key", "__ENTROPY__");
+ config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 4);
+ assertThat(createFs(config)).isNotNull();
}
+ // --- Bulk copy ---
+
@Test
void testBulkCopyMaxConcurrentClampedToMaxConnections() throws Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
+ Configuration config = baseConfig();
config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true);
config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 32);
config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 10);
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
- URI fsUri = URI.create("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
-
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
- NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs;
- assertThat(nativeFs.getBulkCopyHelper()).isNotNull();
-
assertThat(nativeFs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10);
+ NativeS3FileSystem fs = createFs(config);
+ assertThat(fs.getBulkCopyHelper()).isNotNull();
+
assertThat(fs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10);
}
@Test
void testBulkCopyMaxConcurrentPreservedWithinMaxConnections() throws
Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
+ Configuration config = baseConfig();
config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true);
config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 10);
config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 50);
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
- URI fsUri = URI.create("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
-
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
- NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs;
- assertThat(nativeFs.getBulkCopyHelper()).isNotNull();
-
assertThat(nativeFs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10);
+ NativeS3FileSystem fs = createFs(config);
+ assertThat(fs.getBulkCopyHelper()).isNotNull();
+
assertThat(fs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10);
}
@Test
- void testS3ASchemeReturnsS3A() {
- NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory();
- assertThat(factory.getScheme()).isEqualTo("s3a");
+ void testInvalidBulkCopyMaxConcurrentThrowsException() {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 0);
+ assertThatThrownBy(() -> createFs(config))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("s3.bulk-copy.max-concurrent")
+ .hasMessageContaining("must be a positive integer");
}
- @Test
- void testS3ACreateFileSystemWithMinimalConfiguration() throws Exception {
- NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3a://test-bucket/");
- FileSystem fs = factory.create(fsUri);
-
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
- }
+ // --- Region ---
@Test
- void testS3ACreateFileSystemWithCustomEndpoint() throws Exception {
- NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.endpoint", "http://localhost:9000");
- config.setString("s3.region", "us-east-1");
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3a://test-bucket/path/to/file");
- FileSystem fs = factory.create(fsUri);
-
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
- }
-
- @Test
- void testS3AInheritsAllS3Configuration() throws Exception {
- NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
+ void testExplicitRegionConfiguration() throws Exception {
+ Configuration config = baseConfig();
config.setString("s3.region", "eu-west-1");
- config.setString("s3.entropy.key", "__ENTROPY__");
- config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 6);
- config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true);
- config.set(NativeS3FileSystemFactory.USE_ASYNC_OPERATIONS, true);
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3a://test-bucket/");
- FileSystem fs = factory.create(fsUri);
-
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+ assertThat(createFs(config)).isNotNull();
}
@Test
- void testS3AWithSSEConfiguration() throws Exception {
- NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.setString("s3.sse.type", "sse-s3");
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3a://test-bucket/");
- FileSystem fs = factory.create(fsUri);
-
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+ void testRegionAutodiscoveryWithoutExplicitConfig() throws Exception {
+ Configuration config = baseConfig();
+ config.removeConfig(NativeS3FileSystemFactory.REGION);
+ try {
+ assertThat(createFs(config)).isNotNull();
+ } catch (IllegalArgumentException e) {
+ assertThat(e.getMessage()).contains("AWS region could not be
determined");
+ assertThat(e.getMessage()).contains("s3.region");
+ assertThat(e.getMessage()).contains("AWS_REGION");
+ }
}
@Test
- void testCustomTimeoutConfiguration() throws Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.set(NativeS3FileSystemFactory.CONNECTION_TIMEOUT,
Duration.ofSeconds(30));
- config.set(NativeS3FileSystemFactory.SOCKET_TIMEOUT,
Duration.ofSeconds(45));
- config.set(NativeS3FileSystemFactory.CONNECTION_MAX_IDLE_TIME,
Duration.ofMinutes(2));
- config.set(NativeS3FileSystemFactory.FS_CLOSE_TIMEOUT,
Duration.ofSeconds(90));
- config.set(NativeS3FileSystemFactory.CLIENT_CLOSE_TIMEOUT,
Duration.ofSeconds(15));
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
-
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
- NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs;
-
assertThat(nativeFs.getFsCloseTimeout()).isEqualTo(Duration.ofSeconds(90));
-
- S3ClientProvider clientProvider = nativeFs.getClientProvider();
-
assertThat(clientProvider.getConnectionTimeout()).isEqualTo(Duration.ofSeconds(30));
-
assertThat(clientProvider.getSocketTimeout()).isEqualTo(Duration.ofSeconds(45));
-
assertThat(clientProvider.getConnectionMaxIdleTime()).isEqualTo(Duration.ofMinutes(2));
-
assertThat(clientProvider.getClientCloseTimeout()).isEqualTo(Duration.ofSeconds(15));
+ void testRegionAutodiscoveryWithCustomEndpoint() throws Exception {
+ Configuration config = baseConfig();
+ config.setString("s3.endpoint", "http://localhost:9000");
+ config.removeConfig(NativeS3FileSystemFactory.REGION);
+ try {
+ assertThat(createFs(config)).isNotNull();
+ } catch (IllegalArgumentException e) {
+ assertThat(e.getMessage()).contains("AWS region could not be
determined");
+ assertThat(e.getMessage()).contains("AWS_REGION");
+ assertThat(e.getMessage()).contains("~/.aws/config");
+ }
}
@Test
- void testTimeoutConfigurationWithStringDuration() throws Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.setString("s3.connection.timeout", "30 s");
- config.setString("s3.socket.timeout", "2 min");
- config.setString("s3.close.timeout", "1 min");
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
+ void testEmptyRegionFallsBackToAutodiscovery() throws Exception {
+ Configuration config = baseConfig();
+ config.setString("s3.region", "");
+ try {
+ assertThat(createFs(config)).isNotNull();
+ } catch (IllegalArgumentException e) {
+ assertThat(e.getMessage()).contains("AWS region could not be
determined");
+ }
+ }
- assertThat(fs).isNotNull();
- NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs;
-
assertThat(nativeFs.getFsCloseTimeout()).isEqualTo(Duration.ofMinutes(1));
+ // --- s3a scheme ---
- S3ClientProvider clientProvider = nativeFs.getClientProvider();
-
assertThat(clientProvider.getConnectionTimeout()).isEqualTo(Duration.ofSeconds(30));
-
assertThat(clientProvider.getSocketTimeout()).isEqualTo(Duration.ofMinutes(2));
+ @Test
+ void testS3AWithSSEConfiguration() throws Exception {
+ Configuration config = baseConfig();
+ config.setString("s3.sse.type", "sse-s3");
+ assertThat(createS3aFs(config)).isNotNull();
}
@Test
- void testInvalidTimeoutConfigurationThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.setString("s3.connection.timeout", "not-a-duration");
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
- .isInstanceOf(IllegalArgumentException.class);
+ void testS3AInheritsS3Configuration() throws Exception {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, true);
+ config.set(NativeS3FileSystemFactory.CHUNKED_ENCODING_ENABLED, false);
+ config.set(NativeS3FileSystemFactory.CHECKSUM_VALIDATION_ENABLED,
false);
+
+ NativeS3FileSystem fs = createS3aFs(config);
+ assertThat(fs.getClientProvider().isPathStyleAccess()).isTrue();
+ assertThat(fs.getClientProvider().isChunkedEncoding()).isFalse();
+ assertThat(fs.getClientProvider().isChecksumValidation()).isFalse();
}
}