davidradl commented on code in PR #27937:
URL: https://github.com/apache/flink/pull/27937#discussion_r3094193585
##########
docs/content/docs/deployment/filesystems/s3.md:
##########
@@ -64,94 +64,288 @@ env.configure(config);
Note that these examples are *not* exhaustive and you can use S3 in other
places as well, including your [high availability setup]({{< ref
"docs/deployment/ha/overview" >}}) or the [EmbeddedRocksDBStateBackend]({{< ref
"docs/ops/state/state_backends" >}}#the-rocksdbstatebackend); everywhere that
Flink expects a FileSystem URI (unless otherwise stated).
-For most use cases, you may use one of our `flink-s3-fs-hadoop` and
`flink-s3-fs-presto` S3 filesystem plugins which are self-contained and easy to
set up.
-For some cases, however, e.g., for using S3 as YARN's resource storage dir, it
may be necessary to set up a specific Hadoop S3 filesystem implementation.
+## S3 FileSystem Implementations
-### Hadoop/Presto S3 File Systems plugins
+Flink provides three independent S3 filesystem implementations, each with
different trade-offs:
+
+- **Native S3 FileSystem** (`flink-s3-fs-native`): Built directly on AWS SDK
v2 with async I/O and parallel transfers, removing the dependency from Hadoop
entirely. Supports both checkpointing and the FileSink in a single plugin,
removing the need to choose between Presto (checkpointing) and Hadoop
(FileSink).
[Benchmarks](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620396)
show ~2x higher checkpoint throughput (~200 MB/s vs ~90 MB/s) compared to the
Presto implementation at state sizes up to 15 GB. **Experimental** in Flink 2.3.
+- **Presto S3 FileSystem** (`flink-s3-fs-presto`): Based on Presto project
code. The proven choice for checkpointing in production.
+- **Hadoop S3 FileSystem** (`flink-s3-fs-hadoop`): Based on Hadoop project
code. Supports both checkpointing and the FileSink.
+
+All three are self-contained with no dependency footprint, so there is no need
to add Hadoop to the classpath to use them.
+
+## Common Configuration
+
+### Configure Access Credentials
+
+After setting up the S3 FileSystem implementation, you need to make sure that
Flink is allowed to access your S3 buckets.
+
+#### Identity and Access Management (IAM) (Recommended)
+
+The recommended way of setting up credentials on AWS is via [Identity and
Access Management
(IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You
can use IAM features to securely give Flink instances the credentials that they
need to access S3 buckets. Details about how to do this are beyond the scope of
this documentation. Please refer to the AWS user guide. What you are looking
for are [IAM
Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html).
+
+If you set this up correctly, you can manage access to S3 within AWS and don't
need to distribute any access keys to Flink.
+
+#### Delegation Tokens
+
+[Delegation tokens]({{< ref
"docs/deployment/security/security-delegation-token" >}}) provide time-bounded,
automatically negotiated credentials. The JobManager uses long-lived
credentials (access key and secret key) to call [AWS
STS](https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html) and
obtain short-lived session tokens, which are then automatically distributed to
TaskManagers.
+
+Each S3 implementation has its own delegation token provider with a dedicated
configuration prefix. You must set the `access-key`, `secret-key`, and `region`
under the corresponding prefix for the implementation you are using:
+
+```yaml
+# For Native S3 implementation
+security.delegation.token.provider.s3-native.access-key: your-access-key
+security.delegation.token.provider.s3-native.secret-key: your-secret-key
+security.delegation.token.provider.s3-native.region: us-east-1
+
+# For Hadoop implementation
+security.delegation.token.provider.s3-hadoop.access-key: your-access-key
+security.delegation.token.provider.s3-hadoop.secret-key: your-secret-key
+security.delegation.token.provider.s3-hadoop.region: us-east-1
+
+# For Presto implementation
+security.delegation.token.provider.s3-presto.access-key: your-access-key
+security.delegation.token.provider.s3-presto.secret-key: your-secret-key
+security.delegation.token.provider.s3-presto.region: us-east-1
+```
+
+All three values (`access-key`, `secret-key`, `region`) must be set for
delegation tokens to be issued. The `DynamicTemporaryAWSCredentialsProvider` is
automatically included in the credentials provider chain for each
implementation, so TaskManagers will consume the distributed tokens without
additional configuration.
+
+#### Access Keys
+
+Access to S3 can be granted via your **access and secret key pair**. While
access keys are not inherently insecure, IAM roles are preferred as they avoid
the need to manage and distribute static credentials. See the [introduction of
IAM
roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2)
for more context.
+
+You need to configure both `s3.access-key` and `s3.secret-key` in Flink's
[configuration file]({{< ref "docs/deployment/config#flink-configuration-file"
>}}):
+
+```yaml
+s3.access-key: your-access-key
+s3.secret-key: your-secret-key
+```
+
+### Configure Non-S3 Endpoint
+
+The S3 filesystems also support using S3 compliant object stores.
+To do so, configure your endpoint in [Flink configuration file]({{< ref
"docs/deployment/config#flink-configuration-file" >}}):
+
+```yaml
+s3.endpoint: your-endpoint-hostname
+```
+
+### Configure Path Style Access
+
+Some S3 compliant object stores might not have virtual host style addressing
enabled by default. In such cases, you will have to provide the property to
enable path style access in [Flink configuration file]({{< ref
"docs/deployment/config#flink-configuration-file" >}}):
+
+```yaml
+s3.path-style-access: true
+```
{{< hint info >}}
-You don't have to configure this manually if you are running [Flink on
EMR](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html).
+The legacy configuration key `s3.path.style.access` is still supported as a
fallback for backward compatibility.
{{< /hint >}}
-Flink provides two file systems to talk to Amazon S3, `flink-s3-fs-presto` and
`flink-s3-fs-hadoop`.
-Both implementations are self-contained with no dependency footprint, so there
is no need to add Hadoop to the classpath to use them.
+## Implementation Details
- - `flink-s3-fs-presto`, registered under the scheme *s3://* and *s3p://*, is
based on code from the [Presto project](https://prestodb.io/).
- You can configure it using [the same configuration keys as the Presto file
system](https://prestodb.io/docs/0.272/connector/hive.html#amazon-s3-configuration),
by adding the configurations to your [Flink configuration file]({{< ref
"docs/deployment/config#flink-configuration-file" >}}). The Presto S3
implementation is the recommended file system for checkpointing to S3.
+### Native S3 FileSystem (Experimental)
- - `flink-s3-fs-hadoop`, registered under *s3://* and *s3a://*, based on code
from the [Hadoop Project](https://hadoop.apache.org/).
- The file system can be [configured using Hadoop's s3a configuration
keys](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A)
by adding the configurations to your [Flink configuration file]({{< ref
"docs/deployment/config#flink-configuration-file" >}}).
-
- For example, Hadoop has a `fs.s3a.connection.maximum` configuration key.
If you want to change it, you need to put `s3.connection.maximum: xyz` to the
[Flink configuration file]({{< ref
"docs/deployment/config#flink-configuration-file" >}}). Flink will internally
translate this back to `fs.s3a.connection.maximum`. There is no need to pass
configuration parameters using Hadoop's XML configuration files.
-
- It is the only S3 file system with support for the [FileSystem]({{< ref
"docs/connectors/datastream/filesystem" >}}).
-
+{{< hint warning >}}
+**Experimental**: The Native S3 FileSystem is experimental in Flink 2.3. It is
functionally complete and has demonstrated strong performance in benchmarks.
+{{< /hint >}}
-Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` register default FileSystem
-wrappers for URIs with the *s3://* scheme, `flink-s3-fs-hadoop` also registers
-for *s3a://* and `flink-s3-fs-presto` also registers for *s3p://*, so you can
-use this to use both at the same time.
-For example, the job uses the [FileSystem]({{< ref
"docs/connectors/datastream/filesystem" >}}) which only supports Hadoop, but
uses Presto for checkpointing.
-In this case, you should explicitly use *s3a://* as a scheme for the sink
(Hadoop) and *s3p://* for checkpointing (Presto).
+The Native S3 FileSystem is a pure-Java implementation built on the AWS SDK v2
completely removing the dependencies from hadoop. It is registered under the
schemes *s3://* and *s3a://*. It requires no additional dependencies and
provides a drop-in replacement for the Presto and Hadoop implementations.
-To use `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the respective JAR
file from the `opt` directory to the `plugins` directory of your Flink
distribution before starting Flink, e.g.
+#### Setup
+
+To use the Native S3 FileSystem, copy the JAR file from the `opt` directory to
the `plugins` directory:
```bash
-mkdir ./plugins/s3-fs-presto
-cp ./opt/flink-s3-fs-presto-{{< version >}}.jar ./plugins/s3-fs-presto/
+mkdir -p ./plugins/s3-fs-native
+cp ./opt/flink-s3-fs-native-{{< version >}}.jar ./plugins/s3-fs-native/
```
-#### Configure Access Credentials
+#### Features
-After setting up the S3 FileSystem wrapper, you need to make sure that Flink
is allowed to access your S3 buckets.
+- **No external dependencies**: Built on AWS SDK v2 with minimal footprint
+- **Drop-in replacement**: Compatible with the same S3 URI schemes (`s3://`,
`s3a://`)
+- **FileSystem sink support**: Supports the [FileSystem sink]({{< ref
"docs/connectors/datastream/filesystem" >}}) via `RecoverableWriter`
+- **Encryption support**: Server-side encryption (SSE-S3, SSE-KMS)
+- **Assume role**: Cross-account access via IAM role assumption
+- **Entropy injection**: Optimize S3 scalability through random key prefixes
+- **Bulk copy**: Efficient multi-part copy operations via S3TransferManager
-##### Identity and Access Management (IAM) (Recommended)
+#### Configuration
-The recommended way of setting up credentials on AWS is via [Identity and
Access Management
(IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You
can use IAM features to securely give Flink instances the credentials that they
need to access S3 buckets. Details about how to do this are beyond the scope of
this documentation. Please refer to the AWS user guide. What you are looking
for are [IAM
Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html).
+The Native S3 FileSystem uses the following configuration options:
-If you set this up correctly, you can manage access to S3 within AWS and don't
need to distribute any access keys to Flink.
+```yaml
+# AWS credentials (if using static credentials)
+s3.access-key: your-access-key
+s3.secret-key: your-secret-key
-##### Access Keys (Discouraged)
+# AWS region (optional; auto-detected if not specified)
+s3.region: us-east-1
-Access to S3 can be granted via your **access and secret key pair**. Please
note that this is discouraged since the [introduction of IAM
roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2).
+# Custom S3 endpoint for S3-compatible storage
+s3.endpoint: your-endpoint-hostname
+
+# Path style access for S3-compatible storage
+s3.path-style-access: true
+
+# Server-side encryption
+s3.sse.type: sse-s3 # or sse-kms, aws:kms, AES256, none (default)
+s3.sse.kms.key-id: arn:aws:kms:region:account:key/id # For SSE-KMS
+
+# IAM role assumption for cross-account access
+s3.assume-role.arn: arn:aws:iam::account:role/RoleName
+s3.assume-role.external-id: external-id-if-required
+s3.assume-role.session-name: flink-s3-session
+s3.assume-role.session-duration: 3600
+
+# Performance tuning
+s3.upload.min.part.size: 5242880 # 5MB default
+s3.upload.max.concurrent.uploads: 4 # Based on CPU cores
+s3.read.buffer.size: 262144 # 256KB default
+s3.async.enabled: true # Enable async operations
+s3.bulk-copy.enabled: true # Enable bulk copy
+s3.bulk-copy.max-concurrent: 16 # Max concurrent copy ops
+
+# Entropy injection for scalability
+s3.entropy.key: _entropy_
+s3.entropy.length: 4
+
+# Retry configuration
+s3.retry.max-num-retries: 3
+
+# Credentials provider (optional; see note below)
+# fs.s3.aws.credentials.provider:
software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
+```
+
+When `fs.s3.aws.credentials.provider` is not set, the Native S3 FileSystem
automatically builds a credentials chain in the following order: delegation
tokens, static credentials (if `s3.access-key` and `s3.secret-key` are
configured), and the AWS SDK v2 `DefaultCredentialsProvider` (environment
variables, instance profiles, etc.). You only need to set this option if you
require a custom provider chain.
+
+See the [AWS SDK v2 documentation](https://docs.aws.amazon.com/sdk-for-java/)
for additional configuration details.
+
+---
-You need to configure both `s3.access-key` and `s3.secret-key` in Flink's
[Flink configuration file]({{< ref
"docs/deployment/config#flink-configuration-file" >}}):
+### Presto S3 FileSystem
+
+{{< hint info >}}
+You don't have to configure this manually if you are running [Flink on
EMR](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html).
+{{< /hint >}}
+
+The Presto S3 FileSystem is based on code from the [Presto
project](https://prestodb.io/). It is registered under the schemes *s3://* and
*s3p://*.
+
+#### Features
+
+- **Recommended for checkpointing**: The Presto implementation is the
recommended file system for checkpointing to S3
+- **Self-contained**: No Hadoop dependency required
+- **Production-ready**: Stable and widely used
+
+#### Setup
+
+To use the Presto S3 FileSystem, copy the JAR file from the `opt` directory to
the `plugins` directory:
+
+```bash
+mkdir -p ./plugins/s3-fs-presto
+cp ./opt/flink-s3-fs-presto-{{< version >}}.jar ./plugins/s3-fs-presto/
+```
+
+#### Configuration
+
+Configure it using [the same configuration keys as the Presto file
system](https://prestodb.io/docs/0.272/connector/hive.html#amazon-s3-configuration),
by adding the configurations to your [Flink configuration file]({{< ref
"docs/deployment/config#flink-configuration-file" >}}):
```yaml
+# AWS credentials
s3.access-key: your-access-key
s3.secret-key: your-secret-key
-```
-You can limit this configuration to JobManagers by using [Flink configuration
file]({{< ref "docs/deployment/security/security-delegation-token" >}}).
+# Custom endpoint
+s3.endpoint: your-endpoint-hostname
-```yaml
-# flink-s3-fs-hadoop
-fs.s3a.aws.credentials.provider:
org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider
-# flink-s3-fs-presto
+# Path style access
+s3.path-style-access: true
+
+# Credentials provider
presto.s3.credentials-provider:
org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider
```
-## Configure Non-S3 Endpoint
+Refer to the [Presto
documentation](https://prestodb.io/docs/0.272/connector/hive.html#amazon-s3-configuration)
for all available configuration options.
Review Comment:
isnt this a a duplicate of line 253. I suggest removing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]