This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 522af14  [C++] Handle OAuth 2.0 exceptional cases gracefully (#12335)
522af14 is described below

commit 522af14a098a05862e61f19228a7b291fcd7cba6
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Wed Oct 13 11:05:26 2021 +0800

    [C++] Handle OAuth 2.0 exceptional cases gracefully (#12335)
    
    Fixes #12324
    
    Currently if any error happened during OAuth 2.0 authentication in C++ 
client, a runtime error would be thrown and could only be caught when creating 
an `AuthOauth` object, but could not be caught in `Client`'s method like 
`createProducer`. It's not graceful. What's worse, there's no way for Python 
client that is a wrapper of C++ client to caught this exception.
    
    When `ClientCredentialFlow::authenticate` returns an invalid 
`Oauth2TokenResult`, catch the `runtime_error` thrown in `Oauth2CachedToken`'s 
constructor and returns `ResultAuthenticationError` as 
`AuthOauth2::getAuthData`'s returned value. Since `getAuthData` always returns 
`ResultOk` before this PR, the related docs are also modified.
    
    Then when a CONNECT or AUTH_RESPONSE command is created, expose the result 
of `getAuthData`. If it's not `ResultOk`, close the connection and complete the 
connection's future with the result. After that, the `Client`'s API will be 
completed with the result.
    
    In addition, this PR also makes the error code of libcurl human readable by 
configuring `CURLOPT_ERRORBUFFER`.
    
    - [x] Make sure that the change passes the CI checks.
    
    This change added tests `AuthPluginTest.testOauth2Failure` to verify when 
OAuth 2.0 authentication failed, the `createProducer` would return 
`ResultAuthenticationError` without any exception thrown.
    
    (cherry picked from commit 06b68bb1d5859fd969f66c29888feda271817ec5)
---
 pulsar-client-cpp/include/pulsar/Authentication.h |  6 +--
 pulsar-client-cpp/lib/BinaryProtoLookupService.cc |  2 +-
 pulsar-client-cpp/lib/ClientConnection.cc         | 21 ++++++--
 pulsar-client-cpp/lib/ClientConnection.h          |  2 +-
 pulsar-client-cpp/lib/Commands.cc                 | 18 +++++--
 pulsar-client-cpp/lib/Commands.h                  |  4 +-
 pulsar-client-cpp/lib/HTTPLookupService.cc        |  5 +-
 pulsar-client-cpp/lib/auth/AuthOauth2.cc          | 42 +++++++++++----
 pulsar-client-cpp/tests/AuthPluginTest.cc         | 62 ++++++++++++++++++-----
 9 files changed, 121 insertions(+), 41 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Authentication.h 
b/pulsar-client-cpp/include/pulsar/Authentication.h
index 991e35b..185ac33 100644
--- a/pulsar-client-cpp/include/pulsar/Authentication.h
+++ b/pulsar-client-cpp/include/pulsar/Authentication.h
@@ -98,7 +98,7 @@ class PULSAR_PUBLIC Authentication {
      *
      * @param[out] authDataContent the shared pointer of AuthenticationData. 
The content of AuthenticationData
      * is changed to the internal data of the current instance.
-     * @return ResultOk
+     * @return ResultOk or ResultAuthenticationError if authentication failed
      */
     virtual Result getAuthData(AuthenticationDataPtr& authDataContent) {
         authDataContent = authData_;
@@ -450,7 +450,7 @@ class CachedToken {
     /**
      * Get AuthenticationData from the current instance
      *
-     * @return ResultOk
+     * @return ResultOk or ResultAuthenticationError if authentication failed
      */
     virtual AuthenticationDataPtr getAuthData() = 0;
 
@@ -504,7 +504,7 @@ class PULSAR_PUBLIC AuthOauth2 : public Authentication {
      *
      * @param[out] authDataOauth2 the shared pointer of AuthenticationData. 
The content of AuthenticationData
      * is changed to the internal data of the current instance.
-     * @return ResultOk
+     * @return ResultOk or ResultAuthenticationError if authentication failed
      */
     Result getAuthData(AuthenticationDataPtr& authDataOauth2);
 
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc 
b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
index 069e0e2..c7b7b09 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
@@ -130,7 +130,7 @@ void 
BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str
                                                                   const 
ClientConnectionWeakPtr& clientCnx,
                                                                   
LookupDataResultPromisePtr promise) {
     if (result != ResultOk) {
-        promise->setFailed(ResultConnectError);
+        promise->setFailed(result);
         Future<Result, LookupDataResultPtr> future = promise->getFuture();
         return;
     }
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc 
b/pulsar-client-cpp/lib/ClientConnection.cc
index 82cb48c..3bcacc1 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -467,7 +467,14 @@ void ClientConnection::handleHandshake(const 
boost::system::error_code& err) {
     }
 
     bool connectingThroughProxy = logicalAddress_ != physicalAddress_;
-    SharedBuffer buffer = Commands::newConnect(authentication_, 
logicalAddress_, connectingThroughProxy);
+    Result result = ResultOk;
+    SharedBuffer buffer =
+        Commands::newConnect(authentication_, logicalAddress_, 
connectingThroughProxy, result);
+    if (result != ResultOk) {
+        LOG_ERROR(cnxString_ << "Failed to establish connection: " << result);
+        close(result);
+        return;
+    }
     // Send CONNECT command to broker
     asyncWrite(buffer.const_asio_buffer(), 
std::bind(&ClientConnection::handleSentPulsarConnect,
                                                      shared_from_this(), 
std::placeholders::_1, buffer));
@@ -1133,7 +1140,13 @@ void ClientConnection::handleIncomingCommand() {
                 case BaseCommand::AUTH_CHALLENGE: {
                     LOG_DEBUG(cnxString_ << "Received auth challenge from 
broker");
 
-                    SharedBuffer buffer = 
Commands::newAuthResponse(authentication_);
+                    Result result;
+                    SharedBuffer buffer = 
Commands::newAuthResponse(authentication_, result);
+                    if (result != ResultOk) {
+                        LOG_ERROR(cnxString_ << "Failed to send auth response: 
" << result);
+                        close(result);
+                        break;
+                    }
                     asyncWrite(buffer.const_asio_buffer(),
                                
std::bind(&ClientConnection::handleSentAuthResponse, shared_from_this(),
                                          std::placeholders::_1, buffer));
@@ -1458,7 +1471,7 @@ void ClientConnection::handleConsumerStatsTimeout(const 
boost::system::error_cod
     startConsumerStatsTimer(consumerStatsRequests);
 }
 
-void ClientConnection::close() {
+void ClientConnection::close(Result result) {
     Lock lock(mutex_);
     if (isClosed()) {
         return;
@@ -1515,7 +1528,7 @@ void ClientConnection::close() {
         HandlerBase::handleDisconnection(ResultConnectError, 
shared_from_this(), it->second);
     }
 
-    connectPromise_.setFailed(ResultConnectError);
+    connectPromise_.setFailed(result);
 
     // Fail all pending requests, all these type are map whose value type 
contains the Promise object
     for (auto& kv : pendingRequests) {
diff --git a/pulsar-client-cpp/lib/ClientConnection.h 
b/pulsar-client-cpp/lib/ClientConnection.h
index a20219c..cd88f6e 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -113,7 +113,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
      */
     void tcpConnectAsync();
 
-    void close();
+    void close(Result result = ResultConnectError);
 
     bool isClosed() const;
 
diff --git a/pulsar-client-cpp/lib/Commands.cc 
b/pulsar-client-cpp/lib/Commands.cc
index 5b17d24..b90f5a8 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -209,7 +209,7 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers, 
BaseCommand& cmd, uint
 }
 
 SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, 
const std::string& logicalAddress,
-                                  bool connectingThroughProxy) {
+                                  bool connectingThroughProxy, Result& result) 
{
     BaseCommand cmd;
     cmd.set_type(BaseCommand::CONNECT);
     CommandConnect* connect = cmd.mutable_connect();
@@ -226,13 +226,18 @@ SharedBuffer Commands::newConnect(const 
AuthenticationPtr& authentication, const
     }
 
     AuthenticationDataPtr authDataContent;
-    if (authentication->getAuthData(authDataContent) == ResultOk && 
authDataContent->hasDataFromCommand()) {
+    result = authentication->getAuthData(authDataContent);
+    if (result != ResultOk) {
+        return SharedBuffer{};
+    }
+
+    if (authDataContent->hasDataFromCommand()) {
         connect->set_auth_data(authDataContent->getCommandData());
     }
     return writeMessageWithSize(cmd);
 }
 
-SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& 
authentication) {
+SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& 
authentication, Result& result) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::AUTH_RESPONSE);
     CommandAuthResponse* authResponse = cmd.mutable_authresponse();
@@ -242,7 +247,12 @@ SharedBuffer Commands::newAuthResponse(const 
AuthenticationPtr& authentication)
     authData->set_auth_method_name(authentication->getAuthMethodName());
 
     AuthenticationDataPtr authDataContent;
-    if (authentication->getAuthData(authDataContent) == ResultOk && 
authDataContent->hasDataFromCommand()) {
+    result = authentication->getAuthData(authDataContent);
+    if (result != ResultOk) {
+        return SharedBuffer{};
+    }
+
+    if (authDataContent->hasDataFromCommand()) {
         authData->set_auth_data(authDataContent->getCommandData());
     }
 
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 02ebaad..c3d8343 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -70,9 +70,9 @@ class Commands {
     const static int checksumSize = 4;
 
     static SharedBuffer newConnect(const AuthenticationPtr& authentication, 
const std::string& logicalAddress,
-                                   bool connectingThroughProxy);
+                                   bool connectingThroughProxy, Result& 
result);
 
-    static SharedBuffer newAuthResponse(const AuthenticationPtr& 
authentication);
+    static SharedBuffer newAuthResponse(const AuthenticationPtr& 
authentication, Result& result);
 
     static SharedBuffer newPartitionMetadataRequest(const std::string& topic, 
uint64_t requestId);
 
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc 
b/pulsar-client-cpp/lib/HTTPLookupService.cc
index e881ac4..45e56d0 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -190,10 +190,7 @@ Result HTTPLookupService::sendHTTPRequest(const 
std::string completeUrl, std::st
     AuthenticationDataPtr authDataContent;
     Result authResult = authenticationPtr_->getAuthData(authDataContent);
     if (authResult != ResultOk) {
-        LOG_ERROR(
-            "All Authentication methods should have AuthenticationData and 
return true on getAuthData for "
-            "url "
-            << completeUrl);
+        LOG_ERROR("Failed to getAuthData: " << authResult);
         curl_easy_cleanup(handle);
         return authResult;
     }
diff --git a/pulsar-client-cpp/lib/auth/AuthOauth2.cc 
b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
index c7cc2bf..a9a1498 100644
--- a/pulsar-client-cpp/lib/auth/AuthOauth2.cc
+++ b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
@@ -159,6 +159,10 @@ static size_t curlWriteCallback(void* contents, size_t 
size, size_t nmemb, void*
 }
 
 void ClientCredentialFlow::initialize() {
+    if (issuerUrl_.empty()) {
+        LOG_ERROR("Failed to initialize ClientCredentialFlow: issuer_url is 
not set");
+        return;
+    }
     if (!keyFile_.isValid()) {
         return;
     }
@@ -188,6 +192,9 @@ void ClientCredentialFlow::initialize() {
     curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L);
     curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, 0L);
 
+    char errorBuffer[CURL_ERROR_SIZE];
+    curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, errorBuffer);
+
     // Make get call to server
     res = curl_easy_perform(handle);
 
@@ -217,8 +224,8 @@ void ClientCredentialFlow::initialize() {
             }
             break;
         default:
-            LOG_ERROR("Response failed for getting the well-known 
configuration " << issuerUrl_
-                                                                               
   << ". Error Code " << res);
+            LOG_ERROR("Response failed for getting the well-known 
configuration "
+                      << issuerUrl_ << ". Error Code " << res << ": " << 
errorBuffer);
             break;
     }
     // Free header list
@@ -282,6 +289,9 @@ Oauth2TokenResultPtr ClientCredentialFlow::authenticate() {
 
     curl_easy_setopt(handle, CURLOPT_POSTFIELDS, jsonBody.c_str());
 
+    char errorBuffer[CURL_ERROR_SIZE];
+    curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, errorBuffer);
+
     // Make get call to server
     res = curl_easy_perform(handle);
 
@@ -302,19 +312,26 @@ Oauth2TokenResultPtr ClientCredentialFlow::authenticate() 
{
                     break;
                 }
 
-                
resultPtr->setAccessToken(root.get<std::string>("access_token"));
-                resultPtr->setExpiresIn(root.get<uint32_t>("expires_in"));
-
-                LOG_DEBUG("access_token: " << resultPtr->getAccessToken()
-                                           << " expires_in: " << 
resultPtr->getExpiresIn());
+                
resultPtr->setAccessToken(root.get<std::string>("access_token", ""));
+                resultPtr->setExpiresIn(
+                    root.get<uint32_t>("expires_in", 
Oauth2TokenResult::undefined_expiration));
+                
resultPtr->setRefreshToken(root.get<std::string>("refresh_token", ""));
+                resultPtr->setIdToken(root.get<std::string>("id_token", ""));
+
+                if (!resultPtr->getAccessToken().empty()) {
+                    LOG_DEBUG("access_token: " << resultPtr->getAccessToken()
+                                               << " expires_in: " << 
resultPtr->getExpiresIn());
+                } else {
+                    LOG_ERROR("Response doesn't contain access_token, the 
response is: " << responseData);
+                }
             } else {
                 LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ". 
response Code "
                                                            << response_code << 
" passedin: " << jsonBody);
             }
             break;
         default:
-            LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ". 
Error Code " << res
-                                                       << " passedin: " << 
jsonBody);
+            LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ". 
ErrorCode " << res << ": "
+                                                       << errorBuffer << " 
passedin: " << jsonBody);
             break;
     }
     // Free header list
@@ -362,7 +379,12 @@ const std::string AuthOauth2::getAuthMethodName() const { 
return "token"; }
 
 Result AuthOauth2::getAuthData(AuthenticationDataPtr& authDataContent) {
     if (cachedTokenPtr_ == nullptr || cachedTokenPtr_->isExpired()) {
-        cachedTokenPtr_ = CachedTokenPtr(new 
Oauth2CachedToken(flowPtr_->authenticate()));
+        try {
+            cachedTokenPtr_ = CachedTokenPtr(new 
Oauth2CachedToken(flowPtr_->authenticate()));
+        } catch (const std::runtime_error& e) {
+            // The real error logs have already been printed in authenticate()
+            return ResultAuthenticationError;
+        }
     }
 
     authDataContent = cachedTokenPtr_->getAuthData();
diff --git a/pulsar-client-cpp/tests/AuthPluginTest.cc 
b/pulsar-client-cpp/tests/AuthPluginTest.cc
index d15a383..5794723 100644
--- a/pulsar-client-cpp/tests/AuthPluginTest.cc
+++ b/pulsar-client-cpp/tests/AuthPluginTest.cc
@@ -364,18 +364,10 @@ TEST(AuthPluginTest, testOauth2WrongSecret) {
         "client_secret": "rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ",
         "audience": "https://dev-kt-aa9ne.us.auth0.com/api/v2/"})";
 
-        int expectedTokenLength = 3379;
-        LOG_INFO("PARAMS: " << params);
-        pulsar::AuthenticationPtr auth = pulsar::AuthOauth2::create(params);
-        ASSERT_EQ(auth->getAuthMethodName(), "token");
-
-        auth->getAuthData(data);
-
-        FAIL() << "Expected fail for wrong secret when to get token from 
server";
-
-    } catch (...) {
-        // expected
-    }
+    LOG_INFO("PARAMS: " << params);
+    pulsar::AuthenticationPtr auth = pulsar::AuthOauth2::create(params);
+    ASSERT_EQ(auth->getAuthMethodName(), "token");
+    ASSERT_EQ(auth->getAuthData(data), ResultAuthenticationError);
 }
 
 TEST(AuthPluginTest, testOauth2CredentialFile) {
@@ -427,3 +419,49 @@ TEST(AuthPluginTest, testOauth2RequestBody) {
     ClientCredentialFlow flow2(params);
     ASSERT_EQ(flow2.generateJsonBody(), expectedJson);
 }
+
+TEST(AuthPluginTest, testOauth2Failure) {
+    ParamMap params;
+    auto addKeyValue = [&](const std::string& key, const std::string& value) {
+        params[key] = value;
+        LOG_INFO("Configure \"" << key << "\" to \"" << value << "\"");
+    };
+
+    auto createClient = [&]() -> Client {
+        ClientConfiguration conf;
+        conf.setAuth(AuthOauth2::create(params));
+        return {"pulsar://localhost:6650", conf};
+    };
+
+    const std::string topic = "AuthPluginTest-testOauth2Failure";
+    Producer producer;
+
+    // No issuer_url
+    auto client1 = createClient();
+    ASSERT_EQ(client1.createProducer(topic, producer), 
ResultAuthenticationError);
+    client1.close();
+
+    // Invalid issuer_url
+    addKeyValue("issuer_url", "hello");
+    auto client2 = createClient();
+    ASSERT_EQ(client2.createProducer(topic, producer), 
ResultAuthenticationError);
+    client2.close();
+
+    addKeyValue("issuer_url", "https://google.com";);
+    auto client3 = createClient();
+    ASSERT_EQ(client3.createProducer(topic, producer), 
ResultAuthenticationError);
+    client3.close();
+
+    // No client id and secret
+    addKeyValue("issuer_url", "https://dev-kt-aa9ne.us.auth0.com";);
+    auto client4 = createClient();
+    ASSERT_EQ(client4.createProducer(topic, producer), 
ResultAuthenticationError);
+    client4.close();
+
+    // Invalid client_id and client_secret
+    addKeyValue("client_id", "my_id");
+    addKeyValue("client_secret", "my-secret");
+    auto client5 = createClient();
+    ASSERT_EQ(client5.createProducer(topic, producer), 
ResultAuthenticationError);
+    client5.close();
+}

Reply via email to