This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 5992f5141c3e7fffc709e37ffebee9280657bd84
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Oct 4 16:31:43 2022 -0700

    MINOR: Add initial property tests for StandardAuthorizer (#12703)

    In https://github.com/apache/kafka/pull/12695, we discovered a gap in our
    testing of `StandardAuthorizer`. We addressed the specific case that was
    failing, but I think we need to establish a better methodology for testing
    which incorporates randomized inputs. This patch is a start in that
    direction. We implement a few basic property tests using jqwik which focus
    on prefix searching. It catches the case from
    https://github.com/apache/kafka/pull/12695 prior to the fix. In the
    future, we [...]

    Reviewers: David Arthur <mum...@gmail.com>
---
 build.gradle                                        |   1 +
 checkstyle/import-control.xml                       |   2 +
 .../org/apache/kafka/common/internals/Topic.java    |  30 ++-
 .../ControllerConfigurationValidatorTest.scala      |   4 +-
 .../controller/ReplicationControlManagerTest.java   |   4 +-
 .../authorizer/StandardAuthorizerPropertyTest.java  | 299 +++++++++++++++++++++
 6 files changed, 328 insertions(+), 12 deletions(-)

diff --git a/build.gradle b/build.gradle
index ee99cb7c870..56511cf61ce 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1152,6 +1152,7 @@ project(':metadata') {
     implementation libs.metrics
     compileOnly libs.log4j
     testImplementation libs.junitJupiter
+    testImplementation libs.jqwik
     testImplementation libs.hamcrest
     testImplementation libs.mockitoCore
     testImplementation libs.mockitoInline
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 45dc28d9e3a..44260f360bb 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -36,6 +36,7 @@
   <allow pkg="javax.net.ssl" />
   <allow pkg="javax.security" />
   <allow pkg="org.ietf.jgss" />
+  <allow pkg="net.jqwik.api" />

   <!-- no one depends on the server -->
   <disallow pkg="kafka" />
@@ -276,6 +277,7 @@
     <allow pkg="org.apache.kafka.common.resource" />
     <allow pkg="org.apache.kafka.controller" />
     <allow pkg="org.apache.kafka.metadata" />
+    <allow pkg="org.apache.kafka.common.internals" />
   </subpackage>
   <subpackage name="bootstrap">
     <allow pkg="org.apache.kafka.snapshot" />
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
index fbf491bab98..92952a2c031 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
@@ -43,17 +43,31 @@ public class Topic {
         });
     }

-    public static void validate(String name, String logPrefix, Consumer<String> throwableConsumer) {
+    private static String detectInvalidTopic(String name) {
         if (name.isEmpty())
-            throwableConsumer.accept(logPrefix + " is illegal, it can't be empty");
-        if (".".equals(name) || "..".equals(name))
-            throwableConsumer.accept(logPrefix + " cannot be \".\" or \"..\"");
+            return "the empty string is not allowed";
+        if (".".equals(name))
+            return "'.' is not allowed";
+        if ("..".equals(name))
+            return "'..' is not allowed";
         if (name.length() > MAX_NAME_LENGTH)
-            throwableConsumer.accept(logPrefix + " is illegal, it can't be longer than " + MAX_NAME_LENGTH +
-                " characters, " + logPrefix + ": " + name);
+            return "the length of '" + name + "' is longer than the max allowed length " + MAX_NAME_LENGTH;
         if (!containsValidPattern(name))
-            throwableConsumer.accept(logPrefix + " \"" + name + "\" is illegal, it contains a character other than " +
-                "ASCII alphanumerics, '.', '_' and '-'");
+            return "'" + name + "' contains one or more characters other than " +
+                "ASCII alphanumerics, '.', '_' and '-'";
+        return null;
+    }
+
+    public static boolean isValid(String name) {
+        String reasonInvalid = detectInvalidTopic(name);
+        return reasonInvalid == null;
+    }
+
+    public static void validate(String name, String logPrefix, Consumer<String> throwableConsumer) {
+        String reasonInvalid = detectInvalidTopic(name);
+        if (reasonInvalid != null) {
+            throwableConsumer.accept(logPrefix + " is invalid: " + reasonInvalid);
+        }
     }

     public static boolean isInternal(String topic) {
diff --git a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
index c89910ed231..36a8d71fb97 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
@@ -39,8 +39,8 @@ class ControllerConfigurationValidatorTest {

   @Test
   def testInvalidTopicNameRejected(): Unit = {
-    assertEquals("Topic name \"(<-invalid->)\" is illegal, it contains a character " +
-      "other than ASCII alphanumerics, '.', '_' and '-'",
+    assertEquals("Topic name is invalid: '(<-invalid->)' contains " +
+      "one or more characters other than ASCII alphanumerics, '.', '_' and '-'",
       assertThrows(classOf[InvalidTopicException], () => validator.validate(
         new ConfigResource(TOPIC, "(<-invalid->)"), emptyMap())). getMessage())
   }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 82e378a9823..335fd52a07d 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -809,9 +809,9 @@ public class ReplicationControlManagerTest {
         ReplicationControlManager.validateNewTopicNames(topicErrors, topics, Collections.emptyMap());
         Map<String, ApiError> expectedTopicErrors = new HashMap<>();
         expectedTopicErrors.put("", new ApiError(INVALID_TOPIC_EXCEPTION,
-            "Topic name is illegal, it can't be empty"));
+            "Topic name is invalid: the empty string is not allowed"));
         expectedTopicErrors.put(".", new ApiError(INVALID_TOPIC_EXCEPTION,
-            "Topic name cannot be \".\" or \"..\""));
+            "Topic name is invalid: '.' is not allowed"));
         assertEquals(expectedTopicErrors, topicErrors);
     }

diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerPropertyTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerPropertyTest.java
new file mode 100644
index 00000000000..8935b92e2f9
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerPropertyTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.authorizer;
+
+import net.jqwik.api.Assume;
+import net.jqwik.api.ForAll;
+import net.jqwik.api.Property;
+import net.jqwik.api.constraints.AlphaChars;
+import net.jqwik.api.constraints.Chars;
+import net.jqwik.api.constraints.NumericChars;
+import net.jqwik.api.constraints.Size;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import static org.apache.kafka.common.security.auth.KafkaPrincipal.USER_TYPE;
+import static org.apache.kafka.metadata.authorizer.StandardAuthorizerTest.PLAINTEXT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class StandardAuthorizerPropertyTest {
+
+    @Target({ ElementType.ANNOTATION_TYPE, ElementType.PARAMETER, ElementType.TYPE_USE })
+    @Retention(RetentionPolicy.RUNTIME)
+    @AlphaChars @NumericChars @Chars({ '_', '-', '.' })
+    public @interface ValidTopicChars { }
+
+    @Property(tries = 5000)
+    public void matchingPrefixDenyOverridesAllAllowRules(
+        @ForAll Random random,
+        @ForAll @ValidTopicChars String topic,
+        @ForAll @Size(max = 10) Set<@ValidTopicChars String> randomSuffixes
+    ) throws Exception {
+        Assume.that(Topic.isValid(topic));
+        StandardAuthorizer authorizer = buildAuthorizer();
+
+        // Create one DENY rule which matches and zero or more ALLOW rules which may or
+        // may not match. Regardless of the ALLOW rules, the final result should be DENIED.
+
+        String topicPrefix = topic.substring(0, random.nextInt(topic.length()));
+        StandardAcl denyRule = buildTopicWriteAcl(topicPrefix, PatternType.PREFIXED, AclPermissionType.DENY);
+        authorizer.addAcl(Uuid.randomUuid(), denyRule);
+        addRandomPrefixAllowAcls(authorizer, topic, randomSuffixes);
+
+        assertAuthorizationResult(
+            authorizer,
+            AuthorizationResult.DENIED,
+            AclOperation.WRITE,
+            new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+        );
+    }
+
+    @Property(tries = 5000)
+    public void matchingLiteralDenyOverridesAllAllowRules(
+        @ForAll @ValidTopicChars String topic,
+        @ForAll @Size(max = 10) Set<@ValidTopicChars String> randomSuffixes
+    ) throws Exception {
+        Assume.that(Topic.isValid(topic));
+        StandardAuthorizer authorizer = buildAuthorizer();
+
+        // Create one DENY rule which matches and zero or more ALLOW rules which may or
+        // may not match. Regardless of the ALLOW rules, the final result should be DENIED.
+
+        StandardAcl denyRule = buildTopicWriteAcl(topic, PatternType.LITERAL, AclPermissionType.DENY);
+        authorizer.addAcl(Uuid.randomUuid(), denyRule);
+        addRandomPrefixAllowAcls(authorizer, topic, randomSuffixes);
+
+        assertAuthorizationResult(
+            authorizer,
+            AuthorizationResult.DENIED,
+            AclOperation.WRITE,
+            new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+        );
+    }
+
+    @Property(tries = 5000)
+    public void matchingPrefixAllowWithNoMatchingDenyRules(
+        @ForAll Random random,
+        @ForAll @ValidTopicChars String topic,
+        @ForAll @Size(max = 10) Set<@ValidTopicChars String> randomSuffixes
+    ) throws Exception {
+        Assume.that(Topic.isValid(topic));
+        StandardAuthorizer authorizer = buildAuthorizer();
+
+        // Create one ALLOW rule which matches and zero or more DENY rules which do not
+        // match. The final result should be ALLOWED.
+
+        String topicPrefix = topic.substring(0, random.nextInt(topic.length()));
+        StandardAcl denyRule = buildTopicWriteAcl(topicPrefix, PatternType.PREFIXED, AclPermissionType.ALLOW);
+        authorizer.addAcl(Uuid.randomUuid(), denyRule);
+
+        addRandomNonMatchingPrefixDenyAcls(authorizer, topic, randomSuffixes);
+
+        assertAuthorizationResult(
+            authorizer,
+            AuthorizationResult.ALLOWED,
+            AclOperation.WRITE,
+            new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+        );
+    }
+
+    @Property(tries = 5000)
+    public void matchingLiteralAllowWithNoMatchingDenyRules(
+        @ForAll @ValidTopicChars String topic,
+        @ForAll @Size(max = 10) Set<@ValidTopicChars String> randomSuffixes
+    ) throws Exception {
+        Assume.that(Topic.isValid(topic));
+        StandardAuthorizer authorizer = buildAuthorizer();
+
+        // Create one ALLOW rule which matches and zero or more DENY rules which do not
+        // match. The final result should be ALLOWED.
+
+        StandardAcl denyRule = buildTopicWriteAcl(topic, PatternType.LITERAL, AclPermissionType.ALLOW);
+        authorizer.addAcl(Uuid.randomUuid(), denyRule);
+
+        addRandomNonMatchingPrefixDenyAcls(authorizer, topic, randomSuffixes);
+
+        assertAuthorizationResult(
+            authorizer,
+            AuthorizationResult.ALLOWED,
+            AclOperation.WRITE,
+            new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+        );
+    }
+
+    private StandardAuthorizer buildAuthorizer() {
+        StandardAuthorizer authorizer = new StandardAuthorizer();
+        authorizer.start(new StandardAuthorizerTest.AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
+        authorizer.completeInitialLoad();
+        return authorizer;
+    }
+
+    private void assertAuthorizationResult(
+        StandardAuthorizer authorizer,
+        AuthorizationResult expectedResult,
+        AclOperation operation,
+        ResourcePattern pattern
+    ) throws Exception {
+        Action action = new Action(operation, pattern, 1, false, false);
+        List<AuthorizationResult> results = authorizer.authorize(
+            newRequestContext(),
+            Collections.singletonList(action)
+        );
+
+        assertEquals(1, results.size());
+        AuthorizationResult actualResult = results.get(0);
+
+        try {
+            assertEquals(expectedResult, actualResult);
+        } catch (Throwable e) {
+            printCounterExample(authorizer, operation, pattern, actualResult);
+            throw e;
+        }
+    }
+
+    private void printCounterExample(
+        StandardAuthorizer authorizer,
+        AclOperation operation,
+        ResourcePattern resourcePattern,
+        AuthorizationResult result
+    ) {
+        System.out.println("Assertion FAILED: Operation " + operation + " on " +
+            resourcePattern + " is " + result + ". Current ACLS:");
+
+        Iterable<AclBinding> allAcls = authorizer.acls(new AclBindingFilter(
+            new ResourcePatternFilter(ResourceType.ANY, null, PatternType.ANY),
+            new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY)
+        ));
+
+        allAcls.forEach(System.out::println);
+    }
+
+    private static AuthorizableRequestContext newRequestContext() throws Exception {
+        return new MockAuthorizableRequestContext.Builder()
+            .setPrincipal(new KafkaPrincipal(USER_TYPE, "user"))
+            .build();
+    }
+
+    private static StandardAcl buildTopicWriteAcl(
+        String resourceName,
+        PatternType patternType,
+        AclPermissionType permissionType
+    ) {
+        return new StandardAcl(
+            ResourceType.TOPIC,
+            resourceName,
+            patternType,
+            "User:*",
+            "*",
+            AclOperation.WRITE,
+            permissionType
+        );
+    }
+
+    private boolean isPrefix(
+        String value,
+        String prefix
+    ) {
+        if (prefix.length() > value.length()) {
+            return false;
+        } else {
+            String matchingPrefix = value.substring(0, prefix.length());
+            return matchingPrefix.equals(prefix);
+        }
+    }
+
+    private void addRandomNonMatchingPrefixDenyAcls(
+        StandardAuthorizer authorizer,
+        String topic,
+        Set<String> randomSuffixes
+    ) {
+        addRandomPrefixRules(
+            authorizer,
+            topic,
+            randomSuffixes,
+            AclPermissionType.DENY,
+            pattern -> !pattern.isEmpty() && !isPrefix(topic, pattern)
+        );
+    }
+
+    private void addRandomPrefixAllowAcls(
+        StandardAuthorizer authorizer,
+        String topic,
+        Set<String> randomSuffixes
+    ) {
+        addRandomPrefixRules(
+            authorizer,
+            topic,
+            randomSuffixes,
+            AclPermissionType.ALLOW,
+            pattern -> !pattern.isEmpty()
+        );
+    }
+
+    private void addRandomPrefixRules(
+        StandardAuthorizer authorizer,
+        String topic,
+        Set<String> randomSuffixes,
+        AclPermissionType permissionType,
+        Predicate<String> patternFilter
+    ) {
+        Set<String> prefixPatterns = new HashSet<>();
+
+        for (int i = 0; i < topic.length(); i++) {
+            String prefix = topic.substring(0, i);
+            for (String randomSuffix : randomSuffixes) {
+                String pattern = prefix + randomSuffix;
+                if (patternFilter.test(pattern)) {
+                    prefixPatterns.add(pattern);
+                }
+            }
+        }
+
+        for (String randomResourcePattern : prefixPatterns) {
+            authorizer.addAcl(Uuid.randomUuid(), buildTopicWriteAcl(
+                randomResourcePattern,
+                PatternType.PREFIXED,
+                permissionType
+            ));
+        }
+    }
+
+}
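
For readers new to jqwik, the property style used by the new test works as follows: a @Property method runs many times (here tries = 5000) with randomly generated arguments supplied through @ForAll, and jqwik shrinks any failing input to a minimal counterexample. A minimal standalone sketch, not part of the patch (the class name and the toy invariant are invented purely for illustration):

    import net.jqwik.api.ForAll;
    import net.jqwik.api.Property;

    import static org.junit.jupiter.api.Assertions.assertTrue;

    class PrefixPropertyExample {

        // Toy invariant: appending a suffix never changes the prefix of a string.
        // jqwik generates many random (prefix, suffix) pairs per run and, on a
        // failure, reports a shrunk counterexample.
        @Property
        void concatenationStartsWithPrefix(@ForAll String prefix, @ForAll String suffix) {
            assertTrue((prefix + suffix).startsWith(prefix));
        }
    }

The StandardAuthorizerPropertyTest properties above follow the same shape, with Assume.that(...) discarding generated topic names that are not valid.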
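
The Topic.java change also gives callers two entry points backed by the same detectInvalidTopic logic: the new boolean Topic.isValid(name), which the property tests use inside Assume.that(...), and the existing Topic.validate(name, logPrefix, consumer), which now reports every reason as "<logPrefix> is invalid: <reason>". A small usage sketch, assuming a caller that wants to throw on bad names (the example class is hypothetical, and Topic lives in an internal package, so this is for illustration only):

    import org.apache.kafka.common.errors.InvalidTopicException;
    import org.apache.kafka.common.internals.Topic;

    public class TopicValidationExample {
        public static void main(String[] args) {
            // Non-throwing check via the new helper.
            System.out.println(Topic.isValid("orders-v1")); // true
            System.out.println(Topic.isValid(".."));        // false

            // Throwing variant: validate() hands the reason string to the consumer.
            try {
                Topic.validate("(<-invalid->)", "Topic name", message -> {
                    throw new InvalidTopicException(message);
                });
            } catch (InvalidTopicException e) {
                // "Topic name is invalid: '(<-invalid->)' contains one or more
                // characters other than ASCII alphanumerics, '.', '_' and '-'"
                System.out.println(e.getMessage());
            }
        }
    }

This mirrors the expectations updated in ControllerConfigurationValidatorTest and ReplicationControlManagerTest above, which now assert on the "is invalid: <reason>" message format.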