michaeljmarshall commented on code in PR #19292:
URL: https://github.com/apache/pulsar/pull/19292#discussion_r1086824187


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -423,31 +421,65 @@ public void brokerConnected(DirectProxyHandler 
directProxyHandler, CommandConnec
     // According to auth result, send newConnected or newAuthChallenge command.
     private void doAuthentication(AuthData clientData)
             throws Exception {
-        AuthData brokerData = authState.authenticate(clientData);
-        // authentication has completed, will send newConnected command.
-        if (authState.isComplete()) {
-            clientAuthRole = authState.getAuthRole();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("[{}] Client successfully authenticated with {} role 
{}",
-                        remoteAddress, authMethod, clientAuthRole);
+        CompletableFuture<AuthData> authChallengeFuture = 
authState.authenticateAsync(clientData);
+        if (authChallengeFuture.isDone()) {
+            if (!authChallengeFuture.isCompletedExceptionally()) {
+                authChallengeSuccessCallback(authChallengeFuture.get());
+            } else {
+                try {
+                    authChallengeFuture.get();
+                } catch (ExecutionException e) {
+                    authenticationFailedCallback(e.getCause());
+                }
             }
+        } else {
+            state = State.Connecting;
+            authChallengeFuture.whenCompleteAsync((authChallenge, throwable) 
-> {
+                if (throwable == null) {
+                    authChallengeSuccessCallback(authChallenge);
+                } else {
+                    authenticationFailedCallback(throwable);
+                }
+            }, ctx.executor());
+        }
+    }
+
+    protected void authenticationFailedCallback(Throwable t) {
+        LOG.warn("[{}] Unable to authenticate: ", remoteAddress, t);
+        final ByteBuf msg = Commands.newError(-1, 
ServerError.AuthenticationError, "Failed to authenticate");
+        writeAndFlushAndClose(msg);
+    }
+
+    // Always run in this class's event loop.
+    protected void authChallengeSuccessCallback(AuthData authChallenge) {
+        try {
+            // authentication has completed, will send newConnected command.
+            if (authChallenge == null) {
+                clientAuthRole = authState.getAuthRole();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("[{}] Client successfully authenticated with {} 
role {}",
+                            remoteAddress, authMethod, clientAuthRole);
+                }
 
-            // First connection
-            if (this.connectionPool == null || state == State.Connecting) {
-                // authentication has completed, will send newConnected 
command.
-                completeConnect(clientData);
+                // First connection
+                if (this.connectionPool == null || state == State.Connecting) {
+                    // authentication has completed, will send newConnected 
command.
+                    completeConnect();
+                }
+                return;
             }
-            return;
-        }
 
-        // auth not complete, continue auth with client side.
-        final ByteBuf msg = Commands.newAuthChallenge(authMethod, brokerData, 
protocolVersionToAdvertise);
-        writeAndFlush(msg);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("[{}] Authentication in progress client by method {}.",
-                    remoteAddress, authMethod);
+            // auth not complete, continue auth with client side.
+            final ByteBuf msg = Commands.newAuthChallenge(authMethod, 
authChallenge, protocolVersionToAdvertise);
+            writeAndFlush(msg);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[{}] Authentication in progress client by method 
{}.",
+                        remoteAddress, authMethod);
+            }
+            state = State.Connecting;
+        } catch (Exception e) {

Review Comment:
   I wrote it this way because we are in a callback and if we miss the 
exception, we will leak the connection forever. If we think we should only 
catch more specific exceptions, I can update it to catch 
`AuthenticationException` and `RuntimeException`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to