This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 8136127f0 [filesystem] Support AssumeRole STS for RustFS (#2989)
8136127f0 is described below
commit 8136127f0c722f5da80c832e11dde30f8317f5df
Author: Anton Borisov <[email protected]>
AuthorDate: Wed Apr 8 09:16:00 2026 +0100
[filesystem] Support AssumeRole STS for RustFS (#2989)
---
.../fs/s3/token/S3DelegationTokenProvider.java | 77 +++++++++++++++++-----
1 file changed, 62 insertions(+), 15 deletions(-)
diff --git a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java
index 33caaacdd..74c178a34 100644
--- a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java
+++ b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java
@@ -22,17 +22,23 @@ import org.apache.fluss.fs.token.ObtainedSecurityToken;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
+import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
+import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
import com.amazonaws.services.securitytoken.model.Credentials;
import com.amazonaws.services.securitytoken.model.GetSessionTokenResult;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -47,10 +53,15 @@ public class S3DelegationTokenProvider {
private static final String REGION_KEY = "fs.s3a.region";
private static final String ENDPOINT_KEY = "fs.s3a.endpoint";
+ private static final String ROLE_ARN_KEY = "fs.s3a.assumed.role.arn";
+ private static final String STS_ENDPOINT_KEY = "fs.s3a.assumed.role.sts.endpoint";
+
private final String scheme;
private final String region;
private final String accessKey;
private final String secretKey;
+ @Nullable private final String roleArn;
+ @Nullable private final String stsEndpoint;
private final Map<String, String> additionInfos;
public S3DelegationTokenProvider(String scheme, Configuration conf) {
@@ -59,6 +70,8 @@ public class S3DelegationTokenProvider {
checkNotNull(region, "Region is not set.");
this.accessKey = conf.get(ACCESS_KEY_ID);
this.secretKey = conf.get(ACCESS_KEY_SECRET);
+ this.roleArn = conf.get(ROLE_ARN_KEY);
+ this.stsEndpoint = conf.get(STS_ENDPOINT_KEY);
this.additionInfos = new HashMap<>();
for (String key : Arrays.asList(REGION_KEY, ENDPOINT_KEY)) {
if (conf.get(key) != null) {
@@ -68,25 +81,59 @@ public class S3DelegationTokenProvider {
}
public ObtainedSecurityToken obtainSecurityToken() {
- LOG.info("Obtaining session credentials token with access key: {}", accessKey);
+ AWSSecurityTokenService stsClient = buildStsClient();
+ try {
+ Credentials credentials;
+
+ if (roleArn != null) {
+ LOG.info(
+ "Obtaining session credentials via AssumeRole with access key: {}, role: {}",
+ accessKey,
+ roleArn);
+ AssumeRoleRequest request =
+ new AssumeRoleRequest()
+ .withRoleArn(roleArn)
+ .withRoleSessionName("fluss-" + UUID.randomUUID());
+ AssumeRoleResult result = stsClient.assumeRole(request);
+ credentials = result.getCredentials();
+ } else {
+ LOG.info(
+ "Obtaining session credentials via GetSessionToken with access key: {}",
+ accessKey);
+ GetSessionTokenResult result = stsClient.getSessionToken();
+ credentials = result.getCredentials();
+ }
- AWSSecurityTokenService stsClient =
+ LOG.info(
+ "Session credentials obtained successfully with access key: {} expiration: {}",
+ credentials.getAccessKeyId(),
+ credentials.getExpiration());
+
+ return new ObtainedSecurityToken(
+ scheme,
+ toJson(credentials),
+ credentials.getExpiration().getTime(),
+ additionInfos);
+ } finally {
+ stsClient.shutdown();
+ }
+ }
+
+ private AWSSecurityTokenService buildStsClient() {
+ AWSSecurityTokenServiceClientBuilder builder =
AWSSecurityTokenServiceClientBuilder.standard()
- .withRegion(region)
.withCredentials(
new AWSStaticCredentialsProvider(
- new BasicAWSCredentials(accessKey, secretKey)))
- .build();
- GetSessionTokenResult sessionTokenResult = stsClient.getSessionToken();
- Credentials credentials = sessionTokenResult.getCredentials();
-
- LOG.info(
- "Session credentials obtained successfully with access key: {} expiration: {}",
- credentials.getAccessKeyId(),
- credentials.getExpiration());
-
- return new ObtainedSecurityToken(
- scheme, toJson(credentials), credentials.getExpiration().getTime(), additionInfos);
+ new BasicAWSCredentials(accessKey, secretKey)));
+
+ if (stsEndpoint != null) {
+ builder.withEndpointConfiguration(
+ new AwsClientBuilder.EndpointConfiguration(stsEndpoint, region));
+ } else {
+ builder.withRegion(region);
+ }
+
+ return builder.build();
}
private byte[] toJson(Credentials credentials) {