C0urante commented on code in PR #12846:
URL: https://github.com/apache/kafka/pull/12846#discussion_r1034907024


##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -174,4 +153,85 @@ public void handle(Callback[] callbacks) throws 
UnsupportedCallbackException {
                 ));
         }
     }
+
+    private void setSecurityContextForRequest(ContainerRequestContext 
requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();
+            }
+
+            @Override
+            public boolean isUserInRole(String role) {
+                return false;
+            }
+
+            @Override
+            public boolean isSecure() {
+                String scheme = 
requestContext.getUriInfo().getRequestUri().getScheme();
+                return scheme != null && scheme.equalsIgnoreCase("https");
+            }
+
+            @Override
+            public String getAuthenticationScheme() {
+                return BASIC_AUTH;
+            }
+        });
+    }
+
+
+    public static class BasicAuthenticationCredential {
+        private String username;
+        private String password;
+
+        public BasicAuthenticationCredential(String authorizationHeader) {
+            final char colon = ':';
+            final char space = ' ';
+            final String authType = "basic";
+
+            initializeEmptyCredentials();
+
+            if (authorizationHeader == null) {
+                log.trace("No credentials were provided with the request");
+                return;
+            }
+
+            int spaceIndex = authorizationHeader.indexOf(space);
+            if (spaceIndex <= 0) {
+                log.trace("Request credentials were malformed; no space 
present in value for authorization header");
+                return;
+            }
+
+            String method = authorizationHeader.substring(0, spaceIndex);
+            if (!authType.equalsIgnoreCase(method)) {
+                log.trace("Request credentials used {} authentication, but 
only {} supported; ignoring", method, authType);
+                return;
+            }
+
+            authorizationHeader = authorizationHeader.substring(spaceIndex + 
1);
+            authorizationHeader = new 
String(Base64.getDecoder().decode(authorizationHeader),
+                    StandardCharsets.UTF_8);
+            int i = authorizationHeader.indexOf(colon);
+            if (i <= 0) {
+                log.trace("Request credentials were malformed; no colon 
present between username and password");
+                return;
+            }
+
+            this.username = authorizationHeader.substring(0, i);
+            this.password = authorizationHeader.substring(i + 1);
+        }
+
+        private void initializeEmptyCredentials() {
+            this.username = "";
+            this.password = "";
+        }
+
+        public String getUsername() {

Review Comment:
   Nit: we don't use the `get` prefix in this code base
   ```suggestion
           public String username() {
   ```



##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -174,4 +153,85 @@ public void handle(Callback[] callbacks) throws 
UnsupportedCallbackException {
                 ));
         }
     }
+
+    private void setSecurityContextForRequest(ContainerRequestContext 
requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();
+            }
+
+            @Override
+            public boolean isUserInRole(String role) {
+                return false;
+            }
+
+            @Override
+            public boolean isSecure() {
+                String scheme = 
requestContext.getUriInfo().getRequestUri().getScheme();
+                return scheme != null && scheme.equalsIgnoreCase("https");
+            }
+
+            @Override
+            public String getAuthenticationScheme() {
+                return BASIC_AUTH;
+            }
+        });
+    }
+
+
+    public static class BasicAuthenticationCredential {
+        private String username;
+        private String password;
+
+        public BasicAuthenticationCredential(String authorizationHeader) {
+            final char colon = ':';
+            final char space = ' ';
+            final String authType = "basic";
+
+            initializeEmptyCredentials();
+
+            if (authorizationHeader == null) {
+                log.trace("No credentials were provided with the request");
+                return;
+            }
+
+            int spaceIndex = authorizationHeader.indexOf(space);
+            if (spaceIndex <= 0) {
+                log.trace("Request credentials were malformed; no space 
present in value for authorization header");
+                return;
+            }
+
+            String method = authorizationHeader.substring(0, spaceIndex);
+            if (!authType.equalsIgnoreCase(method)) {
+                log.trace("Request credentials used {} authentication, but 
only {} supported; ignoring", method, authType);
+                return;
+            }
+
+            authorizationHeader = authorizationHeader.substring(spaceIndex + 
1);
+            authorizationHeader = new 
String(Base64.getDecoder().decode(authorizationHeader),
+                    StandardCharsets.UTF_8);
+            int i = authorizationHeader.indexOf(colon);
+            if (i <= 0) {
+                log.trace("Request credentials were malformed; no colon 
present between username and password");
+                return;
+            }
+
+            this.username = authorizationHeader.substring(0, i);
+            this.password = authorizationHeader.substring(i + 1);
+        }
+
+        private void initializeEmptyCredentials() {
+            this.username = "";
+            this.password = "";
+        }
+
+        public String getUsername() {
+            return username;
+        }
+
+        public String getPassword() {

Review Comment:
   Same nit RE getters:
   ```suggestion
           public String password() {
   ```



##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -174,4 +153,85 @@ public void handle(Callback[] callbacks) throws 
UnsupportedCallbackException {
                 ));
         }
     }
+
+    private void setSecurityContextForRequest(ContainerRequestContext 
requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();
+            }
+
+            @Override
+            public boolean isUserInRole(String role) {
+                return false;
+            }
+
+            @Override
+            public boolean isSecure() {
+                String scheme = 
requestContext.getUriInfo().getRequestUri().getScheme();
+                return scheme != null && scheme.equalsIgnoreCase("https");
+            }
+
+            @Override
+            public String getAuthenticationScheme() {
+                return BASIC_AUTH;
+            }
+        });
+    }
+
+
+    public static class BasicAuthenticationCredential {
+        private String username;
+        private String password;
+
+        public BasicAuthenticationCredential(String authorizationHeader) {
+            final char colon = ':';
+            final char space = ' ';

Review Comment:
   While we're in the neighborhood, WDYT about removing these declarations and 
using `':'` and `' '` inline instead? Seems like it might be more readable; if 
you feel the same, we can clean things up, and if not, fine to leave as-is.



##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -174,4 +153,85 @@ public void handle(Callback[] callbacks) throws 
UnsupportedCallbackException {
                 ));
         }
     }
+
+    private void setSecurityContextForRequest(ContainerRequestContext 
requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();
+            }
+
+            @Override
+            public boolean isUserInRole(String role) {
+                return false;
+            }
+
+            @Override
+            public boolean isSecure() {
+                String scheme = 
requestContext.getUriInfo().getRequestUri().getScheme();
+                return scheme != null && scheme.equalsIgnoreCase("https");
+            }
+
+            @Override
+            public String getAuthenticationScheme() {
+                return BASIC_AUTH;
+            }
+        });
+    }
+
+
+    public static class BasicAuthenticationCredential {

Review Comment:
   Few nits:
   - We can downgrade the visibility of this class
   - I've never seen `credential` (singular) used to refer to a 
username/password combination
   - `Auth` instead of `Authentication` is fine
   ```suggestion
       // Visible for testing
       static class BasicAuthCredentials {
   ```



##########
connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java:
##########
@@ -211,6 +214,18 @@ public void testUnsupportedCallback() {
         assertThrows(ConnectException.class, () -> callbackHandler.handle(new 
Callback[] {unsupportedCallback}));
     }
 
+    @Test
+    public void testSecurityContextSet() throws IOException {
+        File credentialFile = setupPropertyLoginFile(true);
+        JaasBasicAuthFilter jaasBasicAuthFilter = 
setupJaasFilter("KafkaConnect", credentialFile.getPath());
+        ContainerRequestContext requestContext = setMock("Basic", "user1", 
"password1");
+        jaasBasicAuthFilter.filter(requestContext);
+
+        ArgumentCaptor<SecurityContext> argument = 
ArgumentCaptor.forClass(SecurityContext.class);
+        verify(requestContext).setSecurityContext(argument.capture());
+        assertEquals("user1", 
argument.getValue().getUserPrincipal().getName());

Review Comment:
   Is it worth adding coverage for the context's `isSecure` method as well?



##########
connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java:
##########
@@ -211,6 +214,18 @@ public void testUnsupportedCallback() {
         assertThrows(ConnectException.class, () -> callbackHandler.handle(new 
Callback[] {unsupportedCallback}));
     }
 
+    @Test
+    public void testSecurityContextSet() throws IOException {
+        File credentialFile = setupPropertyLoginFile(true);
+        JaasBasicAuthFilter jaasBasicAuthFilter = 
setupJaasFilter("KafkaConnect", credentialFile.getPath());
+        ContainerRequestContext requestContext = setMock("Basic", "user1", 
"password1");
+        jaasBasicAuthFilter.filter(requestContext);
+
+        ArgumentCaptor<SecurityContext> argument = 
ArgumentCaptor.forClass(SecurityContext.class);

Review Comment:
   Nit: `argument` isn't very descriptive; maybe `context`, `securityContext`, 
or `capturedContext` would be clearer?



##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -174,4 +153,84 @@ public void handle(Callback[] callbacks) throws 
UnsupportedCallbackException {
                 ));
         }
     }
+
+    private void setSecurityContextForRequest(ContainerRequestContext 
requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();
+            }
+
+            @Override
+            public boolean isUserInRole(String role) {
+                return false;
+            }
+
+            @Override
+            public boolean isSecure() {
+                return 
requestContext.getUriInfo().getRequestUri().getScheme().equalsIgnoreCase("https");
+            }
+
+            @Override
+            public String getAuthenticationScheme() {
+                return BASIC_AUTH;
+            }
+        });
+    }
+
+
+    public static class BasicAuthenticationCredential {
+        private String username;
+        private String password;
+
+        public BasicAuthenticationCredential(String authorizationHeader) {
+            final char colon = ':';
+            final char space = ' ';
+            final String authType = "basic";
+
+            initializeEmptyCredentials();
+
+            if (authorizationHeader == null) {
+                log.trace("No credentials were provided with the request");
+                return;
+            }
+
+            int spaceIndex = authorizationHeader.indexOf(space);
+            if (spaceIndex <= 0) {
+                log.trace("Request credentials were malformed; no space 
present in value for authorization header");
+                return;
+            }
+
+            String method = authorizationHeader.substring(0, spaceIndex);
+            if (!authType.equalsIgnoreCase(method)) {
+                log.trace("Request credentials used {} authentication, but 
only {} supported; ignoring", method, authType);
+                return;
+            }
+
+            authorizationHeader = authorizationHeader.substring(spaceIndex + 
1);
+            authorizationHeader = new 
String(Base64.getDecoder().decode(authorizationHeader),
+                    StandardCharsets.UTF_8);
+            int i = authorizationHeader.indexOf(colon);
+            if (i <= 0) {
+                log.trace("Request credentials were malformed; no colon 
present between username and password");
+                return;
+            }
+
+            this.username = authorizationHeader.substring(0, i);
+            this.password = authorizationHeader.substring(i + 1);
+        }
+
+        private void initializeEmptyCredentials() {
+            this.username = "";
+            this.password = "";
+        }

Review Comment:
   IMO null is better here, especially since we use it as a sentinel value
   [in the `PropertyFileLoginModule` class](https://github.com/apache/kafka/blob/d3ee9341cc894ab14a3ec013c8a590b268438f71/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java#L111-L113)
   to detect invalid credentials.
   
   I think we should make `username` and `password` final, and set them to 
`null` if the credentials are invalid (i.e., cannot be parsed or use an 
unsupported auth mechanism)



##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -174,4 +153,85 @@ public void handle(Callback[] callbacks) throws 
UnsupportedCallbackException {
                 ));
         }
     }
+
+    private void setSecurityContextForRequest(ContainerRequestContext 
requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();
+            }
+
+            @Override
+            public boolean isUserInRole(String role) {
+                return false;
+            }
+
+            @Override
+            public boolean isSecure() {
+                String scheme = 
requestContext.getUriInfo().getRequestUri().getScheme();
+                return scheme != null && scheme.equalsIgnoreCase("https");

Review Comment:
   Nit: can invert the method call here to avoid a null check
   ```suggestion
                   return "https".equalsIgnoreCase(scheme);
   ```



##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -174,4 +153,85 @@ public void handle(Callback[] callbacks) throws 
UnsupportedCallbackException {
                 ));
         }
     }
+
+    private void setSecurityContextForRequest(ContainerRequestContext 
requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();

Review Comment:
   Nit: can use a method reference here
   ```suggestion
                   return credential::getUsername;
   ```



##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -174,4 +153,84 @@ public void handle(Callback[] callbacks) throws 
UnsupportedCallbackException {
                 ));
         }
     }
+
+    private void setSecurityContextForRequest(ContainerRequestContext 
requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();
+            }
+
+            @Override
+            public boolean isUserInRole(String role) {
+                return false;
+            }
+
+            @Override
+            public boolean isSecure() {
+                return 
requestContext.getUriInfo().getRequestUri().getScheme().equalsIgnoreCase("https");
+            }
+
+            @Override
+            public String getAuthenticationScheme() {
+                return BASIC_AUTH;
+            }
+        });
+    }
+
+
+    public static class BasicAuthenticationCredential {
+        private String username;
+        private String password;
+
+        public BasicAuthenticationCredential(String authorizationHeader) {
+            final char colon = ':';
+            final char space = ' ';
+            final String authType = "basic";

Review Comment:
   Looks like this is still applicable?
   
   IMO it'd be clearest if we removed the `authType` local variable altogether,
   statically imported `SecurityContext.BASIC_AUTH`, and used that inline
   wherever applicable.



##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -111,41 +119,12 @@ private boolean isInternalRequest(ContainerRequestContext 
requestContext) {
 
     public static class BasicAuthCallBackHandler implements CallbackHandler {
 
-        private static final String BASIC = "basic";
-        private static final char COLON = ':';
-        private static final char SPACE = ' ';
         private String username;
         private String password;

Review Comment:
   Nit: while we're in the neighborhood, would you mind making these final?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to