[ 
https://issues.apache.org/jira/browse/FLINK-39472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18078577#comment-18078577
 ] 

Xiqian Yu commented on FLINK-39472:
-----------------------------------

Hello, may I ask whether this ticket is still unassigned? If so, I'd like to 
test and verify this feature.

> Release Testing: Verify FLIP-555: Native S3 FileSystem
> ------------------------------------------------------
>
>                 Key: FLINK-39472
>                 URL: https://issues.apache.org/jira/browse/FLINK-39472
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / FileSystem
>    Affects Versions: 2.3.0
>            Reporter: Samrat Deb
>            Priority: Blocker
>              Labels: release-testing
>
> h2. Background
> [FLIP-555|https://cwiki.apache.org/confluence/display/FLINK/FLIP-555%3A+Flink+Native+S3+FileSystem]
>  introduces \{{flink-s3-fs-native}}, a new S3 filesystem plugin built 
> directly on the AWS SDK for Java v2. It replaces the legacy 
> \{{flink-s3-fs-hadoop}} and \{{flink-s3-fs-presto}} connectors with a single, 
> dependency-isolated module that supports the full \{{FileSystem}}, 
> \{{RecoverableWriter}}, \{{EntropyInjectingFileSystem}}, and 
> \{{PathsCopyingFileSystem}} interfaces.
> h2. Prerequisites
> - A freshly built Flink 2.3 distribution.
> - An S3 bucket with read/write access (or [MinIO|https://min.io/] for local 
> testing).
> - AWS credentials configured (environment variables, \{{~/.aws/credentials}}, 
> or IAM role).
> - The \{{flink-s3-fs-native}} plugin JAR deployed:
> {code:bash}
> mkdir -p $FLINK_HOME/plugins/s3-fs-native
> cp flink-s3-fs-native-*.jar $FLINK_HOME/plugins/s3-fs-native/
> {code}
> - *Important*: Remove any existing \{{flink-s3-fs-hadoop}} or 
> \{{flink-s3-fs-presto}} JARs from the \{{plugins/}} directory to avoid scheme 
> conflicts.
> - For MinIO-based testing, start a local instance:
> {code:bash}
> docker run -d --name minio -p 9000:9000 -p 9001:9001 \
>   -e MINIO_ROOT_USER=minioadmin -e MINIO_ROOT_PASSWORD=minioadmin \
>   minio/minio server /data --console-address ":9001"
> # Create a test bucket
> aws --endpoint-url http://localhost:9000 s3 mb s3://flink-test
> {code}
> - For MinIO, add the following to \{{conf/config.yaml}}:
> {code:yaml}
> s3.endpoint: http://localhost:9000
> s3.access-key: minioadmin
> s3.secret-key: minioadmin
> s3.path-style-access: true
> {code}
> h2. Test Cases
> ----
> h3. Test 1: Plugin Loading and Basic FileSystem Operations
> {*}Objective\{*}: Verify the native S3 plugin loads correctly for both 
> \{{s3://}} and \{{s3a://}} URI schemes, and basic filesystem operations 
> (write, read, list, delete) work end-to-end.
> {*}Steps\{*}:
> 1. Start a Flink Session cluster:
> {code:bash}
> ./bin/start-cluster.sh
> {code}
> 2. Open the Flink SQL Client:
> {code:bash}
> ./bin/sql-client.sh
> {code}
> 3. Create a table writing to S3 and insert data:
> {code:sql}
> CREATE TABLE s3_test (
>   id INT,
>   name STRING
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 's3://flink-test/release-test/basic/',
>   'format' = 'csv'
> );
> INSERT INTO s3_test VALUES (1, 'alice'), (2, 'bob'), (3, 'charlie');
> {code}
> 4. Wait for the batch job to finish (check the Web UI at 
> \{{http://localhost:8081}}).
> 5. Read back the data:
> {code:sql}
> SELECT * FROM s3_test;
> {code}
> 6. Repeat steps 3-5 using the \{{s3a://}} scheme:
> {code:sql}
> CREATE TABLE s3a_test (
>   id INT,
>   name STRING
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 's3a://flink-test/release-test/basic-s3a/',
>   'format' = 'csv'
> );
> INSERT INTO s3a_test VALUES (10, 'delta'), (20, 'echo');
> SELECT * FROM s3a_test;
> {code}
> 7. Verify the files exist in S3:
> {code:bash}
> aws s3 ls s3://flink-test/release-test/basic/ --recursive
> aws s3 ls s3://flink-test/release-test/basic-s3a/ --recursive
> {code}
> {*}Expected Results\{*}:
> - The plugin loads without errors (no \{{ClassNotFoundException}} or 
> scheme-registration errors in the JobManager log).
> - Both \{{s3://}} and \{{s3a://}} produce identical behavior.
> - \{{SELECT}} returns exactly the rows inserted.
> - Files are visible in the S3 bucket.
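
One way to check the first expected result mechanically is to count log lines that name a class-loading or scheme failure. This is only a sketch: it assumes a standalone session cluster whose JobManager log matches log/flink-*-standalonesession-*.log, and it assumes a scheme-registration problem would surface as UnsupportedFileSystemSchemeException. The function name is hypothetical.

```shell
#!/bin/sh
# Count log lines that mention either failure class.
# Usage: count_plugin_errors <log-file>...
count_plugin_errors() {
  # cat merges all files so grep prints a single number;
  # "|| true" keeps the exit status 0 when the count is 0.
  cat "$@" | grep -c -E 'ClassNotFoundException|UnsupportedFileSystemSchemeException' || true
}
```

Running `count_plugin_errors log/flink-*-standalonesession-*.log` after step 6 should print 0 if the plugin loaded cleanly.
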
> {*}Cleanup\{*}:
> {code:bash}
> aws s3 rm s3://flink-test/release-test/basic/ --recursive
> aws s3 rm s3://flink-test/release-test/basic-s3a/ --recursive
> ./bin/stop-cluster.sh
> {code}
> ----
> h3. Test 2: Streaming FileSink with Exactly-Once Semantics (RecoverableWriter)
> {*}Objective\{*}: Verify the \{{RecoverableWriter}} implementation correctly 
> handles streaming writes with checkpointing, producing exactly-once output 
> files on S3.
> {*}Steps\{*}:
> 1. Configure checkpointing in \{{conf/config.yaml}}:
> {code:yaml}
> execution.checkpointing.interval: 10s
> execution.checkpointing.mode: EXACTLY_ONCE
> state.checkpoints.dir: s3://flink-test/checkpoints/
> {code}
> 2. Start a Flink Session cluster:
> {code:bash}
> ./bin/start-cluster.sh
> {code}
> 3. Open the Flink SQL Client:
> {code:bash}
> ./bin/sql-client.sh
> {code}
> 4. Create a streaming source and an S3 sink:
> {code:sql}
> SET 'execution.runtime-mode' = 'streaming';
> CREATE TABLE datagen_source (
>   id INT,
>   ts TIMESTAMP(3),
>   payload STRING,
>   WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> ) WITH (
>   'connector' = 'datagen',
>   'rows-per-second' = '10',
>   'fields.id.kind' = 'sequence',
>   'fields.id.start' = '1',
>   'fields.id.end' = '10000',
>   'fields.payload.length' = '64'
> );
> CREATE TABLE s3_streaming_sink (
>   id INT,
>   ts TIMESTAMP(3),
>   payload STRING
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 's3://flink-test/release-test/streaming-sink/',
>   'format' = 'csv',
>   'sink.rolling-policy.rollover-interval' = '30s',
>   'sink.rolling-policy.check-interval' = '5s'
> );
> INSERT INTO s3_streaming_sink SELECT * FROM datagen_source;
> {code}
> 5. Let the job run for at least 2 minutes. Verify in the Web UI that multiple 
> checkpoints complete successfully.
> 6. Cancel the job with a savepoint:
> {code:bash}
> ./bin/flink cancel -s s3://flink-test/savepoints/ <JOB_ID>
> {code}
> 7. Verify output files on S3:
> {code:bash}
> aws s3 ls s3://flink-test/release-test/streaming-sink/ --recursive
> {code}
> 8. Count total rows across all output files and verify no duplicates:
> {code:bash}
> aws s3 cp s3://flink-test/release-test/streaming-sink/ /tmp/s3-output/ --recursive
> # Part files may not carry a .csv suffix, so read every downloaded file
> find /tmp/s3-output -type f -exec cat {} + | cut -d',' -f1 | sort -n | uniq -d | wc -l
> # Expected: 0 duplicate ids
> find /tmp/s3-output -type f -exec cat {} + | wc -l
> # Expected: total matches the number of records produced
> {code}
> {*}Expected Results\{*}:
> - Checkpoints complete successfully (visible in the Web UI checkpoint 
> history).
> - Checkpoint metadata is stored under \{{s3://flink-test/checkpoints/}}.
> - Output files under the sink path contain no duplicate rows.
> - No errors related to multipart uploads, RecoverableWriter, or S3 access in 
> the TaskManager logs.
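
Because the source emits a sequence of ids, the downloaded output can be checked for gaps as well as duplicates. The helper below is a minimal sketch, assuming the id is the first CSV column and that any file whose name starts with a dot is an unfinished part that should be ignored. The function name is hypothetical.

```shell
#!/bin/sh
# Summarise the first CSV column (the sequence id) across all files under a directory.
# Prints: rows=<n> unique=<n> duplicates=<n> missing=<n>
# "missing" counts gaps between the smallest and largest id that was seen.
summarize_ids() {
  find "$1" -type f ! -name '.*' -exec cat {} + |
    cut -d',' -f1 |
    sort -n |
    awk '
      NR == 1 { min = $1 }
      { rows++; if (!($1 in seen)) { seen[$1] = 1; unique++ } max = $1 }
      END {
        if (rows == 0) { print "rows=0 unique=0 duplicates=0 missing=0"; exit }
        printf "rows=%d unique=%d duplicates=%d missing=%d\n",
          rows, unique, rows - unique, (max - min + 1) - unique
      }'
}
```

For this test, `summarize_ids /tmp/s3-output` should report duplicates=0 and missing=0. Ids that were never written past the largest observed id are not detected by this check.
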
> {*}Cleanup\{*}:
> {code:bash}
> aws s3 rm s3://flink-test/release-test/streaming-sink/ --recursive
> aws s3 rm s3://flink-test/checkpoints/ --recursive
> aws s3 rm s3://flink-test/savepoints/ --recursive
> rm -rf /tmp/s3-output
> ./bin/stop-cluster.sh
> {code}
> ----
> h3. Test 3: Checkpoint and Savepoint on S3 State Backend
> {*}Objective\{*}: Verify that Flink checkpoints and savepoints can be stored 
> on S3 and successfully restored, ensuring state integrity across failures.
> {*}Steps\{*}:
> 1. Configure S3 as the checkpoint and savepoint directory in 
> \{{conf/config.yaml}}:
> {code:yaml}
> execution.checkpointing.interval: 10s
> execution.checkpointing.mode: EXACTLY_ONCE
> state.checkpoints.dir: s3://flink-test/checkpoints/
> state.savepoints.dir: s3://flink-test/savepoints/
> state.backend.type: hashmap
> {code}
> 2. Start a Flink Session cluster:
> {code:bash}
> ./bin/start-cluster.sh
> {code}
> 3. Submit a stateful streaming job (using the built-in StateMachineExample):
> {code:bash}
> ./bin/flink run -d ./examples/streaming/StateMachineExample.jar
> {code}
> 4. Let the job run for at least 1 minute. Confirm in the Web UI that 
> checkpoints are completing.
> 5. Verify checkpoint files exist on S3:
> {code:bash}
> aws s3 ls s3://flink-test/checkpoints/ --recursive | head -20
> {code}
> 6. Trigger a savepoint:
> {code:bash}
> ./bin/flink savepoint <JOB_ID> s3://flink-test/savepoints/
> {code}
> Record the savepoint path from the output.
> 7. Cancel the job:
> {code:bash}
> ./bin/flink cancel <JOB_ID>
> {code}
> 8. Restore from the savepoint:
> {code:bash}
> ./bin/flink run -d -s <SAVEPOINT_PATH> ./examples/streaming/StateMachineExample.jar
> {code}
> 9. Verify the restored job is RUNNING in the Web UI and checkpoints resume 
> successfully.
> {*}Expected Results\{*}:
> - Checkpoint metadata and state files are stored on S3.
> - Savepoint completes and produces files on S3.
> - The job restores from the savepoint without errors.
> - Checkpoints resume after restore.
> {*}Cleanup\{*}:
> {code:bash}
> ./bin/flink cancel <RESTORED_JOB_ID>
> aws s3 rm s3://flink-test/checkpoints/ --recursive
> aws s3 rm s3://flink-test/savepoints/ --recursive
> ./bin/stop-cluster.sh
> {code}
> ----
> h3. Test 4: Failure Recovery with RecoverableWriter (Exactly-Once Under 
> Failures)
> {*}Objective\{*}: Verify that the RecoverableWriter correctly recovers from 
> TaskManager failures, producing exactly-once output with no data loss or 
> duplication.
> {*}Steps\{*}:
> 1. Configure checkpointing in \{{conf/config.yaml}}:
> {code:yaml}
> execution.checkpointing.interval: 10s
> execution.checkpointing.mode: EXACTLY_ONCE
> state.checkpoints.dir: s3://flink-test/checkpoints/
> restart-strategy.type: fixed-delay
> restart-strategy.fixed-delay.attempts: 10
> restart-strategy.fixed-delay.delay: 5s
> {code}
> 2. Start a Flink cluster with 2 TaskManagers:
> {code:bash}
> ./bin/start-cluster.sh
> ./bin/taskmanager.sh start
> {code}
> 3. Submit a streaming job that writes to S3 using the SQL Client:
> {code:sql}
> SET 'execution.runtime-mode' = 'streaming';
> SET 'parallelism.default' = '2';
> CREATE TABLE datagen_source (
>   id INT,
>   payload STRING
> ) WITH (
>   'connector' = 'datagen',
>   'rows-per-second' = '50',
>   'fields.id.kind' = 'sequence',
>   'fields.id.start' = '1',
>   'fields.id.end' = '100000',
>   'fields.payload.length' = '32'
> );
> CREATE TABLE s3_recovery_sink (
>   id INT,
>   payload STRING
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 's3://flink-test/release-test/recovery-sink/',
>   'format' = 'csv',
>   'sink.rolling-policy.rollover-interval' = '20s',
>   'sink.rolling-policy.check-interval' = '5s'
> );
> INSERT INTO s3_recovery_sink SELECT * FROM datagen_source;
> {code}
> 4. Wait until at least 3 checkpoints complete (verify in Web UI).
> 5. Kill one TaskManager to simulate a failure:
> {code:bash}
> kill -9 $(jps | grep TaskManager | tail -1 | awk '{print $1}')
> {code}
> 6. Observe in the Web UI that the job restarts from the last checkpoint. 
> Start a replacement TaskManager if needed:
> {code:bash}
> ./bin/taskmanager.sh start
> {code}
> 7. Let the job run for another 2 minutes, then cancel it:
> {code:bash}
> ./bin/flink cancel <JOB_ID>
> {code}
> 8. Verify data integrity:
> {code:bash}
> aws s3 cp s3://flink-test/release-test/recovery-sink/ /tmp/recovery-output/ --recursive
> # Check for duplicates by id (part files may lack a .csv suffix, so read every file)
> find /tmp/recovery-output -type f -exec cat {} + | cut -d',' -f1 | sort -n | uniq -d | wc -l
> # Expected: 0 duplicates
> {code}
> {*}Expected Results\{*}:
> - The job recovers from the TaskManager failure automatically.
> - Output files contain no duplicate records (exactly-once guarantee).
> - No orphaned \{{.inprogress}} files remain on S3 after final checkpoint.
> - No errors about multipart upload abort or recovery in the TaskManager logs.
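
The check for leftover in-progress files can be scripted against a bucket listing. A small sketch, assuming the listing comes from `aws s3 ls --recursive` and that leftovers keep ".inprogress" in their key. The function name is hypothetical.

```shell
#!/bin/sh
# Reads a bucket listing on stdin and prints how many lines mention ".inprogress".
# 0 means no leftover in-progress objects were listed.
count_inprogress_keys() {
  grep -c '\.inprogress' || true
}
```

Usage: `aws s3 ls s3://flink-test/release-test/recovery-sink/ --recursive | count_inprogress_keys`. Multipart uploads that were never completed do not show up in this listing. They can be inspected separately with `aws s3api list-multipart-uploads --bucket flink-test`.
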
> {*}Cleanup\{*}:
> {code:bash}
> aws s3 rm s3://flink-test/release-test/recovery-sink/ --recursive
> aws s3 rm s3://flink-test/checkpoints/ --recursive
> rm -rf /tmp/recovery-output
> ./bin/stop-cluster.sh
> {code}
> ----
> h3. Test 5: Server-Side Encryption (SSE-S3 and SSE-KMS)
> {*}Objective\{*}: Verify that files written to S3 are encrypted using the 
> configured server-side encryption type.
> {*}Steps\{*}:
> 1. *SSE-S3 (AES256)*: Add to \{{conf/config.yaml}}:
> {code:yaml}
> s3.sse.type: sse-s3
> {code}
> 2. Start the cluster and submit a batch write job:
> {code:bash}
> ./bin/start-cluster.sh
> {code}
> Using SQL Client:
> {code:sql}
> CREATE TABLE s3_sse_test (id INT, name STRING) WITH (
>   'connector' = 'filesystem',
>   'path' = 's3://flink-test/release-test/sse-test/',
>   'format' = 'csv'
> );
> INSERT INTO s3_sse_test VALUES (1, 'encrypted');
> {code}
> 3. Verify the object is encrypted with AES256:
> {code:bash}
> aws s3api head-object --bucket flink-test \
>   --key release-test/sse-test/<output-file-name>
> # Verify: "ServerSideEncryption": "AES256"
> {code}
> 4. Stop the cluster:
> {code:bash}
> ./bin/stop-cluster.sh
> {code}
> 5. *SSE-KMS*: Update \{{conf/config.yaml}}:
> {code:yaml}
> s3.sse.type: sse-kms
> # Optionally specify a KMS key ID; if omitted, the default aws/s3 key is used:
> # s3.sse.kms.key-id: arn:aws:kms:us-east-1:123456789:key/your-key-id
> {code}
> 6. Repeat steps 2-3.
> 7. Verify the object is encrypted with \{{aws:kms}}:
> {code:bash}
> aws s3api head-object --bucket flink-test \
>   --key release-test/sse-test/<output-file-name>
> # Verify: "ServerSideEncryption": "aws:kms"
> {code}
> {*}Expected Results\{*}:
> - With \{{s3.sse.type: sse-s3}}, objects have \{{ServerSideEncryption: 
> AES256}}.
> - With \{{s3.sse.type: sse-kms}}, objects have \{{ServerSideEncryption: 
> aws:kms}}.
> - Data is readable back via Flink (encryption/decryption is transparent).
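
To avoid reading the head-object JSON by eye in steps 3 and 7, the field can be extracted with sed. This sketch assumes the JSON layout printed by the AWS CLI, with the field on its own line. The function name is hypothetical.

```shell
#!/bin/sh
# Reads `aws s3api head-object` JSON on stdin and prints the ServerSideEncryption value.
# Prints nothing if the field is absent.
sse_type() {
  sed -n 's/.*"ServerSideEncryption"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}
```

Usage: pipe the head-object command from step 3 into `sse_type` and compare the result with AES256 or aws:kms.
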
> {*}Cleanup\{*}:
> {code:bash}
> aws s3 rm s3://flink-test/release-test/sse-test/ --recursive
> ./bin/stop-cluster.sh
> {code}
> ----
> h3. Test 6: Entropy Injection for S3 Partition Sharding
> {*}Objective\{*}: Verify that the \{{EntropyInjectingFileSystem}} correctly 
> replaces the entropy key placeholder with random characters, distributing 
> writes across S3 partitions.
> {*}Steps\{*}:
> 1. Configure entropy injection in \{{conf/config.yaml}}:
> {code:yaml}
> s3.entropy.key: _entropy_
> s3.entropy.length: 6
> state.checkpoints.dir: s3://flink-test/_entropy_/checkpoints/
> {code}
> 2. Start a Flink Session cluster and submit a stateful job:
> {code:bash}
> ./bin/start-cluster.sh
> ./bin/flink run -d ./examples/streaming/StateMachineExample.jar
> {code}
> 3. Let the job run until at least 3 checkpoints complete.
> 4. List checkpoint files on S3:
> {code:bash}
> aws s3 ls s3://flink-test/ --recursive | grep checkpoints | head -20
> {code}
> 5. Verify the checkpoint paths do NOT contain the literal string 
> \{{_entropy_}}. Instead, they contain random 6-character alphanumeric strings 
> in that position (e.g., \{{s3://flink-test/a3xk9m/checkpoints/...}}).
> {*}Expected Results\{*}:
> - Checkpoint files are distributed under random prefixes instead of a single 
> static path.
> - The literal \{{_entropy_}} placeholder does not appear in any S3 key.
> - Each checkpoint may use a different random prefix.
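
The listing from step 4 can be classified automatically. The sketch below assumes keys contain no spaces and that the entropy segment is the first path component, as in the configured checkpoint directory. In the existing Hadoop and Presto S3 filesystems, entropy is injected only for checkpoint data files and the placeholder is simply removed for metadata files. Whether the native plugin behaves the same way is an assumption here, so keys without a random prefix are counted separately rather than treated as failures. The function name is hypothetical.

```shell
#!/bin/sh
# Classifies keys from an `aws s3 ls --recursive` listing read on stdin.
# $1: entropy placeholder (e.g. _entropy_), $2: expected entropy length (e.g. 6)
# Prints: literal=<n> entropy=<n> plain=<n>
classify_checkpoint_keys() {
  awk -v key="$1" -v len="$2" '
    NF == 0 { next }
    { k = $NF }    # object key is the last column (assumes no spaces in keys)
    index(k, key) > 0 { literal++; next }
    {
      n = split(k, parts, "/")
      if (n >= 2 && parts[2] == "checkpoints" && length(parts[1]) == len && parts[1] ~ /^[A-Za-z0-9]+$/) {
        entropy++
      } else {
        plain++
      }
    }
    END { printf "literal=%d entropy=%d plain=%d\n", literal, entropy, plain }'
}
```

Usage: `aws s3 ls s3://flink-test/ --recursive | grep checkpoints | classify_checkpoint_keys _entropy_ 6`. The test passes when literal=0. A job with very small state may produce few or no data files, in which case entropy can legitimately be 0.
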
> {*}Cleanup\{*}:
> {code:bash}
> ./bin/flink cancel <JOB_ID>
> aws s3 rm s3://flink-test/ --recursive --exclude "*" --include "*/checkpoints/*"
> ./bin/stop-cluster.sh
> {code}
> ----
> h3. Test 7: Configuration Validation and Edge Cases
> {*}Objective\{*}: Verify that invalid configurations are rejected with clear 
> error messages at startup, not at runtime.
> {*}Steps\{*}:
> 1. *Part size below minimum*: Set \{{s3.upload.min.part.size: 1048576}} (1MB, 
> below the 5MB S3 minimum). Start the cluster and submit a job that writes to 
> S3. Verify the cluster rejects the configuration with an 
> \{{IllegalArgumentException}} mentioning the 5MB minimum.
> 2. *Invalid entropy key characters*: Set \{{s3.entropy.key: _test@key_}}. 
> Start the cluster and verify it fails with a clear error about invalid 
> characters.
> 3. *Zero max concurrent uploads*: Set \{{s3.upload.max.concurrent.uploads: 
> 0}}. Start the cluster and verify it fails with an 
> \{{IllegalArgumentException}}.
> 4. *Read buffer clamping*: Set \{{s3.read.buffer.size: 10485760}} (10MB, 
> above 4MB max). Start the cluster and submit a read job. Verify a WARN log 
> appears stating the buffer was clamped to 4MB, and the read operation 
> succeeds.
> 5. *Max connections validation*: Set \{{s3.connection.max: 0}}. Verify it 
> fails at startup with an \{{IllegalArgumentException}}.
> {*}Expected Results\{*}:
> - Invalid configurations are rejected at filesystem initialization with 
> descriptive error messages.
> - The read buffer size is clamped with a warning log rather than causing a 
> failure.
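
While cycling through the five configurations, it is easy to lose track of which outcome each one should produce. The sketch below only restates the limits listed in this test case (5 MB minimum part size, 4 MB maximum read buffer, non-zero upload and connection counts). It is not Flink's validation logic, treating negative counts like zero is an assumption, and the entropy key case is left out because the allowed character set is not stated. The function name is hypothetical.

```shell
#!/bin/sh
# Prints the outcome this test case expects for each out-of-range value
# found in a flat "key: value" config file.
expected_outcomes() {
  awk -F': *' '
    $1 == "s3.upload.min.part.size" && $2 + 0 < 5242880 { print $1 ": expect rejection (below 5 MB)" }
    $1 == "s3.upload.max.concurrent.uploads" && $2 + 0 <= 0 { print $1 ": expect rejection (not positive)" }
    $1 == "s3.connection.max" && $2 + 0 <= 0 { print $1 ": expect rejection (not positive)" }
    $1 == "s3.read.buffer.size" && $2 + 0 > 4194304 { print $1 ": expect WARN and clamp to 4 MB" }
  ' "$1"
}
```

Usage: `expected_outcomes conf/config.yaml` before starting the cluster, then compare the printed expectation with what the logs show.
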
> ----
> h3. Test 8: Credential Provider Chain
> {*}Objective\{*}: Verify the credential resolution order: explicit static 
> keys > environment variables/SDK default chain, and that anonymous access 
> works for public buckets.
> {*}Steps\{*}:
> 1. *Static credentials*: Configure \{{s3.access-key}} and \{{s3.secret-key}} 
> in \{{conf/config.yaml}}. Start the cluster, submit a write/read job. Verify 
> it succeeds.
> 2. *SDK default chain*: Remove \{{s3.access-key}} and \{{s3.secret-key}} from 
> config. Ensure credentials are available via environment variables 
> (\{{AWS_ACCESS_KEY_ID}}/\{{AWS_SECRET_ACCESS_KEY}}) or 
> \{{~/.aws/credentials}}. Restart the cluster, submit a write/read job. Verify 
> it succeeds.
> 3. *No credentials*: Remove all credential sources. Start the cluster and 
> attempt a write. Verify the job fails with a credentials-related error (not a 
> generic 403 without explanation).
> {*}Expected Results\{*}:
> - Static keys take priority over environment credentials when both are 
> present.
> - The SDK default chain (env vars, config file, instance profile) works when 
> static keys are not set.
> - Missing credentials produce a clear, actionable error message.
> ----
> h3. Test 9: Multipart Upload Boundary — The "Tail" Problem
> {*}Objective\{*}: Verify the RecoverableWriter correctly handles the case 
> where a checkpoint occurs before 5MB of data has accumulated (the "tail" 
> problem), ensuring no data loss.
> {*}Steps\{*}:
> 1. Configure aggressive checkpointing and the minimum part size (5MB) in 
> \{{conf/config.yaml}}:
> {code:yaml}
> execution.checkpointing.interval: 5s
> execution.checkpointing.mode: EXACTLY_ONCE
> state.checkpoints.dir: s3://flink-test/checkpoints/
> s3.upload.min.part.size: 5242880
> {code}
> 2. Start a cluster and submit a slow-producing streaming job to ensure 
> checkpoints trigger before 5MB accumulates:
> {code:sql}
> SET 'execution.runtime-mode' = 'streaming';
> CREATE TABLE slow_source (id INT, data STRING) WITH (
>   'connector' = 'datagen',
>   'rows-per-second' = '5',
>   'fields.id.kind' = 'sequence',
>   'fields.id.start' = '1',
>   'fields.id.end' = '50000',
>   'fields.data.length' = '100'
> );
> CREATE TABLE s3_tail_sink (id INT, data STRING) WITH (
>   'connector' = 'filesystem',
>   'path' = 's3://flink-test/release-test/tail-test/',
>   'format' = 'csv',
>   'sink.rolling-policy.rollover-interval' = '60s',
>   'sink.rolling-policy.check-interval' = '5s'
> );
> INSERT INTO s3_tail_sink SELECT * FROM slow_source;
> {code}
> 3. Let the job run for 2 minutes so multiple checkpoints complete with 
> sub-5MB data.
> 4. Kill a TaskManager to force recovery:
> {code:bash}
> kill -9 $(jps | grep TaskManager | head -1 | awk '{print $1}')
> ./bin/taskmanager.sh start
> {code}
> 5. Let the job run for another minute, then cancel it.
> 6. Verify data integrity — no data loss:
> {code:bash}
> aws s3 cp s3://flink-test/release-test/tail-test/ /tmp/tail-output/ --recursive
> # Part files may lack a .csv suffix, so count lines in every downloaded file
> find /tmp/tail-output -type f -exec wc -l {} +
> # Rows should be present: no empty or missing files
> {code}
> {*}Expected Results\{*}:
> - Checkpoints succeed even with sub-5MB buffered data.
> - Recovery restores the writer state correctly, including the incomplete 
> "tail" part.
> - No data loss after recovery.
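
A quick calculation confirms that this setup stays far below the 5MB part size. The sketch assumes roughly 107 bytes per CSV row (an id of up to 5 digits, a comma, 100 payload characters, and a newline). Both function names are hypothetical.

```shell
#!/bin/sh
# bytes_between_checkpoints <rows-per-second> <bytes-per-row> <interval-seconds>
bytes_between_checkpoints() {
  echo $(( $1 * $2 * $3 ))
}

# seconds_to_fill_part <rows-per-second> <bytes-per-row> <part-size-bytes>
# Integer division, so the result is rounded down.
seconds_to_fill_part() {
  echo $(( $3 / ($1 * $2) ))
}
```

With the values from step 2, `bytes_between_checkpoints 5 107 5` gives 2675 bytes per checkpoint interval, and `seconds_to_fill_part 5 107 5242880` gives 9799 seconds. At parallelism 1, a single writer would need well over two hours to fill one part, so with the 60s rollover every checkpoint and every rolled file in this test exercises the sub-5MB tail path.
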
> {*}Cleanup\{*}:
> {code:bash}
> aws s3 rm s3://flink-test/release-test/tail-test/ --recursive
> aws s3 rm s3://flink-test/checkpoints/ --recursive
> rm -rf /tmp/tail-output
> ./bin/stop-cluster.sh
> {code}
> ----
> h3. Test 10: Bulk Copy (PathsCopyingFileSystem) for State Recovery
> {*}Objective\{*}: Verify that the \{{PathsCopyingFileSystem}} implementation 
> using \{{S3TransferManager}} correctly downloads state files during 
> TaskManager recovery, and that the \{{s3.bulk-copy.enabled}} flag controls 
> the behavior.
> {*}Steps\{*}:
> 1. Configure with RocksDB state backend (which triggers state download on 
> recovery) in \{{conf/config.yaml}}:
> {code:yaml}
> execution.checkpointing.interval: 10s
> state.backend.type: rocksdb
> state.checkpoints.dir: s3://flink-test/checkpoints/
> s3.bulk-copy.enabled: true
> s3.bulk-copy.max-concurrent: 4
> {code}
> 2. Start a cluster:
> {code:bash}
> ./bin/start-cluster.sh
> ./bin/taskmanager.sh start
> {code}
> 3. Submit a stateful job with keyed state:
> {code:bash}
> ./bin/flink run -d ./examples/streaming/StateMachineExample.jar
> {code}
> 4. Let the job run until at least 5 checkpoints complete.
> 5. Kill one TaskManager:
> {code:bash}
> kill -9 $(jps | grep TaskManager | tail -1 | awk '{print $1}')
> {code}
> 6. Start a replacement TaskManager:
> {code:bash}
> ./bin/taskmanager.sh start
> {code}
> 7. Verify recovery in the Web UI: the job restores from the checkpoint and 
> resumes. Check TaskManager logs for bulk copy activity — look for 
> \{{S3TransferManager}} or download-related log lines.
> 8. *Verify disabled bulk copy*: Repeat the test with \{{s3.bulk-copy.enabled: 
> false}} and verify recovery still works (falls back to standard sequential 
> download).
> {*}Expected Results\{*}:
> - With bulk copy enabled: state files are downloaded using parallel 
> \{{S3TransferManager}} operations. Recovery completes successfully.
> - With bulk copy disabled: recovery still works via standard filesystem reads.
> - No \{{OutOfMemoryError}} or connection pool exhaustion errors during 
> recovery.
> {*}Cleanup\{*}:
> {code:bash}
> ./bin/flink cancel <JOB_ID>
> aws s3 rm s3://flink-test/checkpoints/ --recursive
> ./bin/stop-cluster.sh
> {code}
> ----
> h3. Test 11: Plugin Isolation — No Classpath Conflicts
> {*}Objective\{*}: Verify that the native S3 plugin does not leak dependencies 
> into the user classpath, preventing the \{{NoSuchMethodError}} and 
> \{{ClassNotFoundException}} issues common with the legacy Hadoop-based 
> connector.
> {*}Steps\{*}:
> 1. Create a simple Flink job that explicitly uses a version of a library also 
> bundled inside the native S3 plugin (e.g., Netty, Jackson). For example, a 
> job that calls \{{io.netty.util.Version.identify()}} or uses 
> \{{com.fasterxml.jackson.databind.ObjectMapper}} directly.
> 2. Submit the job to a cluster with the native S3 plugin installed.
> 3. Verify the job uses its *own* version of the library, not the shaded 
> version inside the plugin.
> {*}Expected Results\{*}:
> - No \{{ClassNotFoundException}}, \{{NoSuchMethodError}}, or version mismatch 
> errors.
> - The user application's library version is not affected by the plugin's 
> bundled dependencies.
> ----
> h3. Test 12: MinIO Compatibility (S3-Compatible Object Store)
> {*}Objective\{*}: Verify end-to-end functionality against MinIO, confirming 
> that the \{{s3.endpoint}} and \{{s3.path-style-access}} configurations work 
> correctly for S3-compatible stores.
> {*}Steps\{*}:
> 1. Start MinIO (see Prerequisites section).
> 2. Configure Flink in \{{conf/config.yaml}}:
> {code:yaml}
> s3.endpoint: http://localhost:9000
> s3.access-key: minioadmin
> s3.secret-key: minioadmin
> s3.path-style-access: true
> execution.checkpointing.interval: 10s
> state.checkpoints.dir: s3://flink-test/checkpoints/
> {code}
> 3. Start the cluster:
> {code:bash}
> ./bin/start-cluster.sh
> {code}
> 4. Submit a stateful streaming job:
> {code:bash}
> ./bin/flink run -d ./examples/streaming/StateMachineExample.jar
> {code}
> 5. Verify checkpoints complete in the Web UI.
> 6. Trigger and verify a savepoint:
> {code:bash}
> ./bin/flink savepoint <JOB_ID> s3://flink-test/savepoints/
> {code}
> 7. Cancel and restore from savepoint:
> {code:bash}
> ./bin/flink cancel <JOB_ID>
> ./bin/flink run -d -s <SAVEPOINT_PATH> ./examples/streaming/StateMachineExample.jar
> {code}
> 8. Verify the restored job is running and checkpoints resume.
> {*}Expected Results\{*}:
> - All operations work against MinIO without modifications.
> - \{{s3.path-style-access}} is auto-enabled when \{{s3.endpoint}} is set 
> (verify by checking that it works even without explicitly setting 
> \{{s3.path-style-access: true}}).
> - Chunked encoding and checksum validation are automatically disabled for 
> custom endpoints.
> {*}Cleanup\{*}:
> {code:bash}
> ./bin/flink cancel <JOB_ID>
> aws --endpoint-url http://localhost:9000 s3 rm s3://flink-test/ --recursive
> ./bin/stop-cluster.sh
> docker stop minio && docker rm minio
> {code}
> ----
> h2. Breaking Change Considerations
> - *URI scheme conflict*: If both \{{flink-s3-fs-native}} and 
> \{{flink-s3-fs-hadoop}} are in the \{{plugins/}} directory, only one will 
> handle \{{s3://}}. The native plugin has priority \{{-1}} (lowest), so Hadoop 
> takes precedence. Users must remove the legacy plugin to use the native one. 
> This should be documented clearly.
> - *No \{{s3n://}} support*: The native plugin only registers \{{s3://}} and 
> \{{s3a://}}. Jobs using \{{s3n://}} paths must migrate.
> - *Configuration namespace change*: Legacy Hadoop-specific keys (e.g., 
> \{{fs.s3a.*}}) are not recognized by the native plugin. Users must migrate to 
> the \{{s3.*}} namespace.
> - *Directory rename*: \{{NativeS3FileSystem.rename()}} throws 
> \{{UnsupportedOperationException}} for directories. Any user code relying on 
> directory rename will fail. This matches S3 semantics but differs from the 
> Hadoop connector which emulated directory rename via copy+delete.
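
For the s3n:// and fs.s3a.* items above, a simple scan of configuration files and SQL scripts can flag what needs migrating before the legacy plugin is removed. A minimal sketch. The function name is hypothetical.

```shell
#!/bin/sh
# Prints "<file>:<line-number>:<text>" for every line that uses an s3n:// path
# or a Hadoop-style fs.s3a.* key. Prints nothing when no legacy usage is found.
find_legacy_s3_usage() {
  # The extra /dev/null argument makes grep print file names even for a single file.
  grep -n -E 's3n://|fs\.s3a\.' "$@" /dev/null || true
}
```

Usage: `find_legacy_s3_usage conf/config.yaml`, adding any SQL scripts as further arguments.
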



