This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/1.10.x by this push:
new c6ec670891 AWS: Fix leaked credentials when contacting multiple catalogs (#14586)
c6ec670891 is described below
commit c6ec67089137db4974a27a7340c8e690cb0a9130
Author: Alexandre Dutra <[email protected]>
AuthorDate: Fri Nov 14 15:42:24 2025 +0100
AWS: Fix leaked credentials when contacting multiple catalogs (#14586)
Cherry-pick of 08d9ee020.
---
.../aws/s3/signer/S3V4RestSignerClient.java | 17 +++++++----
.../apache/iceberg/rest/auth/OAuth2Manager.java | 29 +++++++++++++-----
.../iceberg/rest/auth/TestOAuth2Manager.java | 34 +++++++++++++++++++++-
3 files changed, 66 insertions(+), 14 deletions(-)
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java
index a50f196142..6385d8875d 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java
@@ -38,6 +38,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.ErrorHandlers;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.RESTUtil;
import org.apache.iceberg.rest.ResourcePaths;
import org.apache.iceberg.rest.auth.AuthManager;
import org.apache.iceberg.rest.auth.AuthManagers;
@@ -98,7 +99,8 @@ public abstract class S3V4RestSignerClient
@Value.Lazy
public String endpoint() {
- return properties().getOrDefault(S3_SIGNER_ENDPOINT,
S3_SIGNER_DEFAULT_ENDPOINT);
+ return RESTUtil.resolveEndpoint(
+ baseSignerUri(), properties().getOrDefault(S3_SIGNER_ENDPOINT,
S3_SIGNER_DEFAULT_ENDPOINT));
}
/** A credential to exchange for a token in the OAuth2 client credentials
flow. */
@@ -111,7 +113,11 @@ public abstract class S3V4RestSignerClient
/** Token endpoint URI to fetch token from if the Rest Catalog is not the
authorization server. */
@Value.Lazy
public String oauth2ServerUri() {
- return properties().getOrDefault(OAuth2Properties.OAUTH2_SERVER_URI,
ResourcePaths.tokens());
+ String oauth2ServerUri =
+ properties().getOrDefault(OAuth2Properties.OAUTH2_SERVER_URI,
ResourcePaths.tokens());
+ return oauth2ServerUri.startsWith("http")
+ ? oauth2ServerUri
+ : RESTUtil.resolveEndpoint(baseSignerUri(), oauth2ServerUri);
}
@Value.Lazy
@@ -149,11 +155,10 @@ public abstract class S3V4RestSignerClient
if (null == httpClient) {
synchronized (S3V4RestSignerClient.class) {
if (null == httpClient) {
+ // Don't include a base URI because this client may be used for
contacting different
+ // catalogs.
httpClient =
- HTTPClient.builder(properties())
- .uri(baseSignerUri())
- .withObjectMapper(S3ObjectMapper.mapper())
- .build();
+
HTTPClient.builder(properties()).withObjectMapper(S3ObjectMapper.mapper()).build();
}
}
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java
index 5f74db0e2e..5e01fbacb1 100644
--- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java
+++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java
@@ -58,10 +58,10 @@ public class OAuth2Manager extends RefreshingAuthManager {
private final String name;
- private RESTClient refreshClient;
+ private volatile RESTClient refreshClient;
private long startTimeMillis;
private OAuthTokenResponse authResponse;
- private AuthSessionCache sessionCache;
+ private volatile AuthSessionCache sessionCache;
public OAuth2Manager(String managerName) {
super(managerName + "-token-refresh");
@@ -165,22 +165,37 @@ public class OAuth2Manager extends RefreshingAuthManager {
// Important: this method is invoked from standalone components; we must
not assume that
// the refresh client and session cache have been initialized, because
catalogSession()
// won't be called.
+ // We also assume that this method may be called from multiple threads, so
we must
+ // synchronize access to the refresh client and session cache.
+
if (refreshClient == null) {
- refreshClient = sharedClient.withAuthSession(parent);
+ synchronized (this) {
+ if (refreshClient == null) {
+ this.refreshClient = sharedClient.withAuthSession(parent);
+ }
+ }
}
if (sessionCache == null) {
- sessionCache = newSessionCache(name, properties);
+ synchronized (this) {
+ if (sessionCache == null) {
+ this.sessionCache = newSessionCache(name, properties);
+ }
+ }
}
+ String oauth2ServerUri =
+ properties.getOrDefault(OAuth2Properties.OAUTH2_SERVER_URI,
ResourcePaths.tokens());
+
if (config.token() != null) {
+ String cacheKey = oauth2ServerUri + ":" + config.token();
return sessionCache.cachedSession(
- config.token(), k -> newSessionFromAccessToken(config.token(),
properties, parent));
+ cacheKey, k -> newSessionFromAccessToken(config.token(), properties,
parent));
}
if (config.credential() != null && !config.credential().isEmpty()) {
- return sessionCache.cachedSession(
- config.credential(), k -> newSessionFromTokenResponse(config,
parent));
+ String cacheKey = oauth2ServerUri + ":" + config.credential();
+ return sessionCache.cachedSession(cacheKey, k ->
newSessionFromTokenResponse(config, parent));
}
return parent;
diff --git a/core/src/test/java/org/apache/iceberg/rest/auth/TestOAuth2Manager.java b/core/src/test/java/org/apache/iceberg/rest/auth/TestOAuth2Manager.java
index 677d7768b0..59b5eafa35 100644
--- a/core/src/test/java/org/apache/iceberg/rest/auth/TestOAuth2Manager.java
+++ b/core/src/test/java/org/apache/iceberg/rest/auth/TestOAuth2Manager.java
@@ -521,7 +521,7 @@ class TestOAuth2Manager {
assertThat(manager)
.extracting("sessionCache")
.asInstanceOf(type(AuthSessionCache.class))
- .as("should create session cache for table with token")
+ .as("should create session cache for empty table properties")
.satisfies(cache ->
assertThat(cache.sessionCache().asMap()).isEmpty());
}
Mockito.verify(client).withAuthSession(any());
@@ -582,6 +582,38 @@ class TestOAuth2Manager {
Mockito.verifyNoMoreInteractions(client);
}
+ @Test
+ void standaloneTableSessionCredentialProvidedMultipleAuthServers() {
+ Map<String, String> tableProperties1 = Map.of(OAuth2Properties.CREDENTIAL,
"client:secret");
+ Map<String, String> tableProperties2 =
+ Map.of(
+ OAuth2Properties.OAUTH2_SERVER_URI,
"https://auth-server2.com/v1/token",
+ OAuth2Properties.CREDENTIAL, "client:secret");
+ try (OAuth2Manager manager = new OAuth2Manager("test");
+ OAuth2Util.AuthSession tableSession1 =
+ (OAuth2Util.AuthSession) manager.tableSession(client,
tableProperties1);
+ OAuth2Util.AuthSession tableSession2 =
+ (OAuth2Util.AuthSession) manager.tableSession(client,
tableProperties2)) {
+ assertThat(tableSession1.headers()).containsOnly(entry("Authorization",
"Bearer test"));
+ assertThat(tableSession2.headers()).containsOnly(entry("Authorization",
"Bearer test"));
+ assertThat(tableSession1).isNotSameAs(tableSession2);
+ }
+ Mockito.verify(client).withAuthSession(any());
+ Mockito.verify(client, times(2))
+ .postForm(
+ any(),
+ eq(
+ Map.of(
+ "grant_type", "client_credentials",
+ "client_id", "client",
+ "client_secret", "secret",
+ "scope", "catalog")),
+ eq(OAuthTokenResponse.class),
+ eq(Map.of()),
+ any());
+ Mockito.verifyNoMoreInteractions(client);
+ }
+
@Test
void close() {
Map<String, String> catalogProperties = Map.of();