Samrat002 commented on code in PR #28009:
URL: https://github.com/apache/flink/pull/28009#discussion_r3131635159
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -295,10 +311,6 @@ public FileSystem create(URI fsUri) throws IOException {
String endpoint = config.get(ENDPOINT);
boolean pathStyleAccess = config.get(PATH_STYLE_ACCESS);
- if (endpoint != null && !pathStyleAccess) {
Review Comment:
This removal changes behavior for existing MinIO users. Previously, setting
only `s3.endpoint=http://minio:9000` was sufficient; path-style access, chunked
encoding, and checksum validation were all auto-configured.
After this change, users must explicitly set three additional flags.
Could we add a WARN-level log when endpoint != null and chunkedEncoding +
checksumValidation are both at their defaults (true)?
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -90,6 +90,22 @@ public class NativeS3FileSystemFactory implements
FileSystemFactory {
.withFallbackKeys("s3.path.style.access")
.withDescription("Use path-style access for S3 (for
S3-compatible storage)");
+ public static final ConfigOption<Boolean> CHUNKED_ENCODING_ENABLED =
Review Comment:
[NIT] The descriptions for both new config options could mention the typical
S3-compatible servers that need these disabled (MinIO, etc.) and reference
`s3.path-style-access` so users discover all three knobs together.
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -102,6 +112,11 @@ private S3ClientProvider(
this.connectionTimeout = connectionTimeout;
this.socketTimeout = socketTimeout;
this.connectionMaxIdleTime = connectionMaxIdleTime;
+ this.pathStyleAccess = pathStyleAccess;
Review Comment:
The constructor now takes a very long parameter list 😄. I see a lot of these
params are already baked into the S3Client and S3Configuration at build time.
Grouping them into a small config record would be better here.
##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java:
##########
@@ -33,565 +32,349 @@
/** Tests for {@link NativeS3FileSystemFactory}. */
class NativeS3FileSystemFactoryTest {
- @Test
- void testSchemeReturnsS3() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- assertThat(factory.getScheme()).isEqualTo("s3");
- }
-
- @Test
- void testConfigureAcceptsConfiguration() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-key");
- config.setString("s3.secret-key", "test-secret");
-
- // Should not throw
- factory.configure(config);
- }
-
- @Test
- void testCreateFileSystemWithMinimalConfiguration() throws Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+ private static Configuration baseConfig() {
Configuration config = new Configuration();
config.setString("s3.access-key", "test-access-key");
config.setString("s3.secret-key", "test-secret-key");
config.setString("s3.region", "us-east-1");
config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = new URI("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
-
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
- assertThat(fs.getUri()).isEqualTo(fsUri);
+ return config;
}
- @Test
- void testCreateFileSystemWithCustomEndpoint() throws Exception {
+ private static NativeS3FileSystem createFs(Configuration config) throws
Exception {
NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.endpoint", "http://localhost:9000");
- config.setString("s3.region", "us-east-1");
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
factory.configure(config);
-
- URI fsUri = new URI("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
-
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+ return (NativeS3FileSystem)
factory.create(URI.create("s3://test-bucket/"));
}
- @Test
- void testPartSizeTooSmallThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.set(NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE, 1024L); //
Too small (< 5MB)
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
+ private static NativeS3FileSystem createS3aFs(Configuration config) throws
Exception {
+ NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory();
factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("must be at least");
+ return (NativeS3FileSystem)
factory.create(URI.create("s3a://test-bucket/"));
}
@Test
- void testPartSizeTooLargeThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.set(
- NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE, 6L * 1024 *
1024 * 1024); // > 5GB
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("must not exceed 5GB");
+ void testSchemeReturnsS3() {
+ assertThat(new
NativeS3FileSystemFactory().getScheme()).isEqualTo("s3");
}
@Test
- void testInvalidMaxConcurrentUploadsThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.set(NativeS3FileSystemFactory.MAX_CONCURRENT_UPLOADS, 0);
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("must be positive");
+ void testS3ASchemeReturnsS3A() {
+ assertThat(new
NativeS3AFileSystemFactory().getScheme()).isEqualTo("s3a");
}
@Test
- void testInvalidEntropyKeyThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.setString("s3.entropy.key", "__INVALID#KEY__"); // Contains #
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
- .isInstanceOf(IllegalConfigurationException.class)
- .hasMessageContaining("Invalid character");
+ void testCreateFileSystemWithMinimalConfiguration() throws Exception {
+ NativeS3FileSystem fs = createFs(baseConfig());
+ assertThat(fs.getUri()).isEqualTo(URI.create("s3://test-bucket/"));
}
@Test
- void testInvalidEntropyLengthThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.setString("s3.entropy.key", "__ENTROPY__");
- config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 0);
// Invalid
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
- .isInstanceOf(IllegalConfigurationException.class)
- .hasMessageContaining("must be > 0");
+ void testCreateFileSystemWithCustomEndpoint() throws Exception {
+ Configuration config = baseConfig();
+ config.setString("s3.endpoint", "http://localhost:9000");
+ assertThat(createFs(config)).isNotNull();
Review Comment:
This test only checks notNull for a custom endpoint, but does not validate
the actual fix.
##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java:
##########
@@ -33,565 +32,349 @@
/** Tests for {@link NativeS3FileSystemFactory}. */
class NativeS3FileSystemFactoryTest {
- @Test
- void testSchemeReturnsS3() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- assertThat(factory.getScheme()).isEqualTo("s3");
- }
-
- @Test
- void testConfigureAcceptsConfiguration() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-key");
- config.setString("s3.secret-key", "test-secret");
-
- // Should not throw
- factory.configure(config);
- }
-
- @Test
- void testCreateFileSystemWithMinimalConfiguration() throws Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+ private static Configuration baseConfig() {
Configuration config = new Configuration();
config.setString("s3.access-key", "test-access-key");
config.setString("s3.secret-key", "test-secret-key");
config.setString("s3.region", "us-east-1");
config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = new URI("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
-
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
- assertThat(fs.getUri()).isEqualTo(fsUri);
+ return config;
}
- @Test
- void testCreateFileSystemWithCustomEndpoint() throws Exception {
+ private static NativeS3FileSystem createFs(Configuration config) throws
Exception {
NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.endpoint", "http://localhost:9000");
- config.setString("s3.region", "us-east-1");
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
factory.configure(config);
-
- URI fsUri = new URI("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
-
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+ return (NativeS3FileSystem)
factory.create(URI.create("s3://test-bucket/"));
}
- @Test
- void testPartSizeTooSmallThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.set(NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE, 1024L); //
Too small (< 5MB)
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
+ private static NativeS3FileSystem createS3aFs(Configuration config) throws
Exception {
+ NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory();
factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("must be at least");
+ return (NativeS3FileSystem)
factory.create(URI.create("s3a://test-bucket/"));
}
@Test
- void testPartSizeTooLargeThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.set(
- NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE, 6L * 1024 *
1024 * 1024); // > 5GB
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("must not exceed 5GB");
+ void testSchemeReturnsS3() {
+ assertThat(new
NativeS3FileSystemFactory().getScheme()).isEqualTo("s3");
}
@Test
- void testInvalidMaxConcurrentUploadsThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.set(NativeS3FileSystemFactory.MAX_CONCURRENT_UPLOADS, 0);
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("must be positive");
+ void testS3ASchemeReturnsS3A() {
+ assertThat(new
NativeS3AFileSystemFactory().getScheme()).isEqualTo("s3a");
}
@Test
- void testInvalidEntropyKeyThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.setString("s3.entropy.key", "__INVALID#KEY__"); // Contains #
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
- .isInstanceOf(IllegalConfigurationException.class)
- .hasMessageContaining("Invalid character");
+ void testCreateFileSystemWithMinimalConfiguration() throws Exception {
+ NativeS3FileSystem fs = createFs(baseConfig());
+ assertThat(fs.getUri()).isEqualTo(URI.create("s3://test-bucket/"));
}
@Test
- void testInvalidEntropyLengthThrowsException() {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.setString("s3.entropy.key", "__ENTROPY__");
- config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 0);
// Invalid
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
-
- URI fsUri = URI.create("s3://test-bucket/");
- assertThatThrownBy(() -> factory.create(fsUri))
- .isInstanceOf(IllegalConfigurationException.class)
- .hasMessageContaining("must be > 0");
+ void testCreateFileSystemWithCustomEndpoint() throws Exception {
+ Configuration config = baseConfig();
+ config.setString("s3.endpoint", "http://localhost:9000");
+ assertThat(createFs(config)).isNotNull();
}
@Test
- void testEntropyInjectionWithValidConfiguration() throws Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.setString("s3.entropy.key", "__ENTROPY__");
- config.set(NativeS3FileSystemFactory.ENTROPY_INJECT_LENGTH_OPTION, 4);
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+ void testS3ACreateFileSystemWithMinimalConfiguration() throws Exception {
+ NativeS3FileSystem fs = createS3aFs(baseConfig());
+ assertThat(fs.getUri()).isEqualTo(URI.create("s3a://test-bucket/"));
+ }
- factory.configure(config);
+ @Test
+ void testS3ACreateFileSystemWithCustomEndpoint() throws Exception {
+ Configuration config = baseConfig();
+ config.setString("s3.endpoint", "http://localhost:9000");
+ assertThat(createS3aFs(config)).isNotNull();
+ }
- URI fsUri = URI.create("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
+ // --- Path-style access ---
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+ @Test
+ void testPathStyleAccessDefaultIsFalse() throws Exception {
+
assertThat(createFs(baseConfig()).getClientProvider().isPathStyleAccess()).isFalse();
}
@Test
- void testPathStyleAccessAutoEnabledForCustomEndpoint() throws Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.endpoint", "http://minio:9000");
- config.setString("s3.region", "us-east-1");
- config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, false); //
Explicitly set to false
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
+ void testPathStyleAccessExplicitlyEnabled() throws Exception {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, true);
+
assertThat(createFs(config).getClientProvider().isPathStyleAccess()).isTrue();
+ }
- URI fsUri = URI.create("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
+ // --- Chunked encoding ---
- // Should succeed - path-style access is auto-enabled
- assertThat(fs).isNotNull();
+ @Test
+ void testChunkedEncodingDefaultIsTrue() throws Exception {
+
assertThat(createFs(baseConfig()).getClientProvider().isChunkedEncoding()).isTrue();
}
@Test
- void testBulkCopyConfiguration() throws Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "us-east-1");
- config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true);
- config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 32);
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
+ void testChunkedEncodingExplicitlyDisabled() throws Exception {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.CHUNKED_ENCODING_ENABLED, false);
+
assertThat(createFs(config).getClientProvider().isChunkedEncoding()).isFalse();
+ }
- URI fsUri = URI.create("s3://test-bucket/");
- FileSystem fs = factory.create(fsUri);
+ // --- Checksum validation ---
- assertThat(fs).isNotNull();
- assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+ @Test
+ void testChecksumValidationDefaultIsTrue() throws Exception {
+
assertThat(createFs(baseConfig()).getClientProvider().isChecksumValidation()).isTrue();
}
@Test
- void testExplicitRegionConfiguration() throws Exception {
- NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
- Configuration config = new Configuration();
- config.setString("s3.access-key", "test-access-key");
- config.setString("s3.secret-key", "test-secret-key");
- config.setString("s3.region", "eu-west-1"); // Explicit non-default
region
- config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
-
- factory.configure(config);
+ void testChecksumValidationExplicitlyDisabled() throws Exception {
Review Comment:
There's no test proving that the new explicit opt-in works end-to-end for
S3-compatible servers.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]