Samrat002 commented on code in PR #27841:
URL: https://github.com/apache/flink/pull/27841#discussion_r3031365503


##########
docs/content/docs/deployment/filesystems/s3.md:
##########
@@ -64,94 +64,267 @@ env.configure(config);
 
 Note that these examples are *not* exhaustive and you can use S3 in other 
places as well, including your [high availability setup]({{< ref 
"docs/deployment/ha/overview" >}}) or the [EmbeddedRocksDBStateBackend]({{< ref 
"docs/ops/state/state_backends" >}}#the-rocksdbstatebackend); everywhere that 
Flink expects a FileSystem URI (unless otherwise stated).
 
-For most use cases, you may use one of our `flink-s3-fs-hadoop` and 
`flink-s3-fs-presto` S3 filesystem plugins which are self-contained and easy to 
set up.
-For some cases, however, e.g., for using S3 as YARN's resource storage dir, it 
may be necessary to set up a specific Hadoop S3 filesystem implementation.
+## S3 FileSystem Implementations
 
-### Hadoop/Presto S3 File Systems plugins
+Flink provides three independent S3 filesystem implementations, each with 
different trade-offs:
 
-{{< hint info >}}
-You don't have to configure this manually if you are running [Flink on 
EMR](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html).
-{{< /hint >}}
-
-Flink provides two file systems to talk to Amazon S3, `flink-s3-fs-presto` and 
`flink-s3-fs-hadoop`.
-Both implementations are self-contained with no dependency footprint, so there 
is no need to add Hadoop to the classpath to use them.
+- **Native S3 FileSystem** (`flink-s3-fs-native`): Built directly on AWS SDK 
v2 with async I/O and parallel transfers, this implementation supports both 
checkpointing and the FileSystem sink. 
[Benchmarks](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620396)
 show ~2x higher checkpoint throughput (~200 MB/s vs ~90 MB/s) compared to the 
Presto implementation at state sizes up to 15 GB. **Experimental** in Flink 
2.3; the API and behavior may change in future releases.
+- **Presto S3 FileSystem** (`flink-s3-fs-presto`): Based on Presto project 
code, recommended for checkpointing.
+- **Hadoop S3 FileSystem** (`flink-s3-fs-hadoop`): Based on Hadoop project 
code, has FileSystem sink support.
 
-  - `flink-s3-fs-presto`, registered under the scheme *s3://* and *s3p://*, is 
based on code from the [Presto project](https://prestodb.io/).
-  You can configure it using [the same configuration keys as the Presto file 
system](https://prestodb.io/docs/0.272/connector/hive.html#amazon-s3-configuration),
 by adding the configurations to your [Flink configuration file]({{< ref 
"docs/deployment/config#flink-configuration-file" >}}). The Presto S3 
implementation is the recommended file system for checkpointing to S3.
+All three are self-contained with no dependency footprint, so there is no need 
to add Hadoop to the classpath to use them.
 
-  - `flink-s3-fs-hadoop`, registered under *s3://* and *s3a://*, based on code 
from the [Hadoop Project](https://hadoop.apache.org/).
-  The file system can be [configured using Hadoop's s3a configuration 
keys](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A)
 by adding the configurations to your [Flink configuration file]({{< ref 
"docs/deployment/config#flink-configuration-file" >}}). 
-  
-     For example, Hadoop has a `fs.s3a.connection.maximum` configuration key. 
If you want to change it, you need to put `s3.connection.maximum: xyz` to the 
[Flink configuration file]({{< ref 
"docs/deployment/config#flink-configuration-file" >}}). Flink will internally 
translate this back to `fs.s3a.connection.maximum`. There is no need to pass 
configuration parameters using Hadoop's XML configuration files.
-  
-    It is the only S3 file system with support for the [FileSystem]({{< ref 
"docs/connectors/datastream/filesystem" >}}).
-  
+## Common Configuration
 
-Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` register default FileSystem
-wrappers for URIs with the *s3://* scheme, `flink-s3-fs-hadoop` also registers
-for *s3a://* and `flink-s3-fs-presto` also registers for *s3p://*, so you can
-use this to use both at the same time.
-For example, the job uses the [FileSystem]({{< ref 
"docs/connectors/datastream/filesystem" >}}) which only supports Hadoop, but 
uses Presto for checkpointing.
-In this case, you should explicitly use *s3a://* as a scheme for the sink 
(Hadoop) and *s3p://* for checkpointing (Presto).
+### Configure Access Credentials
 
-To use `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the respective JAR 
file from the `opt` directory to the `plugins` directory of your Flink 
distribution before starting Flink, e.g.
-
-```bash
-mkdir ./plugins/s3-fs-presto
-cp ./opt/flink-s3-fs-presto-{{< version >}}.jar ./plugins/s3-fs-presto/
-```
+After setting up the S3 FileSystem implementation, you need to make sure that 
Flink is allowed to access your S3 buckets.
 
-#### Configure Access Credentials
-
-After setting up the S3 FileSystem wrapper, you need to make sure that Flink 
is allowed to access your S3 buckets.
-
-##### Identity and Access Management (IAM) (Recommended)
+#### Identity and Access Management (IAM) (Recommended)
 
 The recommended way of setting up credentials on AWS is via [Identity and 
Access Management 
(IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You 
can use IAM features to securely give Flink instances the credentials that they 
need to access S3 buckets. Details about how to do this are beyond the scope of 
this documentation. Please refer to the AWS user guide. What you are looking 
for are [IAM 
Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html).
 
 If you set this up correctly, you can manage access to S3 within AWS and don't 
need to distribute any access keys to Flink.
 
-##### Access Keys (Discouraged)
+#### Access Keys (Discouraged)
 
 Access to S3 can be granted via your **access and secret key pair**. Please 
note that this is discouraged since the [introduction of IAM 
roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2).
 
-You need to configure both `s3.access-key` and `s3.secret-key`  in Flink's  
[Flink configuration file]({{< ref 
"docs/deployment/config#flink-configuration-file" >}}):
+You need to configure both `s3.access-key` and `s3.secret-key` in Flink's 
[configuration file]({{< ref "docs/deployment/config#flink-configuration-file" 
>}}):
 
 ```yaml
 s3.access-key: your-access-key
 s3.secret-key: your-secret-key
 ```
 
-You can limit this configuration to JobManagers by using [Flink configuration 
file]({{< ref "docs/deployment/security/security-delegation-token" >}}).
+You can limit this configuration to JobManagers by using [delegation 
tokens]({{< ref "docs/deployment/security/security-delegation-token" >}}):
 
 ```yaml
-# flink-s3-fs-hadoop
-fs.s3a.aws.credentials.provider: 
org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider
-# flink-s3-fs-presto
+# For Native S3 or Hadoop implementations
+fs.s3.aws.credentials.provider: 
org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider
+# For Presto implementation
 presto.s3.credentials-provider: 
org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider
 ```
 
-## Configure Non-S3 Endpoint
+### Configure Non-S3 Endpoint
 
-The S3 Filesystems also support using S3 compliant object stores such as 
[IBM's Cloud Object Storage](https://www.ibm.com/cloud/object-storage) and 
[MinIO](https://min.io/).
-To do so, configure your endpoint in [Flink configuration file]({{< ref 
"docs/deployment/config#flink-configuration-file" >}}).
+The S3 filesystems also support using S3 compliant object stores.
+To do so, configure your endpoint in [Flink configuration file]({{< ref 
"docs/deployment/config#flink-configuration-file" >}}):
 
 ```yaml
 s3.endpoint: your-endpoint-hostname
 ```
 
-## Configure Path Style Access
+### Configure Path Style Access
 
-Some S3 compliant object stores might not have virtual host style addressing 
enabled by default, for example when using Standalone MinIO for testing 
purpose. In such cases, you will have to provide the property to enable path 
style access in [Flink configuration file]({{< ref 
"docs/deployment/config#flink-configuration-file" >}}).
+Some S3 compliant object stores might not have virtual host style addressing 
enabled by default. In such cases, you will have to provide the property to 
enable path style access in [Flink configuration file]({{< ref 
"docs/deployment/config#flink-configuration-file" >}}):
 
 ```yaml
-s3.path.style.access: true
+s3.path-style-access: true
+```
+
+## S3 FileSystem Implementations
+
+### Native S3 FileSystem
+
+{{< hint warning >}}
+**Experimental**: The Native S3 FileSystem implementation is experimental in 
Flink 2.3. While functionally complete, it should not yet be used in production 
environments. Please use Presto or Hadoop implementations for production 
deployments.
+{{< /hint >}}
+
+The Native S3 FileSystem is a pure-Java implementation built on the AWS SDK 
v2. It is registered under the schemes *s3://* and *s3a://*. It requires no 
additional dependencies and provides a drop-in replacement for the Presto and 
Hadoop implementations.
+
+#### Setup
+
+To use the Native S3 FileSystem, copy the JAR file from the `opt` directory to 
the `plugins` directory:
+
+```bash
+mkdir -p ./plugins/s3-fs-native
+cp ./opt/flink-s3-fs-native-{{< version >}}.jar ./plugins/s3-fs-native/
 ```
 
-## Entropy injection for S3 file systems
+#### Features
+
+- **No external dependencies**: Built on AWS SDK v2 with minimal footprint
+- **Drop-in replacement**: Compatible with the same S3 URI schemes (`s3://`)
+- **Encryption support**: Server-side encryption (SSE) and KMS encryption
+- **Assume role**: Cross-account access via IAM role assumption
+- **Entropy injection**: Optimize S3 scalability through random key prefixes
+- **Bulk copy**: Efficient multi-part copy operations via S3TransferManager
+
+#### Configuration
 
-The bundled S3 file systems (`flink-s3-fs-presto` and `flink-s3-fs-hadoop`) 
support entropy injection. Entropy injection is
-a technique to improve the scalability of AWS S3 buckets through adding some 
random characters near the beginning of the key.
+The Native S3 FileSystem uses the following configuration options:
+
+```yaml
+# AWS credentials (if using static credentials)
+s3.access-key: your-access-key
+s3.secret-key: your-secret-key
+
+# AWS region (optional; auto-detected if not specified)
+s3.region: us-east-1
+
+# Custom S3 endpoint for S3-compatible storage
+s3.endpoint: your-endpoint-hostname
+
+# Path style access for S3-compatible storage
+s3.path-style-access: true
+
+# Server-side encryption
+s3.sse.type: sse-s3         # or sse-kms, aws:kms, AES256, none (default)
+s3.sse.kms.key-id: arn:aws:kms:region:account:key/id   # For SSE-KMS
+
+# IAM role assumption for cross-account access
+s3.assume-role.arn: arn:aws:iam::account:role/RoleName
+s3.assume-role.external-id: external-id-if-required
+s3.assume-role.session-name: flink-s3-session
+s3.assume-role.session-duration: 3600
+
+# Performance tuning
+s3.upload.min.part.size: 5242880        # 5MB default
+s3.upload.max.concurrent.uploads: 4     # Based on CPU cores
+s3.read.buffer.size: 262144             # 256KB default
+s3.async.enabled: true                  # Enable async operations
+s3.bulk-copy.enabled: true              # Enable bulk copy
+s3.bulk-copy.max-concurrent: 16         # Max concurrent copy ops
+
+# Entropy injection for scalability
+s3.entropy.key: _entropy_
+s3.entropy.length: 4
+
+# Retry configuration
+s3.retry.max-num-retries: 3
+
+# Credentials provider
+fs.s3.aws.credentials.provider: 
software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
+```
+
+See the [AWS SDK v2 documentation](https://docs.aws.amazon.com/sdk-for-java/) 
for additional configuration details.
+
+---
+
+### Presto S3 FileSystem
+
+{{< hint info >}}
+You don't have to configure this manually if you are running [Flink on 
EMR](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html).
+{{< /hint >}}
+
+The Presto S3 FileSystem is based on code from the [Presto 
project](https://prestodb.io/). It is registered under the schemes *s3://* and 
*s3p://*.
+
+#### Features
+
+- **Recommended for checkpointing**: The Presto implementation is the 
recommended file system for checkpointing to S3
+- **Self-contained**: No Hadoop dependency required
+- **Production-ready**: Stable and widely used
+
+#### Setup
+
+To use the Presto S3 FileSystem, copy the JAR file from the `opt` directory to 
the `plugins` directory:
+
+```bash
+mkdir -p ./plugins/s3-fs-presto
+cp ./opt/flink-s3-fs-presto-{{< version >}}.jar ./plugins/s3-fs-presto/
+```
+
+#### Configuration
+
+Configure it using [the same configuration keys as the Presto file 
system](https://prestodb.io/docs/0.272/connector/hive.html#amazon-s3-configuration),
 by adding the configurations to your [Flink configuration file]({{< ref 
"docs/deployment/config#flink-configuration-file" >}}):
+
+```yaml
+# AWS credentials
+s3.access-key: your-access-key
+s3.secret-key: your-secret-key
+
+# Custom endpoint
+s3.endpoint: your-endpoint-hostname
+
+# Path style access
+s3.path-style-access: true
+
+# Credentials provider
+presto.s3.credentials-provider: 
org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider
+```
+
+Refer to the [Presto 
documentation](https://prestodb.io/docs/0.272/connector/hive.html#amazon-s3-configuration)
 for all available configuration options.
+
+---
+
+### Hadoop S3 FileSystem
+
+The Hadoop S3 FileSystem is based on code from the [Hadoop 
Project](https://hadoop.apache.org/). It is registered under the schemes 
*s3://* and *s3a://*.
+
+#### Features
+
+- **FileSystem sink support**: The only S3 implementation with support for the 
[FileSystem sink]({{< ref "docs/connectors/datastream/filesystem" >}})
+- **Self-contained**: No additional Hadoop installation required
+- **Mature implementation**: Long-established code from the Hadoop ecosystem
+
+#### Setup
+
+To use the Hadoop S3 FileSystem, copy the JAR file from the `opt` directory to 
the `plugins` directory:
+
+```bash
+mkdir -p ./plugins/s3-fs-hadoop
+cp ./opt/flink-s3-fs-hadoop-{{< version >}}.jar ./plugins/s3-fs-hadoop/
+```
+
+#### Configuration
+
+Configure it using [Hadoop's s3a configuration 
keys](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A)
 by adding the configurations to your [Flink configuration file]({{< ref 
"docs/deployment/config#flink-configuration-file" >}}):
+
+```yaml
+# AWS credentials
+s3.access-key: your-access-key
+s3.secret-key: your-secret-key
+
+# Custom endpoint
+s3.endpoint: your-endpoint-hostname
+
+# Path style access
+s3.path-style-access: true
+
+# Connection settings
+s3.connection.maximum: 10
+
+# Credentials provider
+fs.s3a.aws.credentials.provider: 
org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider
+```
+
+Hadoop configuration keys are automatically translated. For example, 
`fs.s3a.connection.maximum` becomes `s3.connection.maximum`. Refer to the 
[Hadoop S3A 
documentation](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A)
 for all available options.
+
+---
+
+## Using Multiple S3 Implementations
+
+All three S3 implementations register as handlers for the *s3://* scheme. 
Additionally, each implementation supports alternative schemes:
+
+| Implementation | Schemes |
+|---------------|---------|
+| Native S3     | *s3://*, *s3a://* |
+| Presto        | *s3://*, *s3p://* |
+| Hadoop        | *s3://*, *s3a://* |
+
+Only one implementation can handle a given scheme at a time. The Native S3 
implementation has the lowest priority, so when another implementation is 
present, it will take precedence for the *s3://* scheme.
+
+You can use multiple S3 implementations simultaneously by leveraging their 
different URI schemes. For example, if a job uses the [FileSystem]({{< ref 
"docs/connectors/datastream/filesystem" >}}) sink (Hadoop-only) but Presto for 
checkpointing:
+
+- Use *s3a://* scheme for the sink (Hadoop)
+- Use *s3p://* scheme for checkpointing (Presto)
+
+{{< hint info >}}
+The Native S3 implementation does not introduce a new URI scheme. It supports 
the existing *s3://* and *s3a://* schemes. Since both the Native S3 and Hadoop 
implementations handle the same schemes, only one can be active at a time. To 
use the Native S3 implementation, ensure only the Native S3 plugin JAR is in 
the `plugins` directory.  
+
+
+
+**Caution** :  Do not load `flink-s3-fs-native` and `flink-s3-fs-hadoop` 
plugins simultaneously.

Review Comment:
    Loading both JARs simultaneously is technically safe. The priority
    mechanism ensures only one factory handles each scheme (Hadoop wins by
    default with priority 0 vs. Native's -1). The "Caution" was overly
    restrictive. I've updated the doc to explain the priority-based resolution
    and the `fs.<scheme>.priority.<factoryClassName>` config option for users
    who want to control which implementation handles which scheme.
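
    The priority-based resolution described above can be modelled roughly as
    follows. This is only an illustrative sketch, not Flink's actual code: the
    `Factory` type, the `resolve` function, the short factory labels, and the
    tie-breaking rule (first registered wins) are all assumptions made for the
    example. Only the default priorities (Hadoop 0, Native -1) come from the
    comment above; the override map stands in for whatever the
    `fs.<scheme>.priority.<factoryClassName>` option supplies.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Factory:
    name: str       # hypothetical label, not a real Flink class name
    schemes: tuple  # URI schemes this factory registers for
    priority: int   # default priority; higher wins


def resolve(factories, overrides=None):
    """Map each scheme to the name of the highest-priority factory.

    `overrides` maps (scheme, factory name) to a priority and models a
    per-scheme priority setting. Ties keep the first factory seen
    (an assumption of this sketch).
    """
    overrides = overrides or {}
    winners = {}
    for factory in factories:
        for scheme in factory.schemes:
            prio = overrides.get((scheme, factory.name), factory.priority)
            if scheme not in winners or prio > winners[scheme][0]:
                winners[scheme] = (prio, factory.name)
    return {scheme: name for scheme, (_, name) in winners.items()}


# Default priorities as stated in the review comment.
FACTORIES = [
    Factory("hadoop", ("s3", "s3a"), 0),
    Factory("native", ("s3", "s3a"), -1),
]

if __name__ == "__main__":
    print(resolve(FACTORIES))
    print(resolve(FACTORIES, {("s3", "native"): 10}))
```

    With the defaults, the Hadoop entry wins both schemes; raising the Native
    entry's priority for `s3` alone moves only that scheme, which is the kind
    of per-scheme control the config option is meant to give.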



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
