jiazhai commented on a change in pull request #3677: PIP-30: interface and 
mutual change authentication
URL: https://github.com/apache/pulsar/pull/3677#discussion_r262337142
 
 

 ##########
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
 ##########
 @@ -446,52 +453,128 @@ private String getOriginalPrincipal(String 
originalAuthData, String originalAuth
         return originalPrincipal;
     }
 
+    // complete the connect and sent newConnected command
+    private void completeConnect(int clientProtoVersion, String clientVersion) 
{
+        ctx.writeAndFlush(Commands.newConnected(clientProtoVersion));
+        state = State.Connected;
+        remoteEndpointProtocolVersion = clientProtoVersion;
+        if (isNotBlank(clientVersion) && !clientVersion.contains(" ") /* 
ignore default version: pulsar client */) {
+            this.clientVersion = clientVersion.intern();
+        }
+    }
+
+    // According to auth result, send newConnected or newAuthChallenge command.
+    private void doingAuthentication(AuthData clientData,
+                                     int clientProtocolVersion,
+                                     String clientVersion) throws Exception {
+        AuthData brokerData = authState.authenticate(clientData);
+        // authentication has completed, will send newConnected command.
+        if (authState.isComplete()) {
+            authRole = authState.getAuthRole();
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Client successfully authenticated with {} role 
{} and originalPrincipal {}",
+                    remoteAddress, authMethod, authRole, originalPrincipal);
+            }
+            completeConnect(clientProtocolVersion, clientVersion);
+            return;
+        }
+
+        // auth not complete, continue auth with client side.
+        ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, 
clientProtocolVersion));
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Authentication in progress client by method {}.",
+                remoteAddress, authMethod);
+        }
+        state = State.Connecting;
+        return;
+    }
+
     @Override
     protected void handleConnect(CommandConnect connect) {
         checkArgument(state == State.Start);
-        if (service.isAuthenticationEnabled()) {
-            try {
-                String authMethod = "none";
-                if (connect.hasAuthMethodName()) {
-                    authMethod = connect.getAuthMethodName();
-                } else if (connect.hasAuthMethod()) {
-                    // Legacy client is passing enum
-                    authMethod = 
connect.getAuthMethod().name().substring(10).toLowerCase();
-                }
 
-                String authData = connect.getAuthData().toStringUtf8();
-                ChannelHandler sslHandler = 
ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
-                SSLSession sslSession = null;
-                if (sslHandler != null) {
-                    sslSession = ((SslHandler) 
sslHandler).engine().getSession();
-                }
-                originalPrincipal = getOriginalPrincipal(
-                        connect.hasOriginalAuthData() ? 
connect.getOriginalAuthData() : null,
-                        connect.hasOriginalAuthMethod() ? 
connect.getOriginalAuthMethod() : null,
-                        connect.hasOriginalPrincipal() ? 
connect.getOriginalPrincipal() : null,
-                        sslSession);
-                authenticationData = new AuthenticationDataCommand(authData, 
remoteAddress, sslSession);
-                authRole = getBrokerService().getAuthenticationService()
-                        .authenticate(authenticationData, authMethod);
-
-                log.info("[{}] Client successfully authenticated with {} role 
{} and originalPrincipal {}", remoteAddress, authMethod, authRole, 
originalPrincipal);
-            } catch (AuthenticationException e) {
-                String msg = "Unable to authenticate";
-                log.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage());
-                ctx.writeAndFlush(Commands.newError(-1, 
ServerError.AuthenticationError, msg));
-                close();
+        if (log.isDebugEnabled()) {
+            log.debug("Received CONNECT from {}, auth enabled: {}",
+                remoteAddress, service.isAuthenticationEnabled());
+        }
+
+        String clientVersion = connect.getClientVersion();
+        int clientProtocolVersion = connect.getProtocolVersion();
+
+        if (!service.isAuthenticationEnabled()) {
+            completeConnect(clientProtocolVersion, clientVersion);
+            return;
+        }
+
+        try {
+            AuthData clientData = 
AuthData.of(connect.getAuthData().toByteArray());
+
+            // init authentication
+            if (connect.hasAuthMethodName()) {
+                authMethod = connect.getAuthMethodName();
+            } else if (connect.hasAuthMethod()) {
+                // Legacy client is passing enum
+                authMethod = 
connect.getAuthMethod().name().substring(10).toLowerCase();
+            } else {
+                authMethod = "none";
+            }
+
+            authenticationProvider = getBrokerService()
+                .getAuthenticationService()
+                .getAuthenticationProvider(authMethod);
+
+            // Not find provider named authMethod. Most used for tests.
+            // In AuthenticationDisabled, it will set authMethod "none".
+            if (authenticationProvider == null) {
+                authRole = 
getBrokerService().getAuthenticationService().getAnonymousUserRole();
+                completeConnect(clientProtocolVersion, clientVersion);
                 return;
             }
+
+            // init authState and other var
+            ChannelHandler sslHandler = 
ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
+            SSLSession sslSession = null;
+            if (sslHandler != null) {
+                sslSession = ((SslHandler) sslHandler).engine().getSession();
+            }
+            originalPrincipal = getOriginalPrincipal(
+                connect.hasOriginalAuthData() ? connect.getOriginalAuthData() 
: null,
+                connect.hasOriginalAuthMethod() ? 
connect.getOriginalAuthMethod() : null,
+                connect.hasOriginalPrincipal() ? 
connect.getOriginalPrincipal() : null,
+                sslSession);
+
+            authenticationData = 
authenticationProvider.newAuthDataSource(clientData, remoteAddress, sslSession);
 
 Review comment:
   Yes, Ivan In authentication, AuthState is enough, and no place requires to 
get authenticationData.
   There are a lot of places in authorization code that reference the 
authenticationData, like in this file and also broker/consumer.java and 
broker/producer.java. 
   Tried to remove this, but this would involve a lot of changes, to make unit 
tests passed. Would like to keep this.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to