nastra commented on code in PR #15678:
URL: https://github.com/apache/iceberg/pull/15678#discussion_r2959411347
##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java:
##########
@@ -419,22 +436,106 @@ private Map<String, PrefixedS3Client> clientByPrefix() {
new PrefixedS3Client(
storageCredential.prefix(),
propertiesWithCredentials, s3, s3Async));
});
+
this.clientByPrefix = localClientByPrefix;
+ // Note: the s3 clients separately refresh via the
VendedCredentialsProvider but are
+ // not directly referencable from the FileIO
+ scheduleRefresh();
}
}
}
return clientByPrefix;
}
- private ExecutorService executorService() {
+ private void scheduleRefresh() {
+ storageCredentials.stream()
+ .map(
+ storageCredential ->
+
storageCredential.config().get(S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS))
+ .filter(Objects::nonNull)
+ .map(expiresAtString ->
Instant.ofEpochMilli(Long.parseLong(expiresAtString)))
+ .min(Comparator.naturalOrder())
+ .ifPresent(
+ expiresAt -> {
+ Instant prefetchAt = expiresAt.minus(5, ChronoUnit.MINUTES);
+ long delay = Duration.between(Instant.now(),
prefetchAt).toMillis();
+ this.refreshFuture =
+ executorService()
+ .schedule(this::refreshStorageCredentials, delay,
TimeUnit.MILLISECONDS);
+ });
+ }
+
+ private void refreshStorageCredentials() {
+ String credentialsEndpoint = properties.get(VendedCredentialsProvider.URI);
+ String catalogEndpoint = properties.get(CatalogProperties.URI);
+ if (credentialsEndpoint == null || catalogEndpoint == null) {
+ return;
+ }
+
+ // Refresh should be infrequent, so single use http client here is
acceptable
+ AuthManager authManager = null;
+ AuthSession authSession = null;
+ HTTPClient httpClient = null;
+ try {
+ authManager = AuthManagers.loadAuthManager("s3-credentials-refresh",
properties);
Review Comment:
I feel like we're duplicating a lot of the logic that we're already using
inside `VendedCredentialsProvider`. Maybe we could just use the
`VendedCredentialsProvider` here to do the refresh? Something like
```
private void refreshStorageCredentials() {
if (isResourceClosed.get()) {
return;
}
try (VendedCredentialsProvider provider =
VendedCredentialsProvider.create(properties)) {
List<StorageCredential> refreshed =
provider.fetchCredentials().credentials().stream()
.filter(c -> c.prefix().startsWith(ROOT_PREFIX))
.map(c -> StorageCredential.create(c.prefix(), c.config()))
.collect(Collectors.toList());
if (!refreshed.isEmpty() && !isResourceClosed.get()) {
this.storageCredentials = Lists.newArrayList(refreshed);
invalidateClients();
}
} catch (Exception e) {
LOG.warn("Failed to refresh storage credentials", e);
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]