This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f97646488c KAFKA-13651; Add audit logging to `StandardAuthorizer` 
(#12031)
f97646488c is described below

commit f97646488cff1984455ffb1fe9a147a522e6ac76
Author: Jason Gustafson <[email protected]>
AuthorDate: Wed Apr 13 10:33:15 2022 -0700

    KAFKA-13651; Add audit logging to `StandardAuthorizer` (#12031)
    
    This patch adds audit support through the kafka.authorizer.logger logger to 
StandardAuthorizer. It
    follows the same conventions as AclAuthorizer with a similarly formatted 
log message. When
    logIfAllowed is set in the Action, then the log message is at DEBUG level; 
otherwise, we log at
    trace. When logIfDenied is set, then the log message is at INFO level; 
otherwise, we again log at
    TRACE.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 build.gradle                                       |   2 +
 .../kafka/common/resource/ResourcePattern.java     |   2 +-
 .../kafka/metadata/authorizer/StandardAcl.java     |  12 +
 .../authorizer/StandardAuthorizerData.java         | 306 ++++++++++++++++-----
 .../authorizer/StandardAuthorizerTest.java         |  94 ++++++-
 5 files changed, 347 insertions(+), 69 deletions(-)

diff --git a/build.gradle b/build.gradle
index 7ecfa3e07e..f7fe0ea046 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1136,6 +1136,8 @@ project(':metadata') {
     compileOnly libs.log4j
     testImplementation libs.junitJupiter
     testImplementation libs.hamcrest
+    testImplementation libs.mockitoCore
+    testImplementation libs.mockitoInline
     testImplementation libs.slf4jlog4j
     testImplementation project(':clients').sourceSets.test.output
     testImplementation project(':raft').sourceSets.test.output
diff --git 
a/clients/src/main/java/org/apache/kafka/common/resource/ResourcePattern.java 
b/clients/src/main/java/org/apache/kafka/common/resource/ResourcePattern.java
index 2b7504f70a..b3dfc4937f 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/resource/ResourcePattern.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/resource/ResourcePattern.java
@@ -89,7 +89,7 @@ public class ResourcePattern {
 
     @Override
     public String toString() {
-        return "ResourcePattern(resourceType=" + resourceType + ", name=" + 
((name == null) ? "<any>" : name) + ", patternType=" + patternType + ")";
+        return "ResourcePattern(resourceType=" + resourceType + ", name=" + 
name + ", patternType=" + patternType + ")";
     }
 
     /**
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAcl.java 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAcl.java
index fd3e0f029e..3fe8ac70da 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAcl.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAcl.java
@@ -25,6 +25,7 @@ import 
org.apache.kafka.common.metadata.AccessControlEntryRecord;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
 
 import java.util.Objects;
 
@@ -96,6 +97,17 @@ final public class StandardAcl implements 
Comparable<StandardAcl> {
         return principal;
     }
 
+    public KafkaPrincipal kafkaPrincipal() {
+        int colonIndex = principal.indexOf(":");
+        if (colonIndex == -1) {
+            throw new IllegalStateException("Could not parse principal from `" 
+ principal + "` " +
+                "(no colon is present separating the principal type from the 
principal name)");
+        }
+        String principalType = principal.substring(0, colonIndex);
+        String principalName = principal.substring(colonIndex + 1);
+        return new KafkaPrincipal(principalType, principalName);
+    }
+
     public String host() {
         return host;
     }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
index a70fa8ca45..27cca4271b 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
@@ -22,12 +22,18 @@ import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.authorizer.Action;
 import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
 import org.apache.kafka.server.authorizer.AuthorizationResult;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -69,12 +75,18 @@ public class StandardAuthorizerData {
      * The principal entry used in ACLs that match any principal.
      */
     public static final String WILDCARD_PRINCIPAL = "User:*";
+    public static final KafkaPrincipal WILDCARD_KAFKA_PRINCIPAL = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*");
 
     /**
      * The logger to use.
      */
     final Logger log;
 
+    /**
+     * Logger to use for auditing.
+     */
+    final Logger auditLog;
+
     /**
      * The current AclMutator.
      */
@@ -88,7 +100,7 @@ public class StandardAuthorizerData {
     /**
      * The result to return if no ACLs match.
      */
-    private final AuthorizationResult defaultResult;
+    private final DefaultRule defaultRule;
 
     /**
      * Contains all of the current ACLs sorted by (resource type, resource 
name).
@@ -104,6 +116,10 @@ public class StandardAuthorizerData {
         return new LogContext("[StandardAuthorizer " + nodeId + "] 
").logger(StandardAuthorizerData.class);
     }
 
+    private static Logger auditLogger() {
+        return LoggerFactory.getLogger("kafka.authorizer.logger");
+    }
+
     static StandardAuthorizerData createEmpty() {
         return new StandardAuthorizerData(createLogger(-1),
             null,
@@ -119,18 +135,20 @@ public class StandardAuthorizerData {
                                    ConcurrentSkipListSet<StandardAcl> 
aclsByResource,
                                    ConcurrentHashMap<Uuid, StandardAcl> 
aclsById) {
         this.log = log;
+        this.auditLog = auditLogger();
         this.aclMutator = aclMutator;
         this.superUsers = superUsers;
-        this.defaultResult = defaultResult;
+        this.defaultRule = new DefaultRule(defaultResult);
         this.aclsByResource = aclsByResource;
         this.aclsById = aclsById;
     }
 
     StandardAuthorizerData copyWithNewAclMutator(AclMutator newAclMutator) {
-        return new StandardAuthorizerData(log,
+        return new StandardAuthorizerData(
+            log,
             newAclMutator,
             superUsers,
-            defaultResult,
+            defaultRule.result,
             aclsByResource,
             aclsById);
     }
@@ -152,7 +170,7 @@ public class StandardAuthorizerData {
             log,
             aclMutator,
             superUsers,
-            defaultResult,
+            defaultRule.result,
             new ConcurrentSkipListSet<>(),
             new ConcurrentHashMap<>());
         for (Entry<Uuid, StandardAcl> entry : aclEntries) {
@@ -206,18 +224,13 @@ public class StandardAuthorizerData {
     }
 
     AuthorizationResult defaultResult() {
-        return defaultResult;
+        return defaultRule.result;
     }
 
     int aclCount() {
         return aclsById.size();
     }
 
-    static class AuthorizationResultBuilder {
-        boolean foundDeny = false;
-        boolean foundAllow = false;
-    }
-
     /**
      * Authorize an action based on the current set of ACLs.
      *
@@ -227,18 +240,98 @@ public class StandardAuthorizerData {
      * result. In general it makes more sense to configure the default result 
to be
      * DENY, but some people (and unit tests) configure it as ALLOW.
      */
-    AuthorizationResult authorize(AuthorizableRequestContext requestContext,
-                                  Action action) {
+    public AuthorizationResult authorize(
+        AuthorizableRequestContext requestContext,
+        Action action
+    ) {
+        KafkaPrincipal principal = baseKafkaPrincipal(requestContext);
+        final MatchingRule rule;
+
         // Superusers are authorized to do anything.
-        if (superUsers.contains(requestContext.principal().toString())) {
-            if (log.isTraceEnabled()) {
-                log.trace("authorize(requestContext=" + requestContext + ", 
action=" + action +
-                    "): ALLOWED because " + 
requestContext.principal().toString() +
-                    " is a superuser");
+        if (superUsers.contains(principal.toString())) {
+            rule = SuperUserRule.INSTANCE;
+        } else {
+            MatchingAclRule aclRule = findAclRule(
+                matchingPrincipals(requestContext),
+                requestContext.clientAddress().getHostAddress(),
+                action
+            );
+
+            if (aclRule != null) {
+                rule = aclRule;
+            } else {
+                // If nothing matched, we return the default result.
+                rule = defaultRule;
             }
-            return ALLOWED;
         }
 
+        logAuditMessage(principal, requestContext, action, rule);
+        return rule.result();
+    }
+
+    private String buildAuditMessage(
+        KafkaPrincipal principal,
+        AuthorizableRequestContext context,
+        Action action,
+        MatchingRule rule
+    ) {
+        StringBuilder bldr = new StringBuilder();
+        bldr.append("Principal = ").append(principal);
+        bldr.append(" is ").append(rule.result() == ALLOWED ? "Allowed" : 
"Denied");
+        bldr.append(" operation = ").append(action.operation());
+        bldr.append(" from host = 
").append(context.clientAddress().getHostAddress());
+        bldr.append(" on resource = ");
+        appendResourcePattern(action.resourcePattern(), bldr);
+        bldr.append(" for request = 
").append(ApiKeys.forId(context.requestType()).name);
+        bldr.append(" with resourceRefCount = 
").append(action.resourceReferenceCount());
+        bldr.append(" based on rule ").append(rule);
+        return bldr.toString();
+    }
+
+    private void appendResourcePattern(ResourcePattern resourcePattern, 
StringBuilder bldr) {
+        
bldr.append(SecurityUtils.resourceTypeName(resourcePattern.resourceType()))
+            .append(":")
+            .append(resourcePattern.patternType())
+            .append(":")
+            .append(resourcePattern.name());
+    }
+
+    private void logAuditMessage(
+        KafkaPrincipal principal,
+        AuthorizableRequestContext requestContext,
+        Action action,
+        MatchingRule rule
+    ) {
+        switch (rule.result()) {
+            case ALLOWED:
+                // logIfAllowed is true if access is granted to the resource 
as a result of this authorization.
+                // In this case, log at debug level. If false, no access is 
actually granted, the result is used
+                // only to determine authorized operations. So log only at 
trace level.
+                if (action.logIfAllowed() && auditLog.isDebugEnabled()) {
+                    auditLog.debug(buildAuditMessage(principal, 
requestContext, action, rule));
+                } else if (auditLog.isTraceEnabled()) {
+                    auditLog.trace(buildAuditMessage(principal, 
requestContext, action, rule));
+                }
+                return;
+
+            case DENIED:
+                // logIfDenied is true if access to the resource was 
explicitly requested. Since this is an attempt
+                // to access unauthorized resources, log at info level. If 
false, this is either a request to determine
+                // authorized operations or a filter (e.g for regex 
subscriptions) to filter out authorized resources.
+                // In this case, log only at trace level.
+                if (action.logIfDenied()) {
+                    auditLog.info(buildAuditMessage(principal, requestContext, 
action, rule));
+                } else if (auditLog.isTraceEnabled()) {
+                    auditLog.trace(buildAuditMessage(principal, 
requestContext, action, rule));
+                }
+        }
+    }
+
+    private MatchingAclRule findAclRule(
+        Set<KafkaPrincipal> matchingPrincipals,
+        String host,
+        Action action
+    ) {
         // This code relies on the ordering of StandardAcl within the 
NavigableMap.
         // Entries are sorted by resource type first, then REVERSE resource 
name.
         // Therefore, we can find all the applicable ACLs by starting at
@@ -255,7 +348,7 @@ public class StandardAuthorizerData {
         // 5. rs=TOPIC rn=eeee pt=LITERAL
         //
         // Once we reached element 5, we would stop scanning.
-        AuthorizationResultBuilder builder = new AuthorizationResultBuilder();
+        MatchingAclBuilder matchingAclBuilder = new MatchingAclBuilder();
         StandardAcl exemplar = new StandardAcl(
             action.resourcePattern().resourceType(),
             action.resourcePattern().name(),
@@ -264,8 +357,10 @@ public class StandardAuthorizerData {
             "",
             AclOperation.UNKNOWN,
             AclPermissionType.UNKNOWN);
-        checkSection(action, exemplar, requestContext, builder);
-        if (builder.foundDeny) return DENIED;
+        checkSection(action, exemplar, matchingPrincipals, host, 
matchingAclBuilder);
+        if (matchingAclBuilder.foundDeny()) {
+            return matchingAclBuilder.build();
+        }
 
         // In addition to ACLs for this specific resource name, there can also 
be wildcard
         // ACLs that match any resource name. These are stored as type = 
LITERAL,
@@ -278,30 +373,17 @@ public class StandardAuthorizerData {
             "",
             AclOperation.UNKNOWN,
             AclPermissionType.UNKNOWN);
-        checkSection(action, exemplar, requestContext, builder);
-        if (builder.foundDeny) return DENIED;
-
-        // If we found ALLOW ACLs, the action is allowed.
-        if (builder.foundAllow) {
-            if (log.isTraceEnabled()) {
-                log.trace("authorize(requestContext=" + requestContext + ", 
action=" +
-                    action + "): ALLOWED");
-            }
-            return ALLOWED;
-        }
-
-        // If nothing matched, we return the default result.
-        if (log.isTraceEnabled()) {
-            log.trace("authorize(requestContext=" + requestContext + ", 
action=" +
-                action + "): returning default result " + defaultResult);
-        }
-        return defaultResult;
+        checkSection(action, exemplar, matchingPrincipals, host, 
matchingAclBuilder);
+        return matchingAclBuilder.build();
     }
 
-    void checkSection(Action action,
-                      StandardAcl exemplar,
-                      AuthorizableRequestContext requestContext,
-                      AuthorizationResultBuilder builder) {
+    private void checkSection(
+        Action action,
+        StandardAcl exemplar,
+        Set<KafkaPrincipal> matchingPrincipals,
+        String host,
+        MatchingAclBuilder matchingAclBuilder
+    ) {
         NavigableSet<StandardAcl> tailSet = aclsByResource.tailSet(exemplar, 
true);
         String resourceName = action.resourcePattern().name();
         for (Iterator<StandardAcl> iterator = tailSet.iterator();
@@ -325,15 +407,11 @@ public class StandardAuthorizerData {
                 // stepped outside of the section we care about and should 
stop scanning.
                 break;
             }
-            AuthorizationResult result = findResult(action, requestContext, 
acl);
+            AuthorizationResult result = findResult(action, 
matchingPrincipals, host, acl);
             if (ALLOWED == result) {
-                builder.foundAllow = true;
+                matchingAclBuilder.allowAcl = acl;
             } else if (DENIED == result) {
-                if (log.isTraceEnabled()) {
-                    log.trace("authorize(requestContext=" + requestContext + 
", action=" +
-                        action + "): DENIED because of " + acl);
-                }
-                builder.foundDeny = true;
+                matchingAclBuilder.denyAcl = acl;
                 return;
             }
         }
@@ -351,30 +429,55 @@ public class StandardAuthorizerData {
     private static final Set<AclOperation> IMPLIES_DESCRIBE_CONFIGS = 
Collections.unmodifiableSet(
         EnumSet.of(DESCRIBE_CONFIGS, ALTER_CONFIGS));
 
+    static AuthorizationResult findResult(Action action,
+                                          AuthorizableRequestContext 
requestContext,
+                                          StandardAcl acl) {
+        return findResult(
+            action,
+            matchingPrincipals(requestContext),
+            requestContext.clientAddress().getHostAddress(),
+            acl
+        );
+    }
+
+    static KafkaPrincipal baseKafkaPrincipal(AuthorizableRequestContext 
context) {
+        KafkaPrincipal sessionPrincipal = context.principal();
+        return sessionPrincipal.getClass().equals(KafkaPrincipal.class)
+            ? sessionPrincipal
+            : new KafkaPrincipal(sessionPrincipal.getPrincipalType(), 
sessionPrincipal.getName());
+    }
+
+    static Set<KafkaPrincipal> matchingPrincipals(AuthorizableRequestContext 
context) {
+        KafkaPrincipal sessionPrincipal = context.principal();
+        KafkaPrincipal basePrincipal = 
sessionPrincipal.getClass().equals(KafkaPrincipal.class)
+            ? sessionPrincipal
+            : new KafkaPrincipal(sessionPrincipal.getPrincipalType(), 
sessionPrincipal.getName());
+        return Utils.mkSet(basePrincipal, WILDCARD_KAFKA_PRINCIPAL);
+    }
+
     /**
      * Determine what the result of applying an ACL to the given action and 
request
      * context should be. Note that this function assumes that the resource 
name matches;
      * the resource name is not checked here.
      *
-     * @param action            The input action.
-     * @param requestContext    The input request context.
-     * @param acl               The input ACL.
-     * @return                  null if the ACL does not match. The 
authorization result
-     *                          otherwise.
+     * @param action             The input action.
+     * @param matchingPrincipals The set of input matching principals
+     * @param host               The input host.
+     * @param acl                The input ACL.
+     * @return                   null if the ACL does not match. The 
authorization result
+     *                           otherwise.
      */
     static AuthorizationResult findResult(Action action,
-                                          AuthorizableRequestContext 
requestContext,
+                                          Set<KafkaPrincipal> 
matchingPrincipals,
+                                          String host,
                                           StandardAcl acl) {
         // Check if the principal matches. If it doesn't, return no result 
(null).
-        if (!acl.principal().equals(WILDCARD_PRINCIPAL)) {
-            if 
(!acl.principal().equals(requestContext.principal().toString())) return null;
+        if (!matchingPrincipals.contains(acl.kafkaPrincipal())) {
+            return null;
         }
         // Check if the host matches. If it doesn't, return no result (null).
-        // The hostname should be cached in the InetAddress object, so calling 
this more
-        // than once shouldn't be too expensive.
-        if (!acl.host().equals(WILDCARD)) {
-            String host = requestContext.clientAddress().getHostAddress();
-            if (!acl.host().equals(host)) return null;
+        if (!acl.host().equals(WILDCARD) && !acl.host().equals(host)) {
+            return null;
         }
         // Check if the operation field matches. Here we hit a slight 
complication.
         // ACLs for various operations (READ, WRITE, DELETE, ALTER), "imply" 
the presence
@@ -456,4 +559,79 @@ public class StandardAuthorizerData {
             return result;
         }
     }
+
+    private interface MatchingRule {
+        AuthorizationResult result();
+    }
+
+    private static class SuperUserRule implements MatchingRule {
+        private static final SuperUserRule INSTANCE = new SuperUserRule();
+
+        @Override
+        public AuthorizationResult result() {
+            return ALLOWED;
+        }
+
+        @Override
+        public String toString() {
+            return "SuperUser";
+        }
+    }
+
+    private static class DefaultRule implements MatchingRule {
+        private final AuthorizationResult result;
+
+        private DefaultRule(AuthorizationResult result) {
+            this.result = result;
+        }
+
+        @Override
+        public AuthorizationResult result() {
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return result == ALLOWED ? "DefaultAllow" : "DefaultDeny";
+        }
+    }
+
+    private static class MatchingAclRule implements MatchingRule {
+        private final StandardAcl acl;
+        private final AuthorizationResult result;
+
+        private MatchingAclRule(StandardAcl acl, AuthorizationResult result) {
+            this.acl = acl;
+            this.result = result;
+        }
+
+        @Override
+        public AuthorizationResult result() {
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return "MatchingAcl(acl=" + acl + ")";
+        }
+    }
+
+    private static class MatchingAclBuilder {
+        private StandardAcl denyAcl;
+        private StandardAcl allowAcl;
+
+        boolean foundDeny() {
+            return denyAcl != null;
+        }
+
+        MatchingAclRule build() {
+            if (denyAcl != null) {
+                return new MatchingAclRule(denyAcl, DENIED);
+            } else if (allowAcl != null) {
+                return new MatchingAclRule(allowAcl, ALLOWED);
+            } else {
+                return null;
+            }
+        }
+    }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
index ee09bb4c12..7ed37785c4 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
@@ -32,6 +32,12 @@ import org.apache.kafka.server.authorizer.Action;
 import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
 import java.util.Arrays;
@@ -40,7 +46,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
@@ -119,8 +124,6 @@ public class StandardAuthorizerTest {
             new ResourcePattern(resourceType, resourceName, LITERAL), 1, 
false, false);
     }
 
-    private final static AtomicLong NEXT_ID = new AtomicLong(0);
-
     static StandardAcl newFooAcl(AclOperation op, AclPermissionType 
permission) {
         return new StandardAcl(
             TOPIC,
@@ -428,4 +431,87 @@ public class StandardAuthorizerTest {
                     newAction(WRITE, GROUP, "arbitrary"),
                     newAction(READ, TOPIC, "ala"))));
     }
-}
\ No newline at end of file
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDenyAuditLogging(boolean logIfDenied) throws Exception {
+        try (MockedStatic<LoggerFactory> mockedLoggerFactory = 
Mockito.mockStatic(LoggerFactory.class)) {
+            Logger otherLog = Mockito.mock(Logger.class);
+            Logger auditLog = Mockito.mock(Logger.class);
+            mockedLoggerFactory
+                .when(() -> LoggerFactory.getLogger("kafka.authorizer.logger"))
+                .thenReturn(auditLog);
+
+            mockedLoggerFactory
+                .when(() -> LoggerFactory.getLogger(Mockito.any(Class.class)))
+                .thenReturn(otherLog);
+
+            Mockito.when(auditLog.isDebugEnabled()).thenReturn(true);
+            Mockito.when(auditLog.isTraceEnabled()).thenReturn(true);
+
+            StandardAuthorizer authorizer = createAuthorizerWithManyAcls();
+            ResourcePattern topicResource = new ResourcePattern(TOPIC, 
"alpha", LITERAL);
+            Action action = new Action(READ, topicResource, 1, false, 
logIfDenied);
+            MockAuthorizableRequestContext requestContext = new 
MockAuthorizableRequestContext.Builder()
+                .setPrincipal(new KafkaPrincipal(USER_TYPE, "bob"))
+                .setClientAddress(InetAddress.getByName("127.0.0.1"))
+                .build();
+
+            assertEquals(singletonList(DENIED), 
authorizer.authorize(requestContext, singletonList(action)));
+
+            String expectedAuditLog = "Principal = User:bob is Denied 
operation = READ " +
+                "from host = 127.0.0.1 on resource = Topic:LITERAL:alpha for 
request = Fetch " +
+                "with resourceRefCount = 1 based on rule 
MatchingAcl(acl=StandardAcl(resourceType=TOPIC, " +
+                "resourceName=alp, patternType=PREFIXED, principal=User:bob, 
host=*, operation=READ, " +
+                "permissionType=DENY))";
+
+            if (logIfDenied) {
+                Mockito.verify(auditLog).info(expectedAuditLog);
+            } else {
+                Mockito.verify(auditLog).trace(expectedAuditLog);
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testAllowAuditLogging(boolean logIfAllowed) throws Exception {
+        try (MockedStatic<LoggerFactory> mockedLoggerFactory = 
Mockito.mockStatic(LoggerFactory.class)) {
+            Logger otherLog = Mockito.mock(Logger.class);
+            Logger auditLog = Mockito.mock(Logger.class);
+            mockedLoggerFactory
+                .when(() -> LoggerFactory.getLogger("kafka.authorizer.logger"))
+                .thenReturn(auditLog);
+
+            mockedLoggerFactory
+                .when(() -> LoggerFactory.getLogger(Mockito.any(Class.class)))
+                .thenReturn(otherLog);
+
+            Mockito.when(auditLog.isDebugEnabled()).thenReturn(true);
+            Mockito.when(auditLog.isTraceEnabled()).thenReturn(true);
+
+            StandardAuthorizer authorizer = createAuthorizerWithManyAcls();
+            ResourcePattern topicResource = new ResourcePattern(TOPIC, 
"green1", LITERAL);
+            Action action = new Action(READ, topicResource, 1, logIfAllowed, 
false);
+            MockAuthorizableRequestContext requestContext = new 
MockAuthorizableRequestContext.Builder()
+                .setPrincipal(new KafkaPrincipal(USER_TYPE, "bob"))
+                .setClientAddress(InetAddress.getByName("127.0.0.1"))
+                .build();
+
+            assertEquals(singletonList(ALLOWED), 
authorizer.authorize(requestContext, singletonList(action)));
+
+            String expectedAuditLog = "Principal = User:bob is Allowed 
operation = READ " +
+                "from host = 127.0.0.1 on resource = Topic:LITERAL:green1 for 
request = Fetch " +
+                "with resourceRefCount = 1 based on rule 
MatchingAcl(acl=StandardAcl(resourceType=TOPIC, " +
+                "resourceName=green, patternType=PREFIXED, principal=User:bob, 
host=*, operation=READ, " +
+                "permissionType=ALLOW))";
+
+            if (logIfAllowed) {
+                Mockito.verify(auditLog).debug(expectedAuditLog);
+            } else {
+                Mockito.verify(auditLog).trace(expectedAuditLog);
+            }
+        }
+    }
+
+}

Reply via email to