This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f97646488c KAFKA-13651; Add audit logging to `StandardAuthorizer`
(#12031)
f97646488c is described below
commit f97646488cff1984455ffb1fe9a147a522e6ac76
Author: Jason Gustafson <[email protected]>
AuthorDate: Wed Apr 13 10:33:15 2022 -0700
KAFKA-13651; Add audit logging to `StandardAuthorizer` (#12031)
This patch adds audit support through the kafka.authorizer.logger logger to
StandardAuthorizer. It
follows the same conventions as AclAuthorizer with a similarly formatted
log message. When
logIfAllowed is set in the Action, then the log message is at DEBUG level;
otherwise, we log at
trace. When logIfDenied is set, then the log message is at INFO level;
otherwise, we again log at
TRACE.
Reviewers: Colin P. McCabe <[email protected]>
---
build.gradle | 2 +
.../kafka/common/resource/ResourcePattern.java | 2 +-
.../kafka/metadata/authorizer/StandardAcl.java | 12 +
.../authorizer/StandardAuthorizerData.java | 306 ++++++++++++++++-----
.../authorizer/StandardAuthorizerTest.java | 94 ++++++-
5 files changed, 347 insertions(+), 69 deletions(-)
diff --git a/build.gradle b/build.gradle
index 7ecfa3e07e..f7fe0ea046 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1136,6 +1136,8 @@ project(':metadata') {
compileOnly libs.log4j
testImplementation libs.junitJupiter
testImplementation libs.hamcrest
+ testImplementation libs.mockitoCore
+ testImplementation libs.mockitoInline
testImplementation libs.slf4jlog4j
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':raft').sourceSets.test.output
diff --git
a/clients/src/main/java/org/apache/kafka/common/resource/ResourcePattern.java
b/clients/src/main/java/org/apache/kafka/common/resource/ResourcePattern.java
index 2b7504f70a..b3dfc4937f 100644
---
a/clients/src/main/java/org/apache/kafka/common/resource/ResourcePattern.java
+++
b/clients/src/main/java/org/apache/kafka/common/resource/ResourcePattern.java
@@ -89,7 +89,7 @@ public class ResourcePattern {
@Override
public String toString() {
- return "ResourcePattern(resourceType=" + resourceType + ", name=" +
((name == null) ? "<any>" : name) + ", patternType=" + patternType + ")";
+ return "ResourcePattern(resourceType=" + resourceType + ", name=" +
name + ", patternType=" + patternType + ")";
}
/**
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAcl.java
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAcl.java
index fd3e0f029e..3fe8ac70da 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAcl.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAcl.java
@@ -25,6 +25,7 @@ import
org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
import java.util.Objects;
@@ -96,6 +97,17 @@ final public class StandardAcl implements
Comparable<StandardAcl> {
return principal;
}
+ public KafkaPrincipal kafkaPrincipal() {
+ int colonIndex = principal.indexOf(":");
+ if (colonIndex == -1) {
+ throw new IllegalStateException("Could not parse principal from `"
+ principal + "` " +
+ "(no colon is present separating the principal type from the
principal name)");
+ }
+ String principalType = principal.substring(0, colonIndex);
+ String principalName = principal.substring(colonIndex + 1);
+ return new KafkaPrincipal(principalType, principalName);
+ }
+
public String host() {
return host;
}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
index a70fa8ca45..27cca4271b 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
@@ -22,12 +22,18 @@ import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
@@ -69,12 +75,18 @@ public class StandardAuthorizerData {
* The principal entry used in ACLs that match any principal.
*/
public static final String WILDCARD_PRINCIPAL = "User:*";
+ public static final KafkaPrincipal WILDCARD_KAFKA_PRINCIPAL = new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*");
/**
* The logger to use.
*/
final Logger log;
+ /**
+ * Logger to use for auditing.
+ */
+ final Logger auditLog;
+
/**
* The current AclMutator.
*/
@@ -88,7 +100,7 @@ public class StandardAuthorizerData {
/**
* The result to return if no ACLs match.
*/
- private final AuthorizationResult defaultResult;
+ private final DefaultRule defaultRule;
/**
* Contains all of the current ACLs sorted by (resource type, resource
name).
@@ -104,6 +116,10 @@ public class StandardAuthorizerData {
return new LogContext("[StandardAuthorizer " + nodeId + "]
").logger(StandardAuthorizerData.class);
}
+ private static Logger auditLogger() {
+ return LoggerFactory.getLogger("kafka.authorizer.logger");
+ }
+
static StandardAuthorizerData createEmpty() {
return new StandardAuthorizerData(createLogger(-1),
null,
@@ -119,18 +135,20 @@ public class StandardAuthorizerData {
ConcurrentSkipListSet<StandardAcl>
aclsByResource,
ConcurrentHashMap<Uuid, StandardAcl>
aclsById) {
this.log = log;
+ this.auditLog = auditLogger();
this.aclMutator = aclMutator;
this.superUsers = superUsers;
- this.defaultResult = defaultResult;
+ this.defaultRule = new DefaultRule(defaultResult);
this.aclsByResource = aclsByResource;
this.aclsById = aclsById;
}
StandardAuthorizerData copyWithNewAclMutator(AclMutator newAclMutator) {
- return new StandardAuthorizerData(log,
+ return new StandardAuthorizerData(
+ log,
newAclMutator,
superUsers,
- defaultResult,
+ defaultRule.result,
aclsByResource,
aclsById);
}
@@ -152,7 +170,7 @@ public class StandardAuthorizerData {
log,
aclMutator,
superUsers,
- defaultResult,
+ defaultRule.result,
new ConcurrentSkipListSet<>(),
new ConcurrentHashMap<>());
for (Entry<Uuid, StandardAcl> entry : aclEntries) {
@@ -206,18 +224,13 @@ public class StandardAuthorizerData {
}
AuthorizationResult defaultResult() {
- return defaultResult;
+ return defaultRule.result;
}
int aclCount() {
return aclsById.size();
}
- static class AuthorizationResultBuilder {
- boolean foundDeny = false;
- boolean foundAllow = false;
- }
-
/**
* Authorize an action based on the current set of ACLs.
*
@@ -227,18 +240,98 @@ public class StandardAuthorizerData {
* result. In general it makes more sense to configure the default result
to be
* DENY, but some people (and unit tests) configure it as ALLOW.
*/
- AuthorizationResult authorize(AuthorizableRequestContext requestContext,
- Action action) {
+ public AuthorizationResult authorize(
+ AuthorizableRequestContext requestContext,
+ Action action
+ ) {
+ KafkaPrincipal principal = baseKafkaPrincipal(requestContext);
+ final MatchingRule rule;
+
// Superusers are authorized to do anything.
- if (superUsers.contains(requestContext.principal().toString())) {
- if (log.isTraceEnabled()) {
- log.trace("authorize(requestContext=" + requestContext + ",
action=" + action +
- "): ALLOWED because " +
requestContext.principal().toString() +
- " is a superuser");
+ if (superUsers.contains(principal.toString())) {
+ rule = SuperUserRule.INSTANCE;
+ } else {
+ MatchingAclRule aclRule = findAclRule(
+ matchingPrincipals(requestContext),
+ requestContext.clientAddress().getHostAddress(),
+ action
+ );
+
+ if (aclRule != null) {
+ rule = aclRule;
+ } else {
+ // If nothing matched, we return the default result.
+ rule = defaultRule;
}
- return ALLOWED;
}
+ logAuditMessage(principal, requestContext, action, rule);
+ return rule.result();
+ }
+
+ private String buildAuditMessage(
+ KafkaPrincipal principal,
+ AuthorizableRequestContext context,
+ Action action,
+ MatchingRule rule
+ ) {
+ StringBuilder bldr = new StringBuilder();
+ bldr.append("Principal = ").append(principal);
+ bldr.append(" is ").append(rule.result() == ALLOWED ? "Allowed" :
"Denied");
+ bldr.append(" operation = ").append(action.operation());
+ bldr.append(" from host =
").append(context.clientAddress().getHostAddress());
+ bldr.append(" on resource = ");
+ appendResourcePattern(action.resourcePattern(), bldr);
+ bldr.append(" for request =
").append(ApiKeys.forId(context.requestType()).name);
+ bldr.append(" with resourceRefCount =
").append(action.resourceReferenceCount());
+ bldr.append(" based on rule ").append(rule);
+ return bldr.toString();
+ }
+
+ private void appendResourcePattern(ResourcePattern resourcePattern,
StringBuilder bldr) {
+
bldr.append(SecurityUtils.resourceTypeName(resourcePattern.resourceType()))
+ .append(":")
+ .append(resourcePattern.patternType())
+ .append(":")
+ .append(resourcePattern.name());
+ }
+
+ private void logAuditMessage(
+ KafkaPrincipal principal,
+ AuthorizableRequestContext requestContext,
+ Action action,
+ MatchingRule rule
+ ) {
+ switch (rule.result()) {
+ case ALLOWED:
+ // logIfAllowed is true if access is granted to the resource
as a result of this authorization.
+ // In this case, log at debug level. If false, no access is
actually granted, the result is used
+ // only to determine authorized operations. So log only at
trace level.
+ if (action.logIfAllowed() && auditLog.isDebugEnabled()) {
+ auditLog.debug(buildAuditMessage(principal,
requestContext, action, rule));
+ } else if (auditLog.isTraceEnabled()) {
+ auditLog.trace(buildAuditMessage(principal,
requestContext, action, rule));
+ }
+ return;
+
+ case DENIED:
+ // logIfDenied is true if access to the resource was
explicitly requested. Since this is an attempt
+ // to access unauthorized resources, log at info level. If
false, this is either a request to determine
+ // authorized operations or a filter (e.g for regex
subscriptions) to filter out authorized resources.
+ // In this case, log only at trace level.
+ if (action.logIfDenied()) {
+ auditLog.info(buildAuditMessage(principal, requestContext,
action, rule));
+ } else if (auditLog.isTraceEnabled()) {
+ auditLog.trace(buildAuditMessage(principal,
requestContext, action, rule));
+ }
+ }
+ }
+
+ private MatchingAclRule findAclRule(
+ Set<KafkaPrincipal> matchingPrincipals,
+ String host,
+ Action action
+ ) {
// This code relies on the ordering of StandardAcl within the
NavigableMap.
// Entries are sorted by resource type first, then REVERSE resource
name.
// Therefore, we can find all the applicable ACLs by starting at
@@ -255,7 +348,7 @@ public class StandardAuthorizerData {
// 5. rs=TOPIC rn=eeee pt=LITERAL
//
// Once we reached element 5, we would stop scanning.
- AuthorizationResultBuilder builder = new AuthorizationResultBuilder();
+ MatchingAclBuilder matchingAclBuilder = new MatchingAclBuilder();
StandardAcl exemplar = new StandardAcl(
action.resourcePattern().resourceType(),
action.resourcePattern().name(),
@@ -264,8 +357,10 @@ public class StandardAuthorizerData {
"",
AclOperation.UNKNOWN,
AclPermissionType.UNKNOWN);
- checkSection(action, exemplar, requestContext, builder);
- if (builder.foundDeny) return DENIED;
+ checkSection(action, exemplar, matchingPrincipals, host,
matchingAclBuilder);
+ if (matchingAclBuilder.foundDeny()) {
+ return matchingAclBuilder.build();
+ }
// In addition to ACLs for this specific resource name, there can also
be wildcard
// ACLs that match any resource name. These are stored as type =
LITERAL,
@@ -278,30 +373,17 @@ public class StandardAuthorizerData {
"",
AclOperation.UNKNOWN,
AclPermissionType.UNKNOWN);
- checkSection(action, exemplar, requestContext, builder);
- if (builder.foundDeny) return DENIED;
-
- // If we found ALLOW ACLs, the action is allowed.
- if (builder.foundAllow) {
- if (log.isTraceEnabled()) {
- log.trace("authorize(requestContext=" + requestContext + ",
action=" +
- action + "): ALLOWED");
- }
- return ALLOWED;
- }
-
- // If nothing matched, we return the default result.
- if (log.isTraceEnabled()) {
- log.trace("authorize(requestContext=" + requestContext + ",
action=" +
- action + "): returning default result " + defaultResult);
- }
- return defaultResult;
+ checkSection(action, exemplar, matchingPrincipals, host,
matchingAclBuilder);
+ return matchingAclBuilder.build();
}
- void checkSection(Action action,
- StandardAcl exemplar,
- AuthorizableRequestContext requestContext,
- AuthorizationResultBuilder builder) {
+ private void checkSection(
+ Action action,
+ StandardAcl exemplar,
+ Set<KafkaPrincipal> matchingPrincipals,
+ String host,
+ MatchingAclBuilder matchingAclBuilder
+ ) {
NavigableSet<StandardAcl> tailSet = aclsByResource.tailSet(exemplar,
true);
String resourceName = action.resourcePattern().name();
for (Iterator<StandardAcl> iterator = tailSet.iterator();
@@ -325,15 +407,11 @@ public class StandardAuthorizerData {
// stepped outside of the section we care about and should
stop scanning.
break;
}
- AuthorizationResult result = findResult(action, requestContext,
acl);
+ AuthorizationResult result = findResult(action,
matchingPrincipals, host, acl);
if (ALLOWED == result) {
- builder.foundAllow = true;
+ matchingAclBuilder.allowAcl = acl;
} else if (DENIED == result) {
- if (log.isTraceEnabled()) {
- log.trace("authorize(requestContext=" + requestContext +
", action=" +
- action + "): DENIED because of " + acl);
- }
- builder.foundDeny = true;
+ matchingAclBuilder.denyAcl = acl;
return;
}
}
@@ -351,30 +429,55 @@ public class StandardAuthorizerData {
private static final Set<AclOperation> IMPLIES_DESCRIBE_CONFIGS =
Collections.unmodifiableSet(
EnumSet.of(DESCRIBE_CONFIGS, ALTER_CONFIGS));
+ static AuthorizationResult findResult(Action action,
+ AuthorizableRequestContext
requestContext,
+ StandardAcl acl) {
+ return findResult(
+ action,
+ matchingPrincipals(requestContext),
+ requestContext.clientAddress().getHostAddress(),
+ acl
+ );
+ }
+
+ static KafkaPrincipal baseKafkaPrincipal(AuthorizableRequestContext
context) {
+ KafkaPrincipal sessionPrincipal = context.principal();
+ return sessionPrincipal.getClass().equals(KafkaPrincipal.class)
+ ? sessionPrincipal
+ : new KafkaPrincipal(sessionPrincipal.getPrincipalType(),
sessionPrincipal.getName());
+ }
+
+ static Set<KafkaPrincipal> matchingPrincipals(AuthorizableRequestContext
context) {
+ KafkaPrincipal sessionPrincipal = context.principal();
+ KafkaPrincipal basePrincipal =
sessionPrincipal.getClass().equals(KafkaPrincipal.class)
+ ? sessionPrincipal
+ : new KafkaPrincipal(sessionPrincipal.getPrincipalType(),
sessionPrincipal.getName());
+ return Utils.mkSet(basePrincipal, WILDCARD_KAFKA_PRINCIPAL);
+ }
+
/**
* Determine what the result of applying an ACL to the given action and
request
* context should be. Note that this function assumes that the resource
name matches;
* the resource name is not checked here.
*
- * @param action The input action.
- * @param requestContext The input request context.
- * @param acl The input ACL.
- * @return null if the ACL does not match. The
authorization result
- * otherwise.
+ * @param action The input action.
+ * @param matchingPrincipals The set of input matching principals
+ * @param host The input host.
+ * @param acl The input ACL.
+ * @return null if the ACL does not match. The
authorization result
+ * otherwise.
*/
static AuthorizationResult findResult(Action action,
- AuthorizableRequestContext
requestContext,
+ Set<KafkaPrincipal>
matchingPrincipals,
+ String host,
StandardAcl acl) {
// Check if the principal matches. If it doesn't, return no result
(null).
- if (!acl.principal().equals(WILDCARD_PRINCIPAL)) {
- if
(!acl.principal().equals(requestContext.principal().toString())) return null;
+ if (!matchingPrincipals.contains(acl.kafkaPrincipal())) {
+ return null;
}
// Check if the host matches. If it doesn't, return no result (null).
- // The hostname should be cached in the InetAddress object, so calling
this more
- // than once shouldn't be too expensive.
- if (!acl.host().equals(WILDCARD)) {
- String host = requestContext.clientAddress().getHostAddress();
- if (!acl.host().equals(host)) return null;
+ if (!acl.host().equals(WILDCARD) && !acl.host().equals(host)) {
+ return null;
}
// Check if the operation field matches. Here we hit a slight
complication.
// ACLs for various operations (READ, WRITE, DELETE, ALTER), "imply"
the presence
@@ -456,4 +559,79 @@ public class StandardAuthorizerData {
return result;
}
}
+
+ private interface MatchingRule {
+ AuthorizationResult result();
+ }
+
+ private static class SuperUserRule implements MatchingRule {
+ private static final SuperUserRule INSTANCE = new SuperUserRule();
+
+ @Override
+ public AuthorizationResult result() {
+ return ALLOWED;
+ }
+
+ @Override
+ public String toString() {
+ return "SuperUser";
+ }
+ }
+
+ private static class DefaultRule implements MatchingRule {
+ private final AuthorizationResult result;
+
+ private DefaultRule(AuthorizationResult result) {
+ this.result = result;
+ }
+
+ @Override
+ public AuthorizationResult result() {
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return result == ALLOWED ? "DefaultAllow" : "DefaultDeny";
+ }
+ }
+
+ private static class MatchingAclRule implements MatchingRule {
+ private final StandardAcl acl;
+ private final AuthorizationResult result;
+
+ private MatchingAclRule(StandardAcl acl, AuthorizationResult result) {
+ this.acl = acl;
+ this.result = result;
+ }
+
+ @Override
+ public AuthorizationResult result() {
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "MatchingAcl(acl=" + acl + ")";
+ }
+ }
+
+ private static class MatchingAclBuilder {
+ private StandardAcl denyAcl;
+ private StandardAcl allowAcl;
+
+ boolean foundDeny() {
+ return denyAcl != null;
+ }
+
+ MatchingAclRule build() {
+ if (denyAcl != null) {
+ return new MatchingAclRule(denyAcl, DENIED);
+ } else if (allowAcl != null) {
+ return new MatchingAclRule(allowAcl, ALLOWED);
+ } else {
+ return null;
+ }
+ }
+ }
}
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
index ee09bb4c12..7ed37785c4 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
@@ -32,6 +32,12 @@ import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.util.Arrays;
@@ -40,7 +46,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
@@ -119,8 +124,6 @@ public class StandardAuthorizerTest {
new ResourcePattern(resourceType, resourceName, LITERAL), 1,
false, false);
}
- private final static AtomicLong NEXT_ID = new AtomicLong(0);
-
static StandardAcl newFooAcl(AclOperation op, AclPermissionType
permission) {
return new StandardAcl(
TOPIC,
@@ -428,4 +431,87 @@ public class StandardAuthorizerTest {
newAction(WRITE, GROUP, "arbitrary"),
newAction(READ, TOPIC, "ala"))));
}
-}
\ No newline at end of file
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testDenyAuditLogging(boolean logIfDenied) throws Exception {
+ try (MockedStatic<LoggerFactory> mockedLoggerFactory =
Mockito.mockStatic(LoggerFactory.class)) {
+ Logger otherLog = Mockito.mock(Logger.class);
+ Logger auditLog = Mockito.mock(Logger.class);
+ mockedLoggerFactory
+ .when(() -> LoggerFactory.getLogger("kafka.authorizer.logger"))
+ .thenReturn(auditLog);
+
+ mockedLoggerFactory
+ .when(() -> LoggerFactory.getLogger(Mockito.any(Class.class)))
+ .thenReturn(otherLog);
+
+ Mockito.when(auditLog.isDebugEnabled()).thenReturn(true);
+ Mockito.when(auditLog.isTraceEnabled()).thenReturn(true);
+
+ StandardAuthorizer authorizer = createAuthorizerWithManyAcls();
+ ResourcePattern topicResource = new ResourcePattern(TOPIC,
"alpha", LITERAL);
+ Action action = new Action(READ, topicResource, 1, false,
logIfDenied);
+ MockAuthorizableRequestContext requestContext = new
MockAuthorizableRequestContext.Builder()
+ .setPrincipal(new KafkaPrincipal(USER_TYPE, "bob"))
+ .setClientAddress(InetAddress.getByName("127.0.0.1"))
+ .build();
+
+ assertEquals(singletonList(DENIED),
authorizer.authorize(requestContext, singletonList(action)));
+
+ String expectedAuditLog = "Principal = User:bob is Denied
operation = READ " +
+ "from host = 127.0.0.1 on resource = Topic:LITERAL:alpha for
request = Fetch " +
+ "with resourceRefCount = 1 based on rule
MatchingAcl(acl=StandardAcl(resourceType=TOPIC, " +
+ "resourceName=alp, patternType=PREFIXED, principal=User:bob,
host=*, operation=READ, " +
+ "permissionType=DENY))";
+
+ if (logIfDenied) {
+ Mockito.verify(auditLog).info(expectedAuditLog);
+ } else {
+ Mockito.verify(auditLog).trace(expectedAuditLog);
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testAllowAuditLogging(boolean logIfAllowed) throws Exception {
+ try (MockedStatic<LoggerFactory> mockedLoggerFactory =
Mockito.mockStatic(LoggerFactory.class)) {
+ Logger otherLog = Mockito.mock(Logger.class);
+ Logger auditLog = Mockito.mock(Logger.class);
+ mockedLoggerFactory
+ .when(() -> LoggerFactory.getLogger("kafka.authorizer.logger"))
+ .thenReturn(auditLog);
+
+ mockedLoggerFactory
+ .when(() -> LoggerFactory.getLogger(Mockito.any(Class.class)))
+ .thenReturn(otherLog);
+
+ Mockito.when(auditLog.isDebugEnabled()).thenReturn(true);
+ Mockito.when(auditLog.isTraceEnabled()).thenReturn(true);
+
+ StandardAuthorizer authorizer = createAuthorizerWithManyAcls();
+ ResourcePattern topicResource = new ResourcePattern(TOPIC,
"green1", LITERAL);
+ Action action = new Action(READ, topicResource, 1, logIfAllowed,
false);
+ MockAuthorizableRequestContext requestContext = new
MockAuthorizableRequestContext.Builder()
+ .setPrincipal(new KafkaPrincipal(USER_TYPE, "bob"))
+ .setClientAddress(InetAddress.getByName("127.0.0.1"))
+ .build();
+
+ assertEquals(singletonList(ALLOWED),
authorizer.authorize(requestContext, singletonList(action)));
+
+ String expectedAuditLog = "Principal = User:bob is Allowed
operation = READ " +
+ "from host = 127.0.0.1 on resource = Topic:LITERAL:green1 for
request = Fetch " +
+ "with resourceRefCount = 1 based on rule
MatchingAcl(acl=StandardAcl(resourceType=TOPIC, " +
+ "resourceName=green, patternType=PREFIXED, principal=User:bob,
host=*, operation=READ, " +
+ "permissionType=ALLOW))";
+
+ if (logIfAllowed) {
+ Mockito.verify(auditLog).debug(expectedAuditLog);
+ } else {
+ Mockito.verify(auditLog).trace(expectedAuditLog);
+ }
+ }
+ }
+
+}