This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 72d3831e feat(rest): add initial oauth2 support to rest catalog (#577)
72d3831e is described below
commit 72d3831e7be25b86f04c508d70c3e17029e77d68
Author: lishuxu <[email protected]>
AuthorDate: Thu Mar 19 11:35:37 2026 +0800
feat(rest): add initial oauth2 support to rest catalog (#577)
Add OAuth2 authentication support for the REST catalog, including:
- OAuth2Manager with static token and client_credentials grant flows
TODO:
- RefreshToken and ExchangeToken will be supported later
---
src/iceberg/catalog/rest/CMakeLists.txt | 2 +
src/iceberg/catalog/rest/auth/auth_manager.cc | 78 +++++++++-
.../catalog/rest/auth/auth_manager_internal.h | 5 +
src/iceberg/catalog/rest/auth/auth_managers.cc | 5 +-
src/iceberg/catalog/rest/auth/auth_properties.cc | 83 +++++++++++
src/iceberg/catalog/rest/auth/auth_properties.h | 93 ++++++++----
src/iceberg/catalog/rest/auth/auth_session.cc | 11 ++
src/iceberg/catalog/rest/auth/auth_session.h | 22 +++
src/iceberg/catalog/rest/auth/oauth2_util.cc | 77 ++++++++++
src/iceberg/catalog/rest/auth/oauth2_util.h | 56 +++++++
src/iceberg/catalog/rest/http_client.cc | 2 +-
src/iceberg/catalog/rest/json_serde.cc | 45 ++++++
src/iceberg/catalog/rest/json_serde_internal.h | 1 +
src/iceberg/catalog/rest/meson.build | 3 +
src/iceberg/catalog/rest/rest_catalog.cc | 13 +-
src/iceberg/catalog/rest/type_fwd.h | 3 +
src/iceberg/catalog/rest/types.cc | 19 +++
src/iceberg/catalog/rest/types.h | 17 +++
src/iceberg/test/auth_manager_test.cc | 163 +++++++++++++++++++++
19 files changed, 663 insertions(+), 35 deletions(-)
diff --git a/src/iceberg/catalog/rest/CMakeLists.txt
b/src/iceberg/catalog/rest/CMakeLists.txt
index 1da47d68..e91b1296 100644
--- a/src/iceberg/catalog/rest/CMakeLists.txt
+++ b/src/iceberg/catalog/rest/CMakeLists.txt
@@ -20,7 +20,9 @@ add_subdirectory(auth)
set(ICEBERG_REST_SOURCES
auth/auth_manager.cc
auth/auth_managers.cc
+ auth/auth_properties.cc
auth/auth_session.cc
+ auth/oauth2_util.cc
catalog_properties.cc
endpoint.cc
error_handlers.cc
diff --git a/src/iceberg/catalog/rest/auth/auth_manager.cc
b/src/iceberg/catalog/rest/auth/auth_manager.cc
index 14946aef..47370bd3 100644
--- a/src/iceberg/catalog/rest/auth/auth_manager.cc
+++ b/src/iceberg/catalog/rest/auth/auth_manager.cc
@@ -19,9 +19,12 @@
#include "iceberg/catalog/rest/auth/auth_manager.h"
+#include <optional>
+
#include "iceberg/catalog/rest/auth/auth_manager_internal.h"
#include "iceberg/catalog/rest/auth/auth_properties.h"
#include "iceberg/catalog/rest/auth/auth_session.h"
+#include "iceberg/catalog/rest/auth/oauth2_util.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/transform_util.h"
@@ -80,7 +83,8 @@ class BasicAuthManager : public AuthManager {
"Missing required property '{}'",
AuthProperties::kBasicPassword);
std::string credential = username_it->second + ":" + password_it->second;
return AuthSession::MakeDefault(
- {{"Authorization", "Basic " +
TransformUtil::Base64Encode(credential)}});
+ {{std::string(kAuthorizationHeader),
+ "Basic " + TransformUtil::Base64Encode(credential)}});
}
};
@@ -90,4 +94,76 @@ Result<std::unique_ptr<AuthManager>> MakeBasicAuthManager(
return std::make_unique<BasicAuthManager>();
}
+/// \brief OAuth2 authentication manager.
+///
+/// Builds auth sessions from the OAuth2 settings parsed by
+/// AuthProperties::FromProperties(). A configured bearer token is sent as-is; a
+/// configured credential is exchanged for a token through FetchToken().
+/// A token response fetched by InitSession() is cached in init_token_response_ and
+/// consumed by the next CatalogSession() call.
+class OAuth2Manager : public AuthManager {
+ public:
+  /// \brief Create the session used for the initial config request.
+  ///
+  /// \param init_client HTTP client used to call the token endpoint.
+  /// \param properties Properties holding the OAuth2 settings.
+  /// \return A session carrying a bearer Authorization header, or a session without
+  /// headers when neither a credential nor a token is configured. Failures from
+  /// FromProperties() or FetchToken() are handed back through the
+  /// ICEBERG_ASSIGN_OR_RAISE macro.
+  Result<std::shared_ptr<AuthSession>> InitSession(
+      HttpClient& init_client,
+      const std::unordered_map<std::string, std::string>& properties) override {
+    ICEBERG_ASSIGN_OR_RAISE(auto config, AuthProperties::FromProperties(properties));
+    // No token refresh during init (short-lived session).
+    config.Set(AuthProperties::kKeepRefreshed, false);
+
+    // Credential takes priority: fetch a fresh token for the config request.
+    if (!config.credential().empty()) {
+      // The token request itself carries the configured token (if any) as its
+      // bearer header; AuthHeaders() yields no header for an empty token.
+      auto init_session = AuthSession::MakeDefault(AuthHeaders(config.token()));
+      // Keep the response so that CatalogSession() can reuse it instead of fetching
+      // a second token.
+      ICEBERG_ASSIGN_OR_RAISE(init_token_response_,
+                              FetchToken(init_client, *init_session, config));
+      return AuthSession::MakeDefault(AuthHeaders(init_token_response_->access_token));
+    }
+
+    // No credential: fall back to the statically configured token.
+    if (!config.token().empty()) {
+      return AuthSession::MakeDefault(AuthHeaders(config.token()));
+    }
+
+    // Neither credential nor token: session that adds no headers.
+    return AuthSession::MakeDefault({});
+  }
+
+  /// \brief Create the long-lived catalog session.
+  ///
+  /// NOTE(review): when no token was cached by InitSession(), a configured token is
+  /// checked before the credential here, whereas InitSession() checks the credential
+  /// first. Confirm that this difference is intended.
+  ///
+  /// \param client HTTP client used to call the token endpoint and handed to
+  /// AuthSession::MakeOAuth2().
+  /// \param properties Properties holding the OAuth2 settings.
+  /// \return The catalog session, or a session without headers when nothing is
+  /// configured. Failures from FromProperties() or FetchToken() are handed back
+  /// through the ICEBERG_ASSIGN_OR_RAISE macro.
+  Result<std::shared_ptr<AuthSession>> CatalogSession(
+      HttpClient& client,
+      const std::unordered_map<std::string, std::string>& properties) override {
+    ICEBERG_ASSIGN_OR_RAISE(auto config, AuthProperties::FromProperties(properties));
+
+    // Reuse token from init phase.
+    if (init_token_response_.has_value()) {
+      // The cached response is consumed: it is moved out and the cache is cleared, so
+      // a later call does not see it again.
+      auto token_response = std::move(*init_token_response_);
+      init_token_response_.reset();
+      return AuthSession::MakeOAuth2(token_response, config.oauth2_server_uri(),
+                                     config.client_id(), config.client_secret(),
+                                     config.scope(), client);
+    }
+
+    // If token is provided, use it directly.
+    if (!config.token().empty()) {
+      return AuthSession::MakeDefault(AuthHeaders(config.token()));
+    }
+
+    // Fetch a new token using client_credentials grant.
+    if (!config.credential().empty()) {
+      // config.token() is empty at this point (the non-empty case returned above), so
+      // the token request is sent without an Authorization header.
+      auto base_session = AuthSession::MakeDefault(AuthHeaders(config.token()));
+      OAuthTokenResponse token_response;
+      ICEBERG_ASSIGN_OR_RAISE(token_response, FetchToken(client, *base_session, config));
+      // TODO(lishuxu): should we directly pass config to the MakeOAuth2 call?
+      return AuthSession::MakeOAuth2(token_response, config.oauth2_server_uri(),
+                                     config.client_id(), config.client_secret(),
+                                     config.scope(), client);
+    }
+
+    // Neither credential nor token: session that adds no headers.
+    return AuthSession::MakeDefault({});
+  }
+
+  // TODO(lishuxu): Override TableSession() for token exchange (RFC 8693).
+  // TODO(lishuxu): Override ContextualSession() for per-context exchange.
+
+ private:
+  /// Token response fetched by InitSession(); set only on the credential path and
+  /// cleared once CatalogSession() has used it.
+  std::optional<OAuthTokenResponse> init_token_response_;
+};
+
+/// \brief Factory for the OAuth2 authentication manager.
+///
+/// Neither argument is consulted; the manager reads its settings from the properties
+/// passed to each session call.
+///
+/// \param name Unused.
+/// \param properties Unused.
+/// \return A newly created OAuth2Manager.
+Result<std::unique_ptr<AuthManager>> MakeOAuth2Manager(
+    [[maybe_unused]] std::string_view name,
+    [[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
+  auto oauth2_manager = std::make_unique<OAuth2Manager>();
+  return oauth2_manager;
+}
+
} // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/auth/auth_manager_internal.h
b/src/iceberg/catalog/rest/auth/auth_manager_internal.h
index 96e45239..051d0550 100644
--- a/src/iceberg/catalog/rest/auth/auth_manager_internal.h
+++ b/src/iceberg/catalog/rest/auth/auth_manager_internal.h
@@ -42,4 +42,9 @@ Result<std::unique_ptr<AuthManager>> MakeBasicAuthManager(
std::string_view name,
const std::unordered_map<std::string, std::string>& properties);
+/// \brief Create an OAuth2 authentication manager.
+Result<std::unique_ptr<AuthManager>> MakeOAuth2Manager(
+ std::string_view name,
+ const std::unordered_map<std::string, std::string>& properties);
+
} // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/auth/auth_managers.cc
b/src/iceberg/catalog/rest/auth/auth_managers.cc
index d0bf2484..f55885d7 100644
--- a/src/iceberg/catalog/rest/auth/auth_managers.cc
+++ b/src/iceberg/catalog/rest/auth/auth_managers.cc
@@ -52,8 +52,8 @@ std::string InferAuthType(
}
// Infer from OAuth2 properties (credential or token)
- bool has_credential = properties.contains(AuthProperties::kOAuth2Credential);
- bool has_token = properties.contains(AuthProperties::kOAuth2Token);
+ bool has_credential = properties.contains(AuthProperties::kCredential.key());
+ bool has_token = properties.contains(AuthProperties::kToken.key());
if (has_credential || has_token) {
return AuthProperties::kAuthTypeOAuth2;
}
@@ -65,6 +65,7 @@ AuthManagerRegistry CreateDefaultRegistry() {
return {
{AuthProperties::kAuthTypeNone, MakeNoopAuthManager},
{AuthProperties::kAuthTypeBasic, MakeBasicAuthManager},
+ {AuthProperties::kAuthTypeOAuth2, MakeOAuth2Manager},
};
}
diff --git a/src/iceberg/catalog/rest/auth/auth_properties.cc
b/src/iceberg/catalog/rest/auth/auth_properties.cc
new file mode 100644
index 00000000..dcf16782
--- /dev/null
+++ b/src/iceberg/catalog/rest/auth/auth_properties.cc
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/catalog/rest/auth/auth_properties.h"
+
+#include <utility>
+
+#include "iceberg/catalog/rest/catalog_properties.h"
+
+namespace iceberg::rest::auth {
+
+namespace {
+
+/// Split a "client_id:client_secret" credential at its first colon.
+///
+/// When the credential contains no colon, the whole string is treated as the secret
+/// and the returned client id is empty.
+std::pair<std::string, std::string> ParseCredential(const std::string& credential) {
+  const auto separator = credential.find(':');
+  const bool has_client_id = separator != std::string::npos;
+  std::string client_id = has_client_id ? credential.substr(0, separator) : std::string{};
+  std::string client_secret =
+      has_client_id ? credential.substr(separator + 1) : credential;
+  return {std::move(client_id), std::move(client_secret)};
+}
+
+} // namespace
+
+/// Collect the optional OAuth request parameters (audience, resource) that have a
+/// non-empty value, keyed by their property key.
+std::unordered_map<std::string, std::string> AuthProperties::optional_oauth_params()
+    const {
+  std::unordered_map<std::string, std::string> collected;
+  // Copies the entry's value into the result only when it is non-empty.
+  const auto add_if_set = [this, &collected](const auto& entry) {
+    auto value = Get(entry);
+    if (!value.empty()) {
+      collected.emplace(entry.key(), std::move(value));
+    }
+  };
+  add_if_set(kAudience);
+  add_if_set(kResource);
+  return collected;
+}
+
+/// Build an AuthProperties from a raw property map.
+///
+/// The map is copied into the config, the credential (if any) is split into client id
+/// and client secret, and a missing or empty token endpoint is derived from the
+/// catalog URI.
+Result<AuthProperties> AuthProperties::FromProperties(
+    const std::unordered_map<std::string, std::string>& properties) {
+  AuthProperties config;
+  config.configs_ = properties;
+
+  // Split "client_id:client_secret" into its two parts.
+  if (const auto raw_credential = config.credential(); !raw_credential.empty()) {
+    auto parts = ParseCredential(raw_credential);
+    config.client_id_ = std::move(parts.first);
+    config.client_secret_ = std::move(parts.second);
+  }
+
+  // Token endpoint: an explicit non-empty setting wins; otherwise append the default
+  // endpoint path to the catalog URI.
+  const auto endpoint_it = properties.find(kOAuth2ServerUri.key());
+  const bool endpoint_configured =
+      endpoint_it != properties.end() && !endpoint_it->second.empty();
+  if (!endpoint_configured) {
+    const auto catalog_it = properties.find(RestCatalogProperties::kUri.key());
+    if (catalog_it != properties.end() && !catalog_it->second.empty()) {
+      const std::string& catalog_uri = catalog_it->second;
+      // Drop every trailing '/' so that exactly one separator is inserted below.
+      const auto last_kept = catalog_uri.find_last_not_of('/');
+      std::string endpoint = last_kept == std::string::npos
+                                 ? std::string{}
+                                 : catalog_uri.substr(0, last_kept + 1);
+      endpoint += "/";
+      endpoint += std::string(kOAuth2ServerUri.value());
+      config.Set(kOAuth2ServerUri, std::move(endpoint));
+    }
+  }
+
+  // TODO(lishuxu): Parse JWT exp claim from token to set expires_at_millis_.
+
+  return config;
+}
+
+} // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/auth/auth_properties.h
b/src/iceberg/catalog/rest/auth/auth_properties.h
index e14b7fcf..05a7ea2c 100644
--- a/src/iceberg/catalog/rest/auth/auth_properties.h
+++ b/src/iceberg/catalog/rest/auth/auth_properties.h
@@ -19,55 +19,88 @@
#pragma once
+#include <cstdint>
+#include <optional>
#include <string>
-#include <string_view>
+#include <unordered_map>
+
+#include "iceberg/catalog/rest/iceberg_rest_export.h"
+#include "iceberg/result.h"
+#include "iceberg/util/config.h"
/// \file iceberg/catalog/rest/auth/auth_properties.h
-/// \brief Property keys and constants for REST catalog authentication.
+/// \brief Property keys and configuration for REST catalog authentication.
namespace iceberg::rest::auth {
-/// \brief Property keys and constants for authentication configuration.
-///
-/// This struct defines all the property keys used to configure authentication
-/// for the REST catalog. It follows the same naming conventions as Java
Iceberg.
-struct AuthProperties {
- /// \brief Property key for specifying the authentication type.
+/// \brief Authentication properties
+class ICEBERG_REST_EXPORT AuthProperties : public ConfigBase<AuthProperties> {
+ public:
+ template <typename T>
+ using Entry = const ConfigBase<AuthProperties>::Entry<T>;
+
+ // ---- Authentication type constants (not Entry-based) ----
+
inline static const std::string kAuthType = "rest.auth.type";
- /// \brief Authentication type: no authentication.
inline static const std::string kAuthTypeNone = "none";
- /// \brief Authentication type: HTTP Basic authentication.
inline static const std::string kAuthTypeBasic = "basic";
- /// \brief Authentication type: OAuth2 authentication.
inline static const std::string kAuthTypeOAuth2 = "oauth2";
- /// \brief Authentication type: AWS SigV4 authentication.
inline static const std::string kAuthTypeSigV4 = "sigv4";
- /// \brief Property key for Basic auth username.
+ // ---- Basic auth entries ----
+
inline static const std::string kBasicUsername = "rest.auth.basic.username";
- /// \brief Property key for Basic auth password.
inline static const std::string kBasicPassword = "rest.auth.basic.password";
- /// \brief Property key for OAuth2 token (bearer token).
- inline static const std::string kOAuth2Token = "token";
- /// \brief Property key for OAuth2 credential (client_id:client_secret).
- inline static const std::string kOAuth2Credential = "credential";
- /// \brief Property key for OAuth2 scope.
- inline static const std::string kOAuth2Scope = "scope";
- /// \brief Property key for OAuth2 server URI.
- inline static const std::string kOAuth2ServerUri = "oauth2-server-uri";
- /// \brief Property key for enabling token refresh.
- inline static const std::string kOAuth2TokenRefreshEnabled =
"token-refresh-enabled";
- /// \brief Default OAuth2 scope for catalog operations.
- inline static const std::string kOAuth2DefaultScope = "catalog";
-
- /// \brief Property key for SigV4 region.
+ // ---- SigV4 entries ----
+
inline static const std::string kSigV4Region = "rest.auth.sigv4.region";
- /// \brief Property key for SigV4 service name.
inline static const std::string kSigV4Service = "rest.auth.sigv4.service";
- /// \brief Property key for SigV4 delegate auth type.
inline static const std::string kSigV4DelegateAuthType =
"rest.auth.sigv4.delegate-auth-type";
+
+ // ---- OAuth2 entries ----
+
+ inline static Entry<std::string> kToken{"token", ""};
+ inline static Entry<std::string> kCredential{"credential", ""};
+ inline static Entry<std::string> kScope{"scope", "catalog"};
+ inline static Entry<std::string> kOAuth2ServerUri{"oauth2-server-uri",
+ "v1/oauth/tokens"};
+ inline static Entry<bool> kKeepRefreshed{"token-refresh-enabled", true};
+ inline static Entry<bool> kExchangeEnabled{"token-exchange-enabled", true};
+ inline static Entry<std::string> kAudience{"audience", ""};
+ inline static Entry<std::string> kResource{"resource", ""};
+
+ /// \brief Build an AuthProperties from a properties map.
+ static Result<AuthProperties> FromProperties(
+ const std::unordered_map<std::string, std::string>& properties);
+
+ /// \brief Get the bearer token.
+ std::string token() const { return Get(kToken); }
+ /// \brief Get the raw credential string.
+ std::string credential() const { return Get(kCredential); }
+ /// \brief Get the OAuth2 scope.
+ std::string scope() const { return Get(kScope); }
+ /// \brief Get the token endpoint URI.
+ std::string oauth2_server_uri() const { return Get(kOAuth2ServerUri); }
+ /// \brief Whether token refresh is enabled.
+ bool keep_refreshed() const { return Get(kKeepRefreshed); }
+ /// \brief Whether token exchange is enabled.
+ bool exchange_enabled() const { return Get(kExchangeEnabled); }
+
+ /// \brief Parsed client_id from credential (empty if no colon).
+ const std::string& client_id() const { return client_id_; }
+ /// \brief Parsed client_secret from credential.
+ const std::string& client_secret() const { return client_secret_; }
+
+ /// \brief Build optional OAuth params (audience, resource) from config.
+ std::unordered_map<std::string, std::string> optional_oauth_params() const;
+
+ private:
+ std::string client_id_;
+ std::string client_secret_;
+ std::string token_type_;
+ std::optional<int64_t> expires_at_millis_;
};
} // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/auth/auth_session.cc
b/src/iceberg/catalog/rest/auth/auth_session.cc
index 00ed946a..7251dc4a 100644
--- a/src/iceberg/catalog/rest/auth/auth_session.cc
+++ b/src/iceberg/catalog/rest/auth/auth_session.cc
@@ -21,6 +21,8 @@
#include <utility>
+#include "iceberg/catalog/rest/auth/oauth2_util.h"
+
namespace iceberg::rest::auth {
namespace {
@@ -49,4 +51,13 @@ std::shared_ptr<AuthSession> AuthSession::MakeDefault(
return std::make_shared<DefaultAuthSession>(std::move(headers));
}
+/// Currently wraps the access token of \p initial_token in a default session that
+/// sends it as a bearer Authorization header; the remaining parameters are unused
+/// until auto-refresh is implemented.
+std::shared_ptr<AuthSession> AuthSession::MakeOAuth2(
+    const OAuthTokenResponse& initial_token, const std::string& /*token_endpoint*/,
+    const std::string& /*client_id*/, const std::string& /*client_secret*/,
+    const std::string& /*scope*/, HttpClient& /*client*/) {
+  // TODO(lishuxu): Create OAuth2AuthSession with auto-refresh support.
+  std::string bearer_value{kBearerPrefix};
+  bearer_value += initial_token.access_token;
+
+  std::unordered_map<std::string, std::string> headers;
+  headers.emplace(std::string(kAuthorizationHeader), std::move(bearer_value));
+  return MakeDefault(std::move(headers));
+}
+
} // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/auth/auth_session.h
b/src/iceberg/catalog/rest/auth/auth_session.h
index d81b3d93..26b93877 100644
--- a/src/iceberg/catalog/rest/auth/auth_session.h
+++ b/src/iceberg/catalog/rest/auth/auth_session.h
@@ -24,6 +24,7 @@
#include <unordered_map>
#include "iceberg/catalog/rest/iceberg_rest_export.h"
+#include "iceberg/catalog/rest/type_fwd.h"
#include "iceberg/result.h"
/// \file iceberg/catalog/rest/auth/auth_session.h
@@ -70,6 +71,27 @@ class ICEBERG_REST_EXPORT AuthSession {
/// \return A new session that adds the given headers to requests.
static std::shared_ptr<AuthSession> MakeDefault(
std::unordered_map<std::string, std::string> headers);
+
+  /// \brief Create an OAuth2 session from an initial token response.
+  ///
+  /// This factory is intended to create a session that holds an access token and
+  /// optionally a refresh token, and that transparently refreshes an expired token
+  /// before setting the Authorization header in Authenticate().
+  ///
+  /// NOTE: automatic refresh is not implemented yet. The current implementation
+  /// returns a default session that sends the access token of \p initial_token as a
+  /// bearer Authorization header and does not use the other parameters.
+  ///
+  /// \param initial_token The initial token response from FetchToken().
+  /// \param token_endpoint Full URL of the OAuth2 token endpoint for refresh.
+  /// \param client_id OAuth2 client ID for refresh requests.
+  /// \param client_secret OAuth2 client secret for re-fetch if refresh fails.
+  /// \param scope OAuth2 scope for refresh requests.
+  /// \param client HTTP client for making refresh requests.
+  /// \return A new session carrying the bearer token of \p initial_token.
+  static std::shared_ptr<AuthSession> MakeOAuth2(const OAuthTokenResponse& initial_token,
+                                                 const std::string& token_endpoint,
+                                                 const std::string& client_id,
+                                                 const std::string& client_secret,
+                                                 const std::string& scope,
+                                                 HttpClient& client);
};
} // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/auth/oauth2_util.cc
b/src/iceberg/catalog/rest/auth/oauth2_util.cc
new file mode 100644
index 00000000..3d209d2b
--- /dev/null
+++ b/src/iceberg/catalog/rest/auth/oauth2_util.cc
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/catalog/rest/auth/oauth2_util.h"
+
+#include <utility>
+
+#include <nlohmann/json.hpp>
+
+#include "iceberg/catalog/rest/auth/auth_session.h"
+#include "iceberg/catalog/rest/error_handlers.h"
+#include "iceberg/catalog/rest/http_client.h"
+#include "iceberg/catalog/rest/json_serde_internal.h"
+#include "iceberg/json_serde_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::rest::auth {
+
+namespace {
+
+constexpr std::string_view kGrantType = "grant_type";
+constexpr std::string_view kClientCredentials = "client_credentials";
+constexpr std::string_view kClientId = "client_id";
+constexpr std::string_view kClientSecret = "client_secret";
+constexpr std::string_view kScope = "scope";
+
+} // namespace
+
+/// Build the header map for a bearer token: a single Authorization header when the
+/// token is non-empty, otherwise an empty map.
+std::unordered_map<std::string, std::string> AuthHeaders(const std::string& token) {
+  std::unordered_map<std::string, std::string> headers;
+  if (token.empty()) {
+    return headers;
+  }
+  headers.emplace(std::string(kAuthorizationHeader), std::string(kBearerPrefix) + token);
+  return headers;
+}
+
+/// Request a token from the configured token endpoint using the client_credentials
+/// grant and return the parsed, validated response.
+///
+/// The form always carries grant_type, client_secret and scope; client_id is added
+/// only when it is non-empty, followed by the optional OAuth parameters.
+Result<OAuthTokenResponse> FetchToken(HttpClient& client, AuthSession& session,
+                                      const AuthProperties& properties) {
+  std::unordered_map<std::string, std::string> form;
+  form.emplace(std::string(kGrantType), std::string(kClientCredentials));
+  form.emplace(std::string(kClientSecret), properties.client_secret());
+  form.emplace(std::string(kScope), properties.scope());
+  if (const std::string& client_id = properties.client_id(); !client_id.empty()) {
+    form.emplace(std::string(kClientId), client_id);
+  }
+  // insert() keeps entries that are already present, matching emplace().
+  const auto extra_params = properties.optional_oauth_params();
+  form.insert(extra_params.begin(), extra_params.end());
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto http_response,
+      client.PostForm(properties.oauth2_server_uri(), form,
+                      /*headers=*/{}, *DefaultErrorHandler::Instance(), session));
+
+  ICEBERG_ASSIGN_OR_RAISE(auto parsed_json, FromJsonString(http_response.body()));
+  ICEBERG_ASSIGN_OR_RAISE(auto fetched_token, FromJson<OAuthTokenResponse>(parsed_json));
+  ICEBERG_RETURN_UNEXPECTED(fetched_token.Validate());
+  return fetched_token;
+}
+
+} // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/auth/oauth2_util.h
b/src/iceberg/catalog/rest/auth/oauth2_util.h
new file mode 100644
index 00000000..39dd1296
--- /dev/null
+++ b/src/iceberg/catalog/rest/auth/oauth2_util.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <string_view>
+#include <unordered_map>
+
+#include "iceberg/catalog/rest/iceberg_rest_export.h"
+#include "iceberg/catalog/rest/type_fwd.h"
+#include "iceberg/catalog/rest/types.h"
+#include "iceberg/result.h"
+
+/// \file iceberg/catalog/rest/auth/oauth2_util.h
+/// \brief OAuth2 token utilities for REST catalog authentication.
+
+namespace iceberg::rest::auth {
+
+inline constexpr std::string_view kAuthorizationHeader = "Authorization";
+inline constexpr std::string_view kBearerPrefix = "Bearer ";
+
+/// \brief Fetch an OAuth2 token using the client_credentials grant type.
+///
+/// \param client HTTP client to use for the request.
+/// \param session Auth session for the request headers.
+/// \param properties Auth configuration containing credential, scope,
+/// token endpoint, and optional OAuth params.
+/// \return The token response or an error.
+ICEBERG_REST_EXPORT Result<OAuthTokenResponse> FetchToken(
+ HttpClient& client, AuthSession& session, const AuthProperties&
properties);
+
+/// \brief Build auth headers from a token string.
+///
+/// \param token Bearer token string (may be empty).
+/// \return Headers map with Authorization header if token is non-empty.
+ICEBERG_REST_EXPORT std::unordered_map<std::string, std::string> AuthHeaders(
+ const std::string& token);
+
+} // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/http_client.cc
b/src/iceberg/catalog/rest/http_client.cc
index b0824621..2e383b0a 100644
--- a/src/iceberg/catalog/rest/http_client.cc
+++ b/src/iceberg/catalog/rest/http_client.cc
@@ -75,7 +75,7 @@ Result<cpr::Header> BuildHeaders(
auth::AuthSession& session) {
std::unordered_map<std::string, std::string> headers(default_headers);
for (const auto& [key, val] : request_headers) {
- headers.emplace(key, val);
+ headers.insert_or_assign(key, val);
}
ICEBERG_RETURN_UNEXPECTED(session.Authenticate(headers));
return cpr::Header(headers.begin(), headers.end());
diff --git a/src/iceberg/catalog/rest/json_serde.cc
b/src/iceberg/catalog/rest/json_serde.cc
index 727881f6..eebdc196 100644
--- a/src/iceberg/catalog/rest/json_serde.cc
+++ b/src/iceberg/catalog/rest/json_serde.cc
@@ -72,6 +72,12 @@ constexpr std::string_view kStack = "stack";
constexpr std::string_view kError = "error";
constexpr std::string_view kIdentifier = "identifier";
constexpr std::string_view kRequirements = "requirements";
+constexpr std::string_view kAccessToken = "access_token";
+constexpr std::string_view kTokenType = "token_type";
+constexpr std::string_view kExpiresIn = "expires_in";
+constexpr std::string_view kIssuedTokenType = "issued_token_type";
+constexpr std::string_view kRefreshToken = "refresh_token";
+constexpr std::string_view kOAuthScope = "scope";
} // namespace
@@ -462,6 +468,44 @@ Result<CommitTableResponse>
CommitTableResponseFromJson(const nlohmann::json& js
return response;
}
+/// \brief Serialize an OAuth2 token response to JSON.
+///
+/// access_token and token_type are always written. expires_in is written only when a
+/// value is present; issued_token_type, refresh_token and scope are written only when
+/// non-empty. refresh_token is included so that a response parsed by
+/// OAuthTokenResponseFromJson() survives a round trip unchanged.
+///
+/// \param response The token response to serialize.
+/// \return The JSON object.
+nlohmann::json ToJson(const OAuthTokenResponse& response) {
+  nlohmann::json json;
+  json[kAccessToken] = response.access_token;
+  json[kTokenType] = response.token_type;
+  if (response.expires_in_secs.has_value()) {
+    json[kExpiresIn] = response.expires_in_secs.value();
+  }
+  if (!response.issued_token_type.empty()) {
+    json[kIssuedTokenType] = response.issued_token_type;
+  }
+  if (!response.refresh_token.empty()) {
+    json[kRefreshToken] = response.refresh_token;
+  }
+  if (!response.scope.empty()) {
+    json[kOAuthScope] = response.scope;
+  }
+  return json;
+}
+
+/// \brief Parse an OAuth2 token endpoint response from JSON.
+///
+/// access_token and token_type are read with GetJsonValue, expires_in is read only
+/// when the key is present, and issued_token_type, refresh_token and scope are read
+/// with GetJsonValueOrDefault. The parsed response is then checked with Validate().
+///
+/// \param json JSON object holding the token response.
+/// \return The parsed response, or the error reported while reading or validating it.
+Result<OAuthTokenResponse> OAuthTokenResponseFromJson(const nlohmann::json& json) {
+  OAuthTokenResponse response;
+  // Fields read without a default.
+  ICEBERG_ASSIGN_OR_RAISE(response.access_token,
+                          GetJsonValue<std::string>(json, kAccessToken));
+  ICEBERG_ASSIGN_OR_RAISE(response.token_type,
+                          GetJsonValue<std::string>(json, kTokenType));
+  // TODO(lishuxu): When implementing auto-refresh, extract exp claim
+  // from JWT if expires_in is missing.
+  // expires_in_secs stays unset when the key is absent.
+  if (json.contains(std::string(kExpiresIn))) {
+    ICEBERG_ASSIGN_OR_RAISE(auto val, GetJsonValue<int64_t>(json, kExpiresIn));
+    response.expires_in_secs = val;
+  }
+  // Fields read with a default (presumably an empty string when missing).
+  ICEBERG_ASSIGN_OR_RAISE(response.issued_token_type,
+                          GetJsonValueOrDefault<std::string>(json, kIssuedTokenType));
+  ICEBERG_ASSIGN_OR_RAISE(response.refresh_token,
+                          GetJsonValueOrDefault<std::string>(json, kRefreshToken));
+  ICEBERG_ASSIGN_OR_RAISE(response.scope,
+                          GetJsonValueOrDefault<std::string>(json, kOAuthScope));
+  // Reject responses with an empty access token or an unsupported token type.
+  ICEBERG_RETURN_UNEXPECTED(response.Validate());
+  return response;
+}
+
#define ICEBERG_DEFINE_FROM_JSON(Model) \
template <> \
Result<Model> FromJson<Model>(const nlohmann::json& json) { \
@@ -483,5 +527,6 @@ ICEBERG_DEFINE_FROM_JSON(RenameTableRequest)
ICEBERG_DEFINE_FROM_JSON(CreateTableRequest)
ICEBERG_DEFINE_FROM_JSON(CommitTableRequest)
ICEBERG_DEFINE_FROM_JSON(CommitTableResponse)
+ICEBERG_DEFINE_FROM_JSON(OAuthTokenResponse)
} // namespace iceberg::rest
diff --git a/src/iceberg/catalog/rest/json_serde_internal.h
b/src/iceberg/catalog/rest/json_serde_internal.h
index f8bdd78f..820e077d 100644
--- a/src/iceberg/catalog/rest/json_serde_internal.h
+++ b/src/iceberg/catalog/rest/json_serde_internal.h
@@ -58,6 +58,7 @@ ICEBERG_DECLARE_JSON_SERDE(RenameTableRequest)
ICEBERG_DECLARE_JSON_SERDE(CreateTableRequest)
ICEBERG_DECLARE_JSON_SERDE(CommitTableRequest)
ICEBERG_DECLARE_JSON_SERDE(CommitTableResponse)
+ICEBERG_DECLARE_JSON_SERDE(OAuthTokenResponse)
#undef ICEBERG_DECLARE_JSON_SERDE
diff --git a/src/iceberg/catalog/rest/meson.build
b/src/iceberg/catalog/rest/meson.build
index 3a333963..ef250045 100644
--- a/src/iceberg/catalog/rest/meson.build
+++ b/src/iceberg/catalog/rest/meson.build
@@ -18,7 +18,9 @@
iceberg_rest_sources = files(
'auth/auth_manager.cc',
'auth/auth_managers.cc',
+ 'auth/auth_properties.cc',
'auth/auth_session.cc',
+ 'auth/oauth2_util.cc',
'catalog_properties.cc',
'endpoint.cc',
'error_handlers.cc',
@@ -82,6 +84,7 @@ install_headers(
'auth/auth_managers.h',
'auth/auth_properties.h',
'auth/auth_session.h',
+ 'auth/oauth2_util.h',
],
subdir: 'iceberg/catalog/rest/auth',
)
diff --git a/src/iceberg/catalog/rest/rest_catalog.cc
b/src/iceberg/catalog/rest/rest_catalog.cc
index 42dc8659..caef5041 100644
--- a/src/iceberg/catalog/rest/rest_catalog.cc
+++ b/src/iceberg/catalog/rest/rest_catalog.cc
@@ -71,8 +71,19 @@ Result<CatalogConfig> FetchServerConfig(const ResourcePaths&
paths,
auth::AuthSession& session) {
ICEBERG_ASSIGN_OR_RAISE(auto config_path, paths.Config());
HttpClient client(current_config.ExtractHeaders());
+
+ // Send the client's warehouse location to the service to keep in sync.
+ // This is needed for cases where the warehouse is configured client side, but may
+ // be used on the server side, like the Hive Metastore, where both client and service
+ // may have a warehouse location.
+ std::unordered_map<std::string, std::string> params;
+ std::string warehouse =
current_config.Get(RestCatalogProperties::kWarehouse);
+ if (!warehouse.empty()) {
+ params[RestCatalogProperties::kWarehouse.key()] = std::move(warehouse);
+ }
+
ICEBERG_ASSIGN_OR_RAISE(const auto response,
- client.Get(config_path, /*params=*/{},
/*headers=*/{},
+ client.Get(config_path, params, /*headers=*/{},
*DefaultErrorHandler::Instance(),
session));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
return CatalogConfigFromJson(json);
diff --git a/src/iceberg/catalog/rest/type_fwd.h
b/src/iceberg/catalog/rest/type_fwd.h
index e7286105..62bc14d1 100644
--- a/src/iceberg/catalog/rest/type_fwd.h
+++ b/src/iceberg/catalog/rest/type_fwd.h
@@ -22,10 +22,12 @@
/// \file iceberg/catalog/rest/type_fwd.h
/// Forward declarations and enum definitions for Iceberg REST API types.
+#include "iceberg/catalog/rest/auth/auth_properties.h"
namespace iceberg::rest {
struct ErrorResponse;
struct LoadTableResult;
+struct OAuthTokenResponse;
class Endpoint;
class ErrorHandler;
@@ -39,6 +41,7 @@ class RestCatalogProperties;
namespace iceberg::rest::auth {
class AuthManager;
+class AuthProperties;
class AuthSession;
} // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/types.cc
b/src/iceberg/catalog/rest/types.cc
index 3416bfe3..3abfb140 100644
--- a/src/iceberg/catalog/rest/types.cc
+++ b/src/iceberg/catalog/rest/types.cc
@@ -19,6 +19,8 @@
#include "iceberg/catalog/rest/types.h"
+#include <algorithm>
+
#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
#include "iceberg/sort_order.h"
@@ -116,4 +118,21 @@ bool CommitTableResponse::operator==(const
CommitTableResponse& other) const {
return true;
}
+/// \brief Check that the response carries a usable token.
+///
+/// \return A validation failure when access_token or token_type is empty, or when
+/// token_type is neither "bearer" nor "N_A" (compared case-insensitively); success
+/// otherwise.
+Status OAuthTokenResponse::Validate() const {
+  if (access_token.empty()) {
+    return ValidationFailed("OAuth2 token response missing required 'access_token'");
+  }
+  if (token_type.empty()) {
+    return ValidationFailed("OAuth2 token response missing required 'token_type'");
+  }
+  // token_type must be "bearer" or "N_A" (case-insensitive).
+  // Fold ASCII letters by hand: passing a plain (possibly negative) char to
+  // ::tolower is undefined behaviour, and its result depends on the C locale.
+  std::string lower_type = token_type;
+  std::ranges::transform(lower_type, lower_type.begin(), [](char c) {
+    return (c >= 'A' && c <= 'Z') ? static_cast<char>(c - 'A' + 'a') : c;
+  });
+  if (lower_type != "bearer" && lower_type != "n_a") {
+    return ValidationFailed(R"(Unsupported token type: {} (must be "bearer" or "N_A"))",
+                            token_type);
+  }
+  return {};
+}
+
} // namespace iceberg::rest
diff --git a/src/iceberg/catalog/rest/types.h b/src/iceberg/catalog/rest/types.h
index 93e7048a..6495a651 100644
--- a/src/iceberg/catalog/rest/types.h
+++ b/src/iceberg/catalog/rest/types.h
@@ -19,7 +19,9 @@
#pragma once
+#include <cstdint>
#include <memory>
+#include <optional>
#include <string>
#include <unordered_map>
#include <vector>
@@ -278,4 +280,19 @@ struct ICEBERG_REST_EXPORT CommitTableResponse {
bool operator==(const CommitTableResponse& other) const;
};
+/// \brief Response from an OAuth2 token endpoint.
+///
+/// Only `access_token` and `token_type` are required. The remaining members are
+/// optional; a default-constructed instance has them empty (strings) or unset
+/// (`expires_in_secs`).
+struct ICEBERG_REST_EXPORT OAuthTokenResponse {
+  /// Required.
+  std::string access_token;
+  /// Required; "bearer" or "N_A".
+  std::string token_type;
+  /// Optional; seconds until expiration.
+  std::optional<int64_t> expires_in_secs;
+  /// Optional; for token exchange.
+  std::string issued_token_type;
+  /// Optional.
+  std::string refresh_token;
+  /// Optional.
+  std::string scope;
+
+  /// \brief Validates the token response.
+  Status Validate() const;
+
+  /// \brief Member-wise equality over every field above.
+  bool operator==(const OAuthTokenResponse& other) const = default;
+};
+
} // namespace iceberg::rest
diff --git a/src/iceberg/test/auth_manager_test.cc
b/src/iceberg/test/auth_manager_test.cc
index 82db393d..bd06fee3 100644
--- a/src/iceberg/test/auth_manager_test.cc
+++ b/src/iceberg/test/auth_manager_test.cc
@@ -24,15 +24,29 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
+#include <nlohmann/json.hpp>
#include "iceberg/catalog/rest/auth/auth_managers.h"
#include "iceberg/catalog/rest/auth/auth_properties.h"
#include "iceberg/catalog/rest/auth/auth_session.h"
+#include "iceberg/catalog/rest/auth/oauth2_util.h"
#include "iceberg/catalog/rest/http_client.h"
+#include "iceberg/catalog/rest/json_serde_internal.h"
+#include "iceberg/json_serde_internal.h"
#include "iceberg/test/matchers.h"
namespace iceberg::rest::auth {
+namespace {
+
+/// \brief Test helper that turns a JSON string into an OAuthTokenResponse.
+///
+/// The text is first parsed with iceberg::FromJsonString (wrapped in
+/// ICEBERG_ASSIGN_OR_RAISE), and the parsed document is then converted with
+/// iceberg::rest::FromJson<OAuthTokenResponse>, whose result is returned as-is.
+Result<OAuthTokenResponse> ParseTokenResponse(const std::string& str) {
+  ICEBERG_ASSIGN_OR_RAISE(auto document, iceberg::FromJsonString(str));
+  return iceberg::rest::FromJson<OAuthTokenResponse>(document);
+}
+
+} // namespace
+
class AuthManagerTest : public ::testing::Test {
protected:
HttpClient client_{{}};
@@ -195,4 +209,153 @@ TEST_F(AuthManagerTest, RegisterCustomAuthManager) {
EXPECT_EQ(headers["X-Custom-Auth"], "custom-value");
}
+// OAuth2 with an explicitly configured static token: the catalog session must add
+// that token to the request as a Bearer Authorization header.
+TEST_F(AuthManagerTest, OAuth2StaticToken) {
+  std::unordered_map<std::string, std::string> props = {
+      {AuthProperties::kAuthType, "oauth2"},
+      {AuthProperties::kToken.key(), "my-static-token"},
+  };
+
+  auto loaded = AuthManagers::Load("test-catalog", props);
+  ASSERT_THAT(loaded, IsOk());
+  auto& manager = loaded.value();
+
+  auto opened = manager->CatalogSession(client_, props);
+  ASSERT_THAT(opened, IsOk());
+  auto& session = opened.value();
+
+  std::unordered_map<std::string, std::string> request_headers;
+  EXPECT_THAT(session->Authenticate(request_headers), IsOk());
+  EXPECT_EQ(request_headers["Authorization"], "Bearer my-static-token");
+}
+
+// No auth type is configured here, only a token: loading must still succeed and the
+// session must emit the token as a Bearer Authorization header.
+TEST_F(AuthManagerTest, OAuth2InferredFromToken) {
+  std::unordered_map<std::string, std::string> props = {
+      {AuthProperties::kToken.key(), "inferred-token"},
+  };
+
+  auto loaded = AuthManagers::Load("test-catalog", props);
+  ASSERT_THAT(loaded, IsOk());
+  auto& manager = loaded.value();
+
+  auto opened = manager->CatalogSession(client_, props);
+  ASSERT_THAT(opened, IsOk());
+  auto& session = opened.value();
+
+  std::unordered_map<std::string, std::string> request_headers;
+  EXPECT_THAT(session->Authenticate(request_headers), IsOk());
+  EXPECT_EQ(request_headers["Authorization"], "Bearer inferred-token");
+}
+
+// Verifies OAuth2 returns an unauthenticated session when neither token nor
+// credential is provided: authentication succeeds but adds no Authorization header.
+TEST_F(AuthManagerTest, OAuth2MissingCredentials) {
+  std::unordered_map<std::string, std::string> properties = {
+      {AuthProperties::kAuthType, "oauth2"},
+  };
+
+  auto manager_result = AuthManagers::Load("test-catalog", properties);
+  ASSERT_THAT(manager_result, IsOk());
+
+  auto session_result = manager_result.value()->CatalogSession(client_, properties);
+  ASSERT_THAT(session_result, IsOk());
+
+  // Session should have no auth headers. The status is checked with the IsOk()
+  // matcher, like the other tests in this file, so a failure reports the error
+  // rather than just "false".
+  std::unordered_map<std::string, std::string> headers;
+  ASSERT_THAT(session_result.value()->Authenticate(headers), IsOk());
+  EXPECT_FALSE(headers.contains("Authorization"));
+}
+
+// Both a credential and a token are configured; CatalogSession is called directly
+// (no prior InitSession call) and the static token must be the one that ends up in
+// the Authorization header.
+TEST_F(AuthManagerTest, OAuth2TokenTakesPriorityOverCredential) {
+  std::unordered_map<std::string, std::string> props = {
+      {AuthProperties::kAuthType, "oauth2"},
+      {AuthProperties::kCredential.key(), "secret-only"},
+      {AuthProperties::kToken.key(), "my-static-token"},
+      {"uri", "http://localhost:8181"},
+  };
+
+  auto loaded = AuthManagers::Load("test-catalog", props);
+  ASSERT_THAT(loaded, IsOk());
+  auto& manager = loaded.value();
+
+  auto opened = manager->CatalogSession(client_, props);
+  ASSERT_THAT(opened, IsOk());
+  auto& session = opened.value();
+
+  std::unordered_map<std::string, std::string> request_headers;
+  ASSERT_THAT(session->Authenticate(request_headers), IsOk());
+  EXPECT_EQ(request_headers["Authorization"], "Bearer my-static-token");
+}
+
+// A token response with every field present: each JSON member must land in the
+// matching OAuthTokenResponse field ("expires_in" maps to expires_in_secs).
+TEST_F(AuthManagerTest, OAuthTokenResponseParsing) {
+  const std::string payload = R"({
+ "access_token": "test-access-token",
+ "token_type": "bearer",
+ "expires_in": 3600,
+ "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
+ "refresh_token": "test-refresh-token",
+ "scope": "catalog"
+ })";
+
+  auto parsed = ParseTokenResponse(payload);
+  ASSERT_THAT(parsed, IsOk());
+
+  EXPECT_EQ(parsed->access_token, "test-access-token");
+  EXPECT_EQ(parsed->token_type, "bearer");
+  // Stop here if the optional is unset so the dereference below is never reached.
+  ASSERT_TRUE(parsed->expires_in_secs.has_value());
+  EXPECT_EQ(*parsed->expires_in_secs, 3600);
+  EXPECT_EQ(parsed->issued_token_type, "urn:ietf:params:oauth:token-type:access_token");
+  EXPECT_EQ(parsed->refresh_token, "test-refresh-token");
+  EXPECT_EQ(parsed->scope, "catalog");
+}
+
+// Only the two required members are supplied: parsing must succeed, keep
+// token_type exactly as written ("Bearer"), and leave every optional field
+// empty / unset.
+TEST_F(AuthManagerTest, OAuthTokenResponseMinimal) {
+  const std::string payload = R"({
+ "access_token": "token123",
+ "token_type": "Bearer"
+ })";
+
+  auto parsed = ParseTokenResponse(payload);
+  ASSERT_THAT(parsed, IsOk());
+
+  EXPECT_EQ(parsed->access_token, "token123");
+  EXPECT_EQ(parsed->token_type, "Bearer");
+  EXPECT_FALSE(parsed->expires_in_secs.has_value());
+  EXPECT_TRUE(parsed->issued_token_type.empty());
+  EXPECT_TRUE(parsed->refresh_token.empty());
+  EXPECT_TRUE(parsed->scope.empty());
+}
+
+// A response without "access_token" must not parse successfully.
+TEST_F(AuthManagerTest, OAuthTokenResponseMissingAccessToken) {
+  const auto parsed = ParseTokenResponse(R"({"token_type": "bearer"})");
+  EXPECT_THAT(parsed, ::testing::Not(IsOk()));
+}
+
+// A response without "token_type" must not parse successfully.
+TEST_F(AuthManagerTest, OAuthTokenResponseMissingTokenType) {
+  const auto parsed = ParseTokenResponse(R"({"access_token": "token123"})");
+  EXPECT_THAT(parsed, ::testing::Not(IsOk()));
+}
+
+// A token_type other than "bearer" / "N_A" (here "mac") must be rejected.
+TEST_F(AuthManagerTest, OAuthTokenResponseUnsupportedTokenType) {
+  const std::string payload = R"({
+ "access_token": "token123",
+ "token_type": "mac"
+ })";
+  const auto parsed = ParseTokenResponse(payload);
+  EXPECT_THAT(parsed, ::testing::Not(IsOk()));
+}
+
+// "N_A" is an accepted token_type and must be stored exactly as received.
+TEST_F(AuthManagerTest, OAuthTokenResponseNATokenType) {
+  const std::string payload = R"({
+ "access_token": "token123",
+ "token_type": "N_A"
+ })";
+  auto parsed = ParseTokenResponse(payload);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed->token_type, "N_A");
+}
+
} // namespace iceberg::rest::auth