This is an automated email from the ASF dual-hosted git repository. guangning pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push: new 522af14 [C++] Handle OAuth 2.0 exceptional cases gracefully (#12335) 522af14 is described below commit 522af14a098a05862e61f19228a7b291fcd7cba6 Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Wed Oct 13 11:05:26 2021 +0800 [C++] Handle OAuth 2.0 exceptional cases gracefully (#12335) Fixes #12324 Currently, if any error happens during OAuth 2.0 authentication in the C++ client, a runtime error is thrown that can only be caught when creating an `AuthOauth2` object; it cannot be caught in `Client` methods like `createProducer`. This is not graceful. What's worse, there is no way for the Python client, which is a wrapper around the C++ client, to catch this exception. When `ClientCredentialFlow::authenticate` returns an invalid `Oauth2TokenResult`, catch the `runtime_error` thrown in `Oauth2CachedToken`'s constructor and return `ResultAuthenticationError` as `AuthOauth2::getAuthData`'s return value. Since `getAuthData` always returned `ResultOk` before this PR, the related docs are also modified. Then, when a CONNECT or AUTH_RESPONSE command is created, expose the result of `getAuthData`. If it's not `ResultOk`, close the connection and complete the connection's future with the result. After that, the `Client`'s API call will be completed with the result. In addition, this PR also makes the error code of libcurl human readable by configuring `CURLOPT_ERRORBUFFER`. - [x] Make sure that the change passes the CI checks. This change adds the test `AuthPluginTest.testOauth2Failure` to verify that when OAuth 2.0 authentication fails, `createProducer` returns `ResultAuthenticationError` without any exception being thrown. 
(cherry picked from commit 06b68bb1d5859fd969f66c29888feda271817ec5) --- pulsar-client-cpp/include/pulsar/Authentication.h | 6 +-- pulsar-client-cpp/lib/BinaryProtoLookupService.cc | 2 +- pulsar-client-cpp/lib/ClientConnection.cc | 21 ++++++-- pulsar-client-cpp/lib/ClientConnection.h | 2 +- pulsar-client-cpp/lib/Commands.cc | 18 +++++-- pulsar-client-cpp/lib/Commands.h | 4 +- pulsar-client-cpp/lib/HTTPLookupService.cc | 5 +- pulsar-client-cpp/lib/auth/AuthOauth2.cc | 42 +++++++++++---- pulsar-client-cpp/tests/AuthPluginTest.cc | 62 ++++++++++++++++++----- 9 files changed, 121 insertions(+), 41 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/Authentication.h b/pulsar-client-cpp/include/pulsar/Authentication.h index 991e35b..185ac33 100644 --- a/pulsar-client-cpp/include/pulsar/Authentication.h +++ b/pulsar-client-cpp/include/pulsar/Authentication.h @@ -98,7 +98,7 @@ class PULSAR_PUBLIC Authentication { * * @param[out] authDataContent the shared pointer of AuthenticationData. The content of AuthenticationData * is changed to the internal data of the current instance. - * @return ResultOk + * @return ResultOk or ResultAuthenticationError if authentication failed */ virtual Result getAuthData(AuthenticationDataPtr& authDataContent) { authDataContent = authData_; @@ -450,7 +450,7 @@ class CachedToken { /** * Get AuthenticationData from the current instance * - * @return ResultOk + * @return ResultOk or ResultAuthenticationError if authentication failed */ virtual AuthenticationDataPtr getAuthData() = 0; @@ -504,7 +504,7 @@ class PULSAR_PUBLIC AuthOauth2 : public Authentication { * * @param[out] authDataOauth2 the shared pointer of AuthenticationData. The content of AuthenticationData * is changed to the internal data of the current instance. 
- * @return ResultOk + * @return ResultOk or ResultAuthenticationError if authentication failed */ Result getAuthData(AuthenticationDataPtr& authDataOauth2); diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc index 069e0e2..c7b7b09 100644 --- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc +++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc @@ -130,7 +130,7 @@ void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str const ClientConnectionWeakPtr& clientCnx, LookupDataResultPromisePtr promise) { if (result != ResultOk) { - promise->setFailed(ResultConnectError); + promise->setFailed(result); Future<Result, LookupDataResultPtr> future = promise->getFuture(); return; } diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 82cb48c..3bcacc1 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -467,7 +467,14 @@ void ClientConnection::handleHandshake(const boost::system::error_code& err) { } bool connectingThroughProxy = logicalAddress_ != physicalAddress_; - SharedBuffer buffer = Commands::newConnect(authentication_, logicalAddress_, connectingThroughProxy); + Result result = ResultOk; + SharedBuffer buffer = + Commands::newConnect(authentication_, logicalAddress_, connectingThroughProxy, result); + if (result != ResultOk) { + LOG_ERROR(cnxString_ << "Failed to establish connection: " << result); + close(result); + return; + } // Send CONNECT command to broker asyncWrite(buffer.const_asio_buffer(), std::bind(&ClientConnection::handleSentPulsarConnect, shared_from_this(), std::placeholders::_1, buffer)); @@ -1133,7 +1140,13 @@ void ClientConnection::handleIncomingCommand() { case BaseCommand::AUTH_CHALLENGE: { LOG_DEBUG(cnxString_ << "Received auth challenge from broker"); - SharedBuffer buffer = Commands::newAuthResponse(authentication_); + Result result; + SharedBuffer 
buffer = Commands::newAuthResponse(authentication_, result); + if (result != ResultOk) { + LOG_ERROR(cnxString_ << "Failed to send auth response: " << result); + close(result); + break; + } asyncWrite(buffer.const_asio_buffer(), std::bind(&ClientConnection::handleSentAuthResponse, shared_from_this(), std::placeholders::_1, buffer)); @@ -1458,7 +1471,7 @@ void ClientConnection::handleConsumerStatsTimeout(const boost::system::error_cod startConsumerStatsTimer(consumerStatsRequests); } -void ClientConnection::close() { +void ClientConnection::close(Result result) { Lock lock(mutex_); if (isClosed()) { return; @@ -1515,7 +1528,7 @@ void ClientConnection::close() { HandlerBase::handleDisconnection(ResultConnectError, shared_from_this(), it->second); } - connectPromise_.setFailed(ResultConnectError); + connectPromise_.setFailed(result); // Fail all pending requests, all these type are map whose value type contains the Promise object for (auto& kv : pendingRequests) { diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h index a20219c..cd88f6e 100644 --- a/pulsar-client-cpp/lib/ClientConnection.h +++ b/pulsar-client-cpp/lib/ClientConnection.h @@ -113,7 +113,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien */ void tcpConnectAsync(); - void close(); + void close(Result result = ResultConnectError); bool isClosed() const; diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index 5b17d24..b90f5a8 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -209,7 +209,7 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, uint } SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress, - bool connectingThroughProxy) { + bool connectingThroughProxy, Result& result) { BaseCommand cmd; cmd.set_type(BaseCommand::CONNECT); CommandConnect* connect = 
cmd.mutable_connect(); @@ -226,13 +226,18 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const } AuthenticationDataPtr authDataContent; - if (authentication->getAuthData(authDataContent) == ResultOk && authDataContent->hasDataFromCommand()) { + result = authentication->getAuthData(authDataContent); + if (result != ResultOk) { + return SharedBuffer{}; + } + + if (authDataContent->hasDataFromCommand()) { connect->set_auth_data(authDataContent->getCommandData()); } return writeMessageWithSize(cmd); } -SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication) { +SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication, Result& result) { BaseCommand cmd; cmd.set_type(BaseCommand::AUTH_RESPONSE); CommandAuthResponse* authResponse = cmd.mutable_authresponse(); @@ -242,7 +247,12 @@ SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication) authData->set_auth_method_name(authentication->getAuthMethodName()); AuthenticationDataPtr authDataContent; - if (authentication->getAuthData(authDataContent) == ResultOk && authDataContent->hasDataFromCommand()) { + result = authentication->getAuthData(authDataContent); + if (result != ResultOk) { + return SharedBuffer{}; + } + + if (authDataContent->hasDataFromCommand()) { authData->set_auth_data(authDataContent->getCommandData()); } diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h index 02ebaad..c3d8343 100644 --- a/pulsar-client-cpp/lib/Commands.h +++ b/pulsar-client-cpp/lib/Commands.h @@ -70,9 +70,9 @@ class Commands { const static int checksumSize = 4; static SharedBuffer newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress, - bool connectingThroughProxy); + bool connectingThroughProxy, Result& result); - static SharedBuffer newAuthResponse(const AuthenticationPtr& authentication); + static SharedBuffer newAuthResponse(const AuthenticationPtr& authentication, Result& 
result); static SharedBuffer newPartitionMetadataRequest(const std::string& topic, uint64_t requestId); diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc index e881ac4..45e56d0 100644 --- a/pulsar-client-cpp/lib/HTTPLookupService.cc +++ b/pulsar-client-cpp/lib/HTTPLookupService.cc @@ -190,10 +190,7 @@ Result HTTPLookupService::sendHTTPRequest(const std::string completeUrl, std::st AuthenticationDataPtr authDataContent; Result authResult = authenticationPtr_->getAuthData(authDataContent); if (authResult != ResultOk) { - LOG_ERROR( - "All Authentication methods should have AuthenticationData and return true on getAuthData for " - "url " - << completeUrl); + LOG_ERROR("Failed to getAuthData: " << authResult); curl_easy_cleanup(handle); return authResult; } diff --git a/pulsar-client-cpp/lib/auth/AuthOauth2.cc b/pulsar-client-cpp/lib/auth/AuthOauth2.cc index c7cc2bf..a9a1498 100644 --- a/pulsar-client-cpp/lib/auth/AuthOauth2.cc +++ b/pulsar-client-cpp/lib/auth/AuthOauth2.cc @@ -159,6 +159,10 @@ static size_t curlWriteCallback(void* contents, size_t size, size_t nmemb, void* } void ClientCredentialFlow::initialize() { + if (issuerUrl_.empty()) { + LOG_ERROR("Failed to initialize ClientCredentialFlow: issuer_url is not set"); + return; + } if (!keyFile_.isValid()) { return; } @@ -188,6 +192,9 @@ void ClientCredentialFlow::initialize() { curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L); curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, 0L); + char errorBuffer[CURL_ERROR_SIZE]; + curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, errorBuffer); + // Make get call to server res = curl_easy_perform(handle); @@ -217,8 +224,8 @@ void ClientCredentialFlow::initialize() { } break; default: - LOG_ERROR("Response failed for getting the well-known configuration " << issuerUrl_ - << ". Error Code " << res); + LOG_ERROR("Response failed for getting the well-known configuration " + << issuerUrl_ << ". 
Error Code " << res << ": " << errorBuffer); break; } // Free header list @@ -282,6 +289,9 @@ Oauth2TokenResultPtr ClientCredentialFlow::authenticate() { curl_easy_setopt(handle, CURLOPT_POSTFIELDS, jsonBody.c_str()); + char errorBuffer[CURL_ERROR_SIZE]; + curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, errorBuffer); + // Make get call to server res = curl_easy_perform(handle); @@ -302,19 +312,26 @@ Oauth2TokenResultPtr ClientCredentialFlow::authenticate() { break; } - resultPtr->setAccessToken(root.get<std::string>("access_token")); - resultPtr->setExpiresIn(root.get<uint32_t>("expires_in")); - - LOG_DEBUG("access_token: " << resultPtr->getAccessToken() - << " expires_in: " << resultPtr->getExpiresIn()); + resultPtr->setAccessToken(root.get<std::string>("access_token", "")); + resultPtr->setExpiresIn( + root.get<uint32_t>("expires_in", Oauth2TokenResult::undefined_expiration)); + resultPtr->setRefreshToken(root.get<std::string>("refresh_token", "")); + resultPtr->setIdToken(root.get<std::string>("id_token", "")); + + if (!resultPtr->getAccessToken().empty()) { + LOG_DEBUG("access_token: " << resultPtr->getAccessToken() + << " expires_in: " << resultPtr->getExpiresIn()); + } else { + LOG_ERROR("Response doesn't contain access_token, the response is: " << responseData); + } } else { LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ". response Code " << response_code << " passedin: " << jsonBody); } break; default: - LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ". Error Code " << res - << " passedin: " << jsonBody); + LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ". 
ErrorCode " << res << ": " + << errorBuffer << " passedin: " << jsonBody); break; } // Free header list @@ -362,7 +379,12 @@ const std::string AuthOauth2::getAuthMethodName() const { return "token"; } Result AuthOauth2::getAuthData(AuthenticationDataPtr& authDataContent) { if (cachedTokenPtr_ == nullptr || cachedTokenPtr_->isExpired()) { - cachedTokenPtr_ = CachedTokenPtr(new Oauth2CachedToken(flowPtr_->authenticate())); + try { + cachedTokenPtr_ = CachedTokenPtr(new Oauth2CachedToken(flowPtr_->authenticate())); + } catch (const std::runtime_error& e) { + // The real error logs have already been printed in authenticate() + return ResultAuthenticationError; + } } authDataContent = cachedTokenPtr_->getAuthData(); diff --git a/pulsar-client-cpp/tests/AuthPluginTest.cc b/pulsar-client-cpp/tests/AuthPluginTest.cc index d15a383..5794723 100644 --- a/pulsar-client-cpp/tests/AuthPluginTest.cc +++ b/pulsar-client-cpp/tests/AuthPluginTest.cc @@ -364,18 +364,10 @@ TEST(AuthPluginTest, testOauth2WrongSecret) { "client_secret": "rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ", "audience": "https://dev-kt-aa9ne.us.auth0.com/api/v2/"})"; - int expectedTokenLength = 3379; - LOG_INFO("PARAMS: " << params); - pulsar::AuthenticationPtr auth = pulsar::AuthOauth2::create(params); - ASSERT_EQ(auth->getAuthMethodName(), "token"); - - auth->getAuthData(data); - - FAIL() << "Expected fail for wrong secret when to get token from server"; - - } catch (...) 
{ - // expected - } + LOG_INFO("PARAMS: " << params); + pulsar::AuthenticationPtr auth = pulsar::AuthOauth2::create(params); + ASSERT_EQ(auth->getAuthMethodName(), "token"); + ASSERT_EQ(auth->getAuthData(data), ResultAuthenticationError); } TEST(AuthPluginTest, testOauth2CredentialFile) { @@ -427,3 +419,49 @@ TEST(AuthPluginTest, testOauth2RequestBody) { ClientCredentialFlow flow2(params); ASSERT_EQ(flow2.generateJsonBody(), expectedJson); } + +TEST(AuthPluginTest, testOauth2Failure) { + ParamMap params; + auto addKeyValue = [&](const std::string& key, const std::string& value) { + params[key] = value; + LOG_INFO("Configure \"" << key << "\" to \"" << value << "\""); + }; + + auto createClient = [&]() -> Client { + ClientConfiguration conf; + conf.setAuth(AuthOauth2::create(params)); + return {"pulsar://localhost:6650", conf}; + }; + + const std::string topic = "AuthPluginTest-testOauth2Failure"; + Producer producer; + + // No issuer_url + auto client1 = createClient(); + ASSERT_EQ(client1.createProducer(topic, producer), ResultAuthenticationError); + client1.close(); + + // Invalid issuer_url + addKeyValue("issuer_url", "hello"); + auto client2 = createClient(); + ASSERT_EQ(client2.createProducer(topic, producer), ResultAuthenticationError); + client2.close(); + + addKeyValue("issuer_url", "https://google.com"); + auto client3 = createClient(); + ASSERT_EQ(client3.createProducer(topic, producer), ResultAuthenticationError); + client3.close(); + + // No client id and secret + addKeyValue("issuer_url", "https://dev-kt-aa9ne.us.auth0.com"); + auto client4 = createClient(); + ASSERT_EQ(client4.createProducer(topic, producer), ResultAuthenticationError); + client4.close(); + + // Invalid client_id and client_secret + addKeyValue("client_id", "my_id"); + addKeyValue("client_secret", "my-secret"); + auto client5 = createClient(); + ASSERT_EQ(client5.createProducer(topic, producer), ResultAuthenticationError); + client5.close(); +}