[
https://issues.apache.org/jira/browse/FLINK-39472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18079408#comment-18079408
]
Samrat Deb edited comment on FLINK-39472 at 5/9/26 7:44 AM:
------------------------------------------------------------
Yes.
A few points to add:
1. I tried to reproduce the issue `Test 6: Entropy doesn't work` against a real
S3 bucket (not MinIO). I don't think the issue is reproducible with a real S3
bucket.
2. Region autodetection works against the S3 service. The illegal-argument
error message is reproducible with a real S3 bucket; I have created a bug
ticket for 2.4.
So we have an incompatibility with MinIO, but a vanilla S3 bucket works properly.
It has been discussed that MinIO is in maintenance mode, so it may not support
newer S3 changes.
We have also discussed that the focus for the native S3 filesystem is now on
making it stable in the next release.
So we are good to close the ticket.
cc: [~gaborgsomogyi]
was (Author: samrat007):
Yes.
A few points to add:
1. I tried to reproduce the issue `Test 6: Entropy doesn't work` against a real
S3 bucket (not MinIO). I don't think the issue is reproducible with a real S3
bucket.
2. Region autodetection works against the S3 service. The illegal-argument
error message is also not reproducible with a real S3 bucket.
So we have an incompatibility with MinIO, but a vanilla S3 bucket works
properly. It has been discussed that MinIO is in maintenance mode, so it may
not support newer S3 changes.
We have also discussed that the focus for the native S3 filesystem is now on
making it stable in the next release.
So we are good to close the ticket.
cc: [~gaborgsomogyi]
> Release Testing: Verify FLIP-555: Native S3 FileSystem
> ------------------------------------------------------
>
> Key: FLINK-39472
> URL: https://issues.apache.org/jira/browse/FLINK-39472
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / FileSystem
> Affects Versions: 2.3.0
> Reporter: Samrat Deb
> Assignee: Xiqian Yu
> Priority: Blocker
> Labels: release-testing
> Fix For: 2.3.0
>
>
> h2. Background
> [FLIP-555|https://cwiki.apache.org/confluence/display/FLINK/FLIP-555%3A+Flink+Native+S3+FileSystem]
> introduces {{flink-s3-fs-native}}, a new S3 filesystem plugin built
> directly on the AWS SDK for Java v2. It replaces the legacy
> {{flink-s3-fs-hadoop}} and {{flink-s3-fs-presto}} connectors with a single,
> dependency-isolated module that supports the full {{FileSystem}},
> {{RecoverableWriter}}, {{EntropyInjectingFileSystem}}, and
> {{PathsCopyingFileSystem}} interfaces.
> h2. Prerequisites
> - A freshly built Flink distribution (2.3).
> - An S3 bucket with read/write access (or [MinIO|https://min.io/] for local
> testing).
> - AWS credentials configured (environment variables, {{~/.aws/credentials}},
> or IAM role).
> - The {{flink-s3-fs-native}} plugin JAR deployed:
> {code:bash}
> mkdir -p $FLINK_HOME/plugins/s3-fs-native
> cp flink-s3-fs-native-*.jar $FLINK_HOME/plugins/s3-fs-native/
> {code}
> - *Important*: Remove any existing {{flink-s3-fs-hadoop}} or
> {{flink-s3-fs-presto}} JARs from the {{plugins/}} directory to avoid scheme
> conflicts.
> - For MinIO-based testing, start a local instance:
> {code:bash}
> docker run -d --name minio -p 9000:9000 -p 9001:9001 \
> -e MINIO_ROOT_USER=minioadmin -e MINIO_ROOT_PASSWORD=minioadmin \
> minio/minio server /data --console-address ":9001"
> # Create a test bucket
> aws --endpoint-url http://localhost:9000 s3 mb s3://flink-test
> {code}
> - For MinIO, add the following to {{conf/config.yaml}}:
> {code:yaml}
> s3.endpoint: http://localhost:9000
> s3.access-key: minioadmin
> s3.secret-key: minioadmin
> s3.path-style-access: true
> {code}
> h2. Test Cases
> ----
> h3. Test 1: Plugin Loading and Basic FileSystem Operations
> *Objective*: Verify the native S3 plugin loads correctly for both
> {{s3://}} and {{s3a://}} URI schemes, and basic filesystem operations
> (write, read, list, delete) work end-to-end.
> *Steps*:
> 1. Start a Flink Session cluster:
> {code:bash}
> ./bin/start-cluster.sh
> {code}
> 2. Open the Flink SQL Client:
> {code:bash}
> ./bin/sql-client.sh
> {code}
> 3. Create a table writing to S3 and insert data:
> {code:sql}
> CREATE TABLE s3_test (
> id INT,
> name STRING
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = 's3://flink-test/release-test/basic/',
> 'format' = 'csv'
> );
> INSERT INTO s3_test VALUES (1, 'alice'), (2, 'bob'), (3, 'charlie');
> {code}
> 4. Wait for the batch job to finish (check the Web UI at
> {{http://localhost:8081}}).
> 5. Read back the data:
> {code:sql}
> SELECT * FROM s3_test;
> {code}
> 6. Repeat steps 3-5 using the {{s3a://}} scheme:
> {code:sql}
> CREATE TABLE s3a_test (
> id INT,
> name STRING
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = 's3a://flink-test/release-test/basic-s3a/',
> 'format' = 'csv'
> );
> INSERT INTO s3a_test VALUES (10, 'delta'), (20, 'echo');
> SELECT * FROM s3a_test;
> {code}
> 7. Verify the files exist in S3:
> {code:bash}
> aws s3 ls s3://flink-test/release-test/basic/ --recursive
> aws s3 ls s3://flink-test/release-test/basic-s3a/ --recursive
> {code}
> *Expected Results*:
> - The plugin loads without errors (no {{ClassNotFoundException}} or
> scheme-registration errors in the JobManager log).
> - Both {{s3://}} and {{s3a://}} produce identical behavior.
> - {{SELECT}} returns exactly the rows inserted.
> - Files are visible in the S3 bucket.
> *Cleanup*:
> {code:bash}
> aws s3 rm s3://flink-test/release-test/basic/ --recursive
> aws s3 rm s3://flink-test/release-test/basic-s3a/ --recursive
> ./bin/stop-cluster.sh
> {code}
> ----
> h3. Test 2: Streaming FileSink with Exactly-Once Semantics (RecoverableWriter)
> *Objective*: Verify the {{RecoverableWriter}} implementation correctly
> handles streaming writes with checkpointing, producing exactly-once output
> files on S3.
> *Steps*:
> 1. Configure checkpointing in {{conf/config.yaml}}:
> {code:yaml}
> execution.checkpointing.interval: 10s
> execution.checkpointing.mode: EXACTLY_ONCE
> state.checkpoints.dir: s3://flink-test/checkpoints/
> {code}
> 2. Start a Flink Session cluster:
> {code:bash}
> ./bin/start-cluster.sh
> {code}
> 3. Open the Flink SQL Client:
> {code:bash}
> ./bin/sql-client.sh
> {code}
> 4. Create a streaming source and an S3 sink:
> {code:sql}
> SET 'execution.runtime-mode' = 'streaming';
> CREATE TABLE datagen_source (
> id INT,
> ts TIMESTAMP(3),
> payload STRING,
> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10',
> 'fields.id.kind' = 'sequence',
> 'fields.id.start' = '1',
> 'fields.id.end' = '10000',
> 'fields.payload.length' = '64'
> );
> CREATE TABLE s3_streaming_sink (
> id INT,
> ts TIMESTAMP(3),
> payload STRING
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = 's3://flink-test/release-test/streaming-sink/',
> 'format' = 'csv',
> 'sink.rolling-policy.rollover-interval' = '30s',
> 'sink.rolling-policy.check-interval' = '5s'
> );
> INSERT INTO s3_streaming_sink SELECT * FROM datagen_source;
> {code}
> 5. Let the job run for at least 2 minutes. Verify in the Web UI that multiple
> checkpoints complete successfully.
> 6. Cancel the job with a savepoint:
> {code:bash}
> ./bin/flink cancel -s s3://flink-test/savepoints/ <JOB_ID>
> {code}
> 7. Verify output files on S3:
> {code:bash}
> aws s3 ls s3://flink-test/release-test/streaming-sink/ --recursive
> {code}
> 8. Count total rows across all output files and verify no duplicates:
> {code:bash}
> shopt -s globstar  # bash only: lets ** match nested directories
> aws s3 cp s3://flink-test/release-test/streaming-sink/ /tmp/s3-output/ \
>   --recursive
> cat /tmp/s3-output/**/*.csv | sort -t',' -k1 -n | uniq -d | wc -l
> # Expected: 0 duplicates
> wc -l /tmp/s3-output/**/*.csv
> # Expected: total matches the number of records produced
> {code}
> *Expected Results*:
> - Checkpoints complete successfully (visible in the Web UI checkpoint
> history).
> - Checkpoint metadata is stored under {{s3://flink-test/checkpoints/}}.
> - Output files under the sink path contain no duplicate rows.
> - No errors related to multipart uploads, RecoverableWriter, or S3 access in
> the TaskManager logs.
> *Cleanup*:
> {code:bash}
> aws s3 rm s3://flink-test/release-test/streaming-sink/ --recursive
> aws s3 rm s3://flink-test/checkpoints/ --recursive
> aws s3 rm s3://flink-test/savepoints/ --recursive
> rm -rf /tmp/s3-output
> ./bin/stop-cluster.sh
> {code}
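The duplicate check in step 8 of Test 2 can also be written as a small shell function. This is only a sketch: `count_duplicate_ids` is a hypothetical helper name, and it assumes the first CSV column is the unique `id` from the datagen sequence and that no field contains an embedded comma or newline. It walks the download directory with `find`, so it does not rely on the `**` glob or on a `.csv` file suffix.

```shell
# count_duplicate_ids DIR
# Prints how many distinct ids (first CSV column) occur more than once
# across all regular files under DIR. Hypothetical helper, not part of Flink.
count_duplicate_ids() {
    find "$1" -type f -exec cat {} + |
        cut -d',' -f1 |     # keep only the id column
        sort -n |           # group equal ids together
        uniq -d |           # emit each repeated id once
        wc -l |
        tr -d ' '           # some wc implementations pad with spaces
}
```

After the `aws s3 cp ... --recursive` download, `count_duplicate_ids /tmp/s3-output` is expected to print `0` if the sink produced no duplicates.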
> ----
> h3. Test 3: Checkpoint and Savepoint on S3 State Backend
> *Objective*: Verify that Flink checkpoints and savepoints can be stored
> on S3 and successfully restored, ensuring state integrity across failures.
> *Steps*:
> 1. Configure S3 as the checkpoint and savepoint directory in
> {{conf/config.yaml}}:
> {code:yaml}
> execution.checkpointing.interval: 10s
> execution.checkpointing.mode: EXACTLY_ONCE
> state.checkpoints.dir: s3://flink-test/checkpoints/
> state.savepoints.dir: s3://flink-test/savepoints/
> state.backend.type: hashmap
> {code}
> 2. Start a Flink Session cluster:
> {code:bash}
> ./bin/start-cluster.sh
> {code}
> 3. Submit a stateful streaming job (using the built-in StateMachineExample):
> {code:bash}
> ./bin/flink run -d ./examples/streaming/StateMachineExample.jar
> {code}
> 4. Let the job run for at least 1 minute. Confirm in the Web UI that
> checkpoints are completing.
> 5. Verify checkpoint files exist on S3:
> {code:bash}
> aws s3 ls s3://flink-test/checkpoints/ --recursive | head -20
> {code}
> 6. Trigger a savepoint:
> {code:bash}
> ./bin/flink savepoint <JOB_ID> s3://flink-test/savepoints/
> {code}
> Record the savepoint path from the output.
> 7. Cancel the job:
> {code:bash}
> ./bin/flink cancel <JOB_ID>
> {code}
> 8. Restore from the savepoint:
> {code:bash}
> ./bin/flink run -d -s <SAVEPOINT_PATH> \
>   ./examples/streaming/StateMachineExample.jar
> {code}
> 9. Verify the restored job is RUNNING in the Web UI and checkpoints resume
> successfully.
> *Expected Results*:
> - Checkpoint metadata and state files are stored on S3.
> - Savepoint completes and produces files on S3.
> - The job restores from the savepoint without errors.
> - Checkpoints resume after restore.
> *Cleanup*:
> {code:bash}
> ./bin/flink cancel <RESTORED_JOB_ID>
> aws s3 rm s3://flink-test/checkpoints/ --recursive
> aws s3 rm s3://flink-test/savepoints/ --recursive
> ./bin/stop-cluster.sh
> {code}
> ----
> h3. Test 4: Failure Recovery with RecoverableWriter (Exactly-Once Under Failures)
> *Objective*: Verify that the RecoverableWriter correctly recovers from
> TaskManager failures, producing exactly-once output with no data loss or
> duplication.
> *Steps*:
> 1. Configure checkpointing in {{conf/config.yaml}}:
> {code:yaml}
> execution.checkpointing.interval: 10s
> execution.checkpointing.mode: EXACTLY_ONCE
> state.checkpoints.dir: s3://flink-test/checkpoints/
> restart-strategy.type: fixed-delay
> restart-strategy.fixed-delay.attempts: 10
> restart-strategy.fixed-delay.delay: 5s
> {code}
> 2. Start a Flink cluster with 2 TaskManagers:
> {code:bash}
> ./bin/start-cluster.sh
> ./bin/taskmanager.sh start
> {code}
> 3. Submit a streaming job that writes to S3 using the SQL Client:
> {code:sql}
> SET 'execution.runtime-mode' = 'streaming';
> SET 'parallelism.default' = '2';
> CREATE TABLE datagen_source (
> id INT,
> payload STRING
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '50',
> 'fields.id.kind' = 'sequence',
> 'fields.id.start' = '1',
> 'fields.id.end' = '100000',
> 'fields.payload.length' = '32'
> );
> CREATE TABLE s3_recovery_sink (
> id INT,
> payload STRING
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = 's3://flink-test/release-test/recovery-sink/',
> 'format' = 'csv',
> 'sink.rolling-policy.rollover-interval' = '20s',
> 'sink.rolling-policy.check-interval' = '5s'
> );
> INSERT INTO s3_recovery_sink SELECT * FROM datagen_source;
> {code}
> 4. Wait until at least 3 checkpoints complete (verify in Web UI).
> 5. Kill one TaskManager to simulate a failure:
> {code:bash}
> kill -9 $(jps | grep TaskManager | tail -1 | awk '{print $1}')
> {code}
> 6. Observe in the Web UI that the job restarts from the last checkpoint.
> Start a replacement TaskManager if needed:
> {code:bash}
> ./bin/taskmanager.sh start
> {code}
> 7. Let the job run for another 2 minutes, then cancel it:
> {code:bash}
> ./bin/flink cancel <JOB_ID>
> {code}
> 8. Verify data integrity:
> {code:bash}
> shopt -s globstar  # bash only: lets ** match nested directories
> aws s3 cp s3://flink-test/release-test/recovery-sink/ /tmp/recovery-output/ \
>   --recursive
> # Check for duplicates by id
> cut -d',' -f1 /tmp/recovery-output/**/*.csv | sort -n | uniq -d | wc -l
> # Expected: 0 duplicates
> {code}
> *Expected Results*:
> - The job recovers from the TaskManager failure automatically.
> - Output files contain no duplicate records (exactly-once guarantee).
> - No orphaned {{.inprogress}} files remain on S3 after final checkpoint.
> - No errors about multipart upload abort or recovery in the TaskManager logs.
> *Cleanup*:
> {code:bash}
> aws s3 rm s3://flink-test/release-test/recovery-sink/ --recursive
> aws s3 rm s3://flink-test/checkpoints/ --recursive
> rm -rf /tmp/recovery-output
> ./bin/stop-cluster.sh
> {code}
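Test 4 expects no orphaned `.inprogress` files, but the steps do not show how to check for them. A minimal sketch follows, assuming an `aws s3 ls --recursive` listing is piped in on stdin; `count_inprogress_keys` is a hypothetical helper name.

```shell
# count_inprogress_keys
# Reads an object listing on stdin and prints how many lines mention
# ".inprogress". Hypothetical helper, not part of Flink.
count_inprogress_keys() {
    # grep -c prints 0 but exits non-zero when nothing matches,
    # so the exit status is normalised with "|| true".
    grep -c '\.inprogress' || true
}
```

Usage before the cleanup step: `aws s3 ls s3://flink-test/release-test/recovery-sink/ --recursive | count_inprogress_keys`. Unfinished multipart uploads do not appear in such a listing; `aws s3api list-multipart-uploads --bucket flink-test` can be used to look for those separately.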
> ----
> h3. Test 5: Server-Side Encryption (SSE-S3 and SSE-KMS)
> *Objective*: Verify that files written to S3 are encrypted using the
> configured server-side encryption type.
> *Steps*:
> 1. *SSE-S3 (AES256)*: Add to {{conf/config.yaml}}:
> {code:yaml}
> s3.sse.type: sse-s3
> {code}
> 2. Start the cluster and submit a batch write job:
> {code:bash}
> ./bin/start-cluster.sh
> {code}
> Using SQL Client:
> {code:sql}
> CREATE TABLE s3_sse_test (id INT, name STRING) WITH (
> 'connector' = 'filesystem',
> 'path' = 's3://flink-test/release-test/sse-test/',
> 'format' = 'csv'
> );
> INSERT INTO s3_sse_test VALUES (1, 'encrypted');
> {code}
> 3. Verify the object is encrypted with AES256:
> {code:bash}
> aws s3api head-object --bucket flink-test \
> --key release-test/sse-test/<output-file-name>
> # Verify: "ServerSideEncryption": "AES256"
> {code}
> 4. Stop the cluster:
> {code:bash}
> ./bin/stop-cluster.sh
> {code}
> 5. *SSE-KMS*: Update {{conf/config.yaml}}:
> {code:yaml}
> s3.sse.type: sse-kms
> # Optionally specify a KMS key ID; if omitted, the default aws/s3 key is used:
> # s3.sse.kms.key-id: arn:aws:kms:us-east-1:123456789:key/your-key-id
> {code}
> 6. Repeat steps 2-3.
> 7. Verify the object is encrypted with {{aws:kms}}:
> {code:bash}
> aws s3api head-object --bucket flink-test \
> --key release-test/sse-test/<output-file-name>
> # Verify: "ServerSideEncryption": "aws:kms"
> {code}
> *Expected Results*:
> - With {{s3.sse.type: sse-s3}}, objects have {{ServerSideEncryption: AES256}}.
> - With {{s3.sse.type: sse-kms}}, objects have {{ServerSideEncryption: aws:kms}}.
> - Data is readable back via Flink (encryption/decryption is transparent).
> *Cleanup*:
> {code:bash}
> aws s3 rm s3://flink-test/release-test/sse-test/ --recursive
> ./bin/stop-cluster.sh
> {code}
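The `head-object` checks in Test 5 rely on reading the JSON output by eye. The sketch below pulls out just the `ServerSideEncryption` value so it can be compared in a script. `sse_type` is a hypothetical helper name, and the `sed` pattern assumes the value is a plain quoted string, as in the outputs shown in the steps.

```shell
# sse_type
# Reads head-object JSON on stdin and prints the ServerSideEncryption value
# (for example AES256 or aws:kms). Prints nothing if the field is absent.
# Hypothetical helper, not part of Flink or the AWS CLI.
sse_type() {
    sed -n 's/.*"ServerSideEncryption"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}
```

Usage: `aws s3api head-object --bucket flink-test --key release-test/sse-test/<output-file-name> | sse_type`. The AWS CLI's global `--query ServerSideEncryption --output text` options should give the same value without post-processing.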
> ----
> h3. Test 6: Entropy Injection for S3 Partition Sharding
> *Objective*: Verify that the {{EntropyInjectingFileSystem}} correctly
> replaces the entropy key placeholder with random characters, distributing
> writes across S3 partitions.
> *Steps*:
> 1. Configure entropy injection in {{conf/config.yaml}}:
> {code:yaml}
> s3.entropy.key: _entropy_
> s3.entropy.length: 6
> state.checkpoints.dir: s3://flink-test/_entropy_/checkpoints/
> {code}
> 2. Start a Flink Session cluster and submit a stateful job:
> {code:bash}
> ./bin/start-cluster.sh
> ./bin/flink run -d ./examples/streaming/StateMachineExample.jar
> {code}
> 3. Let the job run until at least 3 checkpoints complete.
> 4. List checkpoint files on S3:
> {code:bash}
> aws s3 ls s3://flink-test/ --recursive | grep checkpoints | head -20
> {code}
> 5. Verify the checkpoint paths do NOT contain the literal string
> {{_entropy_}}. Instead, they contain random 6-character alphanumeric strings
> in that position (e.g., {{s3://flink-test/a3xk9m/checkpoints/...}}).
> *Expected Results*:
> - Checkpoint files are distributed under random prefixes instead of a single
> static path.
> - The literal \{{_entropy_}} placeholder does not appear in any S3 key.
> - Each checkpoint may use a different random prefix.
> *Cleanup*:
> {code:bash}
> ./bin/flink cancel <JOB_ID>
> aws s3 rm s3://flink-test/ --recursive --exclude "*" \
>   --include "*/checkpoints/*"
> ./bin/stop-cluster.sh
> {code}
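Step 5 of Test 6 can be made less error-prone by classifying the listed keys mechanically. The sketch below assumes object keys arrive one per line on stdin and that the entropy segment is the first path component, directly before `checkpoints/`, as in the configured path. `classify_entropy_keys` is a hypothetical helper name. Keys with no random prefix are counted as `other` rather than as failures, on the assumption that some files (such as checkpoint metadata) may be written with the entropy key removed; whether the native plugin behaves this way should be confirmed against its documentation.

```shell
# classify_entropy_keys
# Reads S3 object keys (one per line) on stdin and prints three counters:
#   literal     keys that still contain the placeholder "_entropy_"
#   randomized  keys of the form <6 alphanumerics>/checkpoints/...
#   other       everything else
# Hypothetical helper, not part of Flink.
classify_entropy_keys() {
    awk -F'/' '
        index($0, "_entropy_") > 0 { literal++; next }
        $2 == "checkpoints" && length($1) == 6 && $1 ~ /^[A-Za-z0-9]+$/ { randomized++; next }
        { other++ }
        END { printf "literal=%d randomized=%d other=%d\n", literal, randomized, other }
    '
}
```

Usage: `aws s3 ls s3://flink-test/ --recursive | awk '{print $4}' | classify_entropy_keys`. The test passes when `literal=0`; a `randomized` count of zero suggests that entropy was not applied to any listed object.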
> ----
> h3. Test 7: Configuration Validation and Edge Cases
> *Objective*: Verify that invalid configurations are rejected with clear
> error messages at startup, not at runtime.
> *Steps*:
> 1. *Part size below minimum*: Set {{s3.upload.min.part.size: 1048576}} (1MB,
> below the 5MB S3 minimum). Start the cluster and submit a job that writes to
> S3. Verify the cluster rejects the configuration with an
> {{IllegalArgumentException}} mentioning the 5MB minimum.
> 2. *Invalid entropy key characters*: Set {{s3.entropy.key: _test@key_}}.
> Start the cluster and verify it fails with a clear error about invalid
> characters.
> 3. *Zero max concurrent uploads*: Set {{s3.upload.max.concurrent.uploads: 0}}.
> Start the cluster and verify it fails with an
> {{IllegalArgumentException}}.
> 4. *Read buffer clamping*: Set {{s3.read.buffer.size: 10485760}} (10MB,
> above 4MB max). Start the cluster and submit a read job. Verify a WARN log
> appears stating the buffer was clamped to 4MB, and the read operation
> succeeds.
> 5. *Max connections validation*: Set {{s3.connection.max: 0}}. Verify it
> fails at startup with an {{IllegalArgumentException}}.
> *Expected Results*:
> - Invalid configurations are rejected at filesystem initialization with
> descriptive error messages.
> - An oversized read buffer is clamped to the maximum with a WARN log rather
> than failing.
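The numeric expectations in Test 7 can be restated as a lookup that a tester runs before starting the cluster, to know which outcome to look for. This is a sketch of the expectations listed above, not the plugin's validation code. `expected_outcome` is a hypothetical helper name, and the byte limits assume that "5MB" and "4MB" mean 5 MiB and 4 MiB, in line with the `1048576` and `10485760` values used in the steps. The entropy key check is left out because the allowed character set is not stated here.

```shell
# Assumed limits, taken from the test description (5 MiB and 4 MiB).
MIN_PART_SIZE=5242880
MAX_READ_BUFFER=4194304

# expected_outcome KEY VALUE
# Prints the outcome Test 7 expects for a numeric option:
#   accept, reject (startup failure), clamp (WARN log), or unknown.
# Hypothetical helper, not part of Flink.
expected_outcome() {
    key=$1
    value=$2
    case "$key" in
        s3.upload.min.part.size)
            if [ "$value" -ge "$MIN_PART_SIZE" ]; then echo accept; else echo reject; fi ;;
        s3.upload.max.concurrent.uploads | s3.connection.max)
            if [ "$value" -gt 0 ]; then echo accept; else echo reject; fi ;;
        s3.read.buffer.size)
            if [ "$value" -le "$MAX_READ_BUFFER" ]; then echo accept; else echo clamp; fi ;;
        *)
            echo unknown ;;
    esac
}
```

For example, `expected_outcome s3.read.buffer.size 10485760` prints `clamp`, matching step 4.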
> ----
> h3. Test 8: Credential Provider Chain
> *Objective*: Verify the credential resolution order: explicit static
> keys > environment variables/SDK default chain, and that anonymous access
> works for public buckets.
> *Steps*:
> 1. *Static credentials*: Configure {{s3.access-key}} and {{s3.secret-key}}
> in {{conf/config.yaml}}. Start the cluster, submit a write/read job. Verify
> it succeeds.
> 2. *SDK default chain*: Remove {{s3.access-key}} and {{s3.secret-key}} from
> config. Ensure credentials are available via environment variables
> ({{AWS_ACCESS_KEY_ID}}/{{AWS_SECRET_ACCESS_KEY}}) or
> {{~/.aws/credentials}}. Restart the cluster, submit a write/read job. Verify
> it succeeds.
> 3. *No credentials*: Remove all credential sources. Start the cluster and
> attempt a write. Verify the job fails with a credentials-related error (not a
> generic 403 without explanation).
> *Expected Results*:
> - Static keys take priority over environment credentials when both are
> present.
> - The SDK default chain (env vars, config file, instance profile) works when
> static keys are not set.
> - Missing credentials produce a clear, actionable error message.
> ----
> h3. Test 9: Multipart Upload Boundary — The "Tail" Problem
> *Objective*: Verify the RecoverableWriter correctly handles the case
> where a checkpoint occurs before 5MB of data has accumulated (the "tail"
> problem), ensuring no data loss.
> *Steps*:
> 1. Configure aggressive checkpointing and the minimum (5MB) part size in
> {{conf/config.yaml}}:
> {code:yaml}
> execution.checkpointing.interval: 5s
> execution.checkpointing.mode: EXACTLY_ONCE
> state.checkpoints.dir: s3://flink-test/checkpoints/
> s3.upload.min.part.size: 5242880
> {code}
> 2. Start a cluster and submit a slow-producing streaming job to ensure
> checkpoints trigger before 5MB accumulates:
> {code:sql}
> SET 'execution.runtime-mode' = 'streaming';
> CREATE TABLE slow_source (id INT, data STRING) WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '5',
> 'fields.id.kind' = 'sequence',
> 'fields.id.start' = '1',
> 'fields.id.end' = '50000',
> 'fields.data.length' = '100'
> );
> CREATE TABLE s3_tail_sink (id INT, data STRING) WITH (
> 'connector' = 'filesystem',
> 'path' = 's3://flink-test/release-test/tail-test/',
> 'format' = 'csv',
> 'sink.rolling-policy.rollover-interval' = '60s',
> 'sink.rolling-policy.check-interval' = '5s'
> );
> INSERT INTO s3_tail_sink SELECT * FROM slow_source;
> {code}
> 3. Let the job run for 2 minutes so multiple checkpoints complete with
> sub-5MB data.
> 4. Kill a TaskManager to force recovery:
> {code:bash}
> kill -9 $(jps | grep TaskManager | head -1 | awk '{print $1}')
> ./bin/taskmanager.sh start
> {code}
> 5. Let the job run for another minute, then cancel it.
> 6. Verify data integrity — no data loss:
> {code:bash}
> shopt -s globstar  # bash only: lets ** match nested directories
> aws s3 cp s3://flink-test/release-test/tail-test/ /tmp/tail-output/ \
>   --recursive
> wc -l /tmp/tail-output/**/*.csv
> # Rows should be present — no empty or missing files
> {code}
> *Expected Results*:
> - Checkpoints succeed even with sub-5MB buffered data.
> - Recovery restores the writer state correctly, including the incomplete
> "tail" part.
> - No data loss after recovery.
> *Cleanup*:
> {code:bash}
> aws s3 rm s3://flink-test/release-test/tail-test/ --recursive
> aws s3 rm s3://flink-test/checkpoints/ --recursive
> rm -rf /tmp/tail-output
> ./bin/stop-cluster.sh
> {code}
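The `wc -l` check in step 6 of Test 9 confirms that rows exist, but not that none are missing. Because the source emits a sequence starting at 1, a gap check is a possible stronger test. The sketch below assumes the job ran with parallelism 1, so the committed ids should form a contiguous range from 1 up to the largest id written. With higher parallelism, or right after cancellation, a few ids near the top of the range may be absent without indicating data loss. `count_missing_ids` is a hypothetical helper name.

```shell
# count_missing_ids DIR
# Prints how many ids in 1..max(id) are absent from the first CSV column
# of the regular files under DIR. Hypothetical helper, not part of Flink.
count_missing_ids() {
    find "$1" -type f -exec cat {} + |
        cut -d',' -f1 |
        sort -n -u |        # ascending, each id once
        awk 'BEGIN { expected = 1; missing = 0 }
             { missing += $1 - expected; expected = $1 + 1 }
             END { print missing }'
}
```

After the download in step 6, `count_missing_ids /tmp/tail-output` printing `0` indicates that every id up to the last committed one is present.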
> ----
> h3. Test 10: Bulk Copy (PathsCopyingFileSystem) for State Recovery
> *Objective*: Verify that the {{PathsCopyingFileSystem}} implementation
> using {{S3TransferManager}} correctly downloads state files during
> TaskManager recovery, and that the {{s3.bulk-copy.enabled}} flag controls
> the behavior.
> *Steps*:
> 1. Configure with RocksDB state backend (which triggers state download on
> recovery) in {{conf/config.yaml}}:
> {code:yaml}
> execution.checkpointing.interval: 10s
> state.backend.type: rocksdb
> state.checkpoints.dir: s3://flink-test/checkpoints/
> s3.bulk-copy.enabled: true
> s3.bulk-copy.max-concurrent: 4
> {code}
> 2. Start a cluster:
> {code:bash}
> ./bin/start-cluster.sh
> ./bin/taskmanager.sh start
> {code}
> 3. Submit a stateful job with keyed state:
> {code:bash}
> ./bin/flink run -d ./examples/streaming/StateMachineExample.jar
> {code}
> 4. Let the job run until at least 5 checkpoints complete.
> 5. Kill one TaskManager:
> {code:bash}
> kill -9 $(jps | grep TaskManager | tail -1 | awk '{print $1}')
> {code}
> 6. Start a replacement TaskManager:
> {code:bash}
> ./bin/taskmanager.sh start
> {code}
> 7. Verify recovery in the Web UI: the job restores from the checkpoint and
> resumes. Check TaskManager logs for bulk copy activity: look for
> {{S3TransferManager}} or download-related log lines.
> 8. *Verify disabled bulk copy*: Repeat the test with
> {{s3.bulk-copy.enabled: false}} and verify recovery still works (falls back
> to standard sequential download).
> *Expected Results*:
> - With bulk copy enabled: state files are downloaded using parallel
> {{S3TransferManager}} operations. Recovery completes successfully.
> - With bulk copy disabled: recovery still works via standard filesystem reads.
> - No {{OutOfMemoryError}} or connection pool exhaustion errors during
> recovery.
> *Cleanup*:
> {code:bash}
> ./bin/flink cancel <JOB_ID>
> aws s3 rm s3://flink-test/checkpoints/ --recursive
> ./bin/stop-cluster.sh
> {code}
> ----
> h3. Test 11: Plugin Isolation — No Classpath Conflicts
> *Objective*: Verify that the native S3 plugin does not leak dependencies
> into the user classpath, preventing the {{NoSuchMethodError}} and
> {{ClassNotFoundException}} issues common with the legacy Hadoop-based
> connector.
> *Steps*:
> 1. Create a simple Flink job that explicitly uses a version of a library also
> bundled inside the native S3 plugin (e.g., Netty, Jackson). For example, a
> job that calls {{io.netty.util.Version.identify()}} or uses
> {{com.fasterxml.jackson.databind.ObjectMapper}} directly.
> 2. Submit the job to a cluster with the native S3 plugin installed.
> 3. Verify the job uses its *own* version of the library, not the shaded
> version inside the plugin.
> *Expected Results*:
> - No {{ClassNotFoundException}}, {{NoSuchMethodError}}, or version mismatch
> errors.
> - The user application's library version is not affected by the plugin's
> bundled dependencies.
> ----
> h3. Test 12: MinIO Compatibility (S3-Compatible Object Store)
> *Objective*: Verify end-to-end functionality against MinIO, confirming
> that the {{s3.endpoint}} and {{s3.path-style-access}} configurations work
> correctly for S3-compatible stores.
> *Steps*:
> 1. Start MinIO (see Prerequisites section).
> 2. Configure Flink in {{conf/config.yaml}}:
> {code:yaml}
> s3.endpoint: http://localhost:9000
> s3.access-key: minioadmin
> s3.secret-key: minioadmin
> s3.path-style-access: true
> execution.checkpointing.interval: 10s
> state.checkpoints.dir: s3://flink-test/checkpoints/
> {code}
> 3. Start the cluster:
> {code:bash}
> ./bin/start-cluster.sh
> {code}
> 4. Submit a stateful streaming job:
> {code:bash}
> ./bin/flink run -d ./examples/streaming/StateMachineExample.jar
> {code}
> 5. Verify checkpoints complete in the Web UI.
> 6. Trigger and verify a savepoint:
> {code:bash}
> ./bin/flink savepoint <JOB_ID> s3://flink-test/savepoints/
> {code}
> 7. Cancel and restore from savepoint:
> {code:bash}
> ./bin/flink cancel <JOB_ID>
> ./bin/flink run -d -s <SAVEPOINT_PATH> \
>   ./examples/streaming/StateMachineExample.jar
> {code}
> 8. Verify the restored job is running and checkpoints resume.
> *Expected Results*:
> - All operations work against MinIO without modifications.
> - {{s3.path-style-access}} is auto-enabled when {{s3.endpoint}} is set
> (verify by checking that it works even without explicitly setting
> {{s3.path-style-access: true}}).
> - Chunked encoding and checksum validation are automatically disabled for
> custom endpoints.
> *Cleanup*:
> {code:bash}
> ./bin/flink cancel <JOB_ID>
> aws --endpoint-url http://localhost:9000 s3 rm s3://flink-test/ --recursive
> ./bin/stop-cluster.sh
> docker stop minio && docker rm minio
> {code}
> ----
> h2. Breaking Change Considerations
> - *URI scheme conflict*: If both {{flink-s3-fs-native}} and
> {{flink-s3-fs-hadoop}} are in the {{plugins/}} directory, only one will
> handle {{s3://}}. The native plugin has priority {{-1}} (lowest), so Hadoop
> takes precedence. Users must remove the legacy plugin to use the native one.
> This should be documented clearly.
> - *No {{s3n://}} support*: The native plugin only registers {{s3://}} and
> {{s3a://}}. Jobs using {{s3n://}} paths must migrate.
> - *Configuration namespace change*: Legacy Hadoop-specific keys (e.g.,
> {{fs.s3a.*}}) are not recognized by the native plugin. Users must migrate to
> the {{s3.*}} namespace.
> - *Directory rename*: {{NativeS3FileSystem.rename()}} throws
> {{UnsupportedOperationException}} for directories. Any user code relying on
> directory rename will fail. This matches S3 semantics but differs from the
> Hadoop connector, which emulated directory rename via copy+delete.
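Two of the breaking changes above, `s3n://` paths and `fs.s3a.*` keys, can be spotted in a configuration file before migrating. The sketch below is a plain text scan under the assumption that settings live one per line in a file such as `conf/config.yaml`. `find_legacy_s3_settings` is a hypothetical helper name, and the scan does not know which `s3.*` key replaces a given legacy key.

```shell
# find_legacy_s3_settings FILE
# Prints (with line numbers) every line of FILE that uses an s3n:// path
# or starts with a fs.s3a. key. Prints nothing if FILE is clean.
# Hypothetical helper, not part of Flink.
find_legacy_s3_settings() {
    # grep exits non-zero when nothing matches; that is not an error here.
    grep -n -E 's3n://|^[[:space:]]*fs\.s3a\.' "$1" || true
}
```

Usage: `find_legacy_s3_settings "$FLINK_HOME/conf/config.yaml"`. Each reported line needs a manual decision; SQL DDL and job code that embed `s3n://` paths are outside what this scan sees.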