This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 5992f5141c3e7fffc709e37ffebee9280657bd84
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Oct 4 16:31:43 2022 -0700

    MINOR: Add initial property tests for StandardAuthorizer (#12703)
    
    In https://github.com/apache/kafka/pull/12695, we discovered a gap in our 
testing of `StandardAuthorizer`. We addressed the specific case that was 
failing, but I think we need to establish a better methodology for testing 
which incorporates randomized inputs. This patch is a start in that direction. 
We implement a few basic property tests using jqwik which focus on prefix 
searching. It catches the case from https://github.com/apache/kafka/pull/12695 
prior to the fix. In the future, we [...]
    
    Reviewers: David Arthur <mum...@gmail.com>
---
 build.gradle                                       |   1 +
 checkstyle/import-control.xml                      |   2 +
 .../org/apache/kafka/common/internals/Topic.java   |  30 ++-
 .../ControllerConfigurationValidatorTest.scala     |   4 +-
 .../controller/ReplicationControlManagerTest.java  |   4 +-
 .../authorizer/StandardAuthorizerPropertyTest.java | 299 +++++++++++++++++++++
 6 files changed, 328 insertions(+), 12 deletions(-)

diff --git a/build.gradle b/build.gradle
index ee99cb7c870..56511cf61ce 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1152,6 +1152,7 @@ project(':metadata') {
     implementation libs.metrics
     compileOnly libs.log4j
     testImplementation libs.junitJupiter
+    testImplementation libs.jqwik
     testImplementation libs.hamcrest
     testImplementation libs.mockitoCore
     testImplementation libs.mockitoInline
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 45dc28d9e3a..44260f360bb 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -36,6 +36,7 @@
   <allow pkg="javax.net.ssl" />
   <allow pkg="javax.security" />
   <allow pkg="org.ietf.jgss" />
+  <allow pkg="net.jqwik.api" />
 
   <!-- no one depends on the server -->
   <disallow pkg="kafka" />
@@ -276,6 +277,7 @@
       <allow pkg="org.apache.kafka.common.resource" />
       <allow pkg="org.apache.kafka.controller" />
       <allow pkg="org.apache.kafka.metadata" />
+      <allow pkg="org.apache.kafka.common.internals" />
     </subpackage>
     <subpackage name="bootstrap">
       <allow pkg="org.apache.kafka.snapshot" />
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java 
b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
index fbf491bab98..92952a2c031 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
@@ -43,17 +43,31 @@ public class Topic {
         });
     }
 
-    public static void validate(String name, String logPrefix, 
Consumer<String> throwableConsumer) {
+    private static String detectInvalidTopic(String name) {
         if (name.isEmpty())
-            throwableConsumer.accept(logPrefix + " is illegal, it can't be 
empty");
-        if (".".equals(name) || "..".equals(name))
-            throwableConsumer.accept(logPrefix + " cannot be \".\" or \"..\"");
+            return "the empty string is not allowed";
+        if (".".equals(name))
+            return "'.' is not allowed";
+        if ("..".equals(name))
+            return "'..' is not allowed";
         if (name.length() > MAX_NAME_LENGTH)
-            throwableConsumer.accept(logPrefix + " is illegal, it can't be 
longer than " + MAX_NAME_LENGTH +
-                    " characters, " + logPrefix + ": " + name);
+            return "the length of '" + name + "' is longer than the max 
allowed length " + MAX_NAME_LENGTH;
         if (!containsValidPattern(name))
-            throwableConsumer.accept(logPrefix + " \"" + name + "\" is 
illegal, it contains a character other than " +
-                    "ASCII alphanumerics, '.', '_' and '-'");
+            return "'" + name + "' contains one or more characters other than 
" +
+                "ASCII alphanumerics, '.', '_' and '-'";
+        return null;
+    }
+
+    /**
+     * Reports whether {@code name} is acceptable as a topic name.
+     *
+     * @param name the candidate topic name
+     * @return {@code true} when {@code detectInvalidTopic} finds no reason to reject
+     *         the name (i.e. it returns {@code null}), {@code false} otherwise
+     */
+    public static boolean isValid(String name) {
+        return detectInvalidTopic(name) == null;
+    }
+
+    /**
+     * Validates a topic name and reports the first problem found, if any.
+     *
+     * @param name              the candidate topic name
+     * @param logPrefix         text placed in front of the failure description
+     * @param throwableConsumer invoked once with {@code logPrefix + " is invalid: " + reason}
+     *                          when the name is rejected; not invoked for a valid name
+     */
+    public static void validate(String name, String logPrefix, Consumer<String> throwableConsumer) {
+        String invalidReason = detectInvalidTopic(name);
+        if (invalidReason == null) {
+            // Nothing to report for a valid name.
+            return;
+        }
+        throwableConsumer.accept(logPrefix + " is invalid: " + invalidReason);
+    }
 
     public static boolean isInternal(String topic) {
diff --git 
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
 
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
index c89910ed231..36a8d71fb97 100644
--- 
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
@@ -39,8 +39,8 @@ class ControllerConfigurationValidatorTest {
 
   @Test
   def testInvalidTopicNameRejected(): Unit = {
-    assertEquals("Topic name \"(<-invalid->)\" is illegal, it contains a 
character " +
-      "other than ASCII alphanumerics, '.', '_' and '-'",
+    assertEquals("Topic name is invalid: '(<-invalid->)' contains " +
+      "one or more characters other than ASCII alphanumerics, '.', '_' and 
'-'",
         assertThrows(classOf[InvalidTopicException], () => validator.validate(
           new ConfigResource(TOPIC, "(<-invalid->)"), emptyMap())). 
getMessage())
   }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 82e378a9823..335fd52a07d 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -809,9 +809,9 @@ public class ReplicationControlManagerTest {
         ReplicationControlManager.validateNewTopicNames(topicErrors, topics, 
Collections.emptyMap());
         Map<String, ApiError> expectedTopicErrors = new HashMap<>();
         expectedTopicErrors.put("", new ApiError(INVALID_TOPIC_EXCEPTION,
-            "Topic name is illegal, it can't be empty"));
+            "Topic name is invalid: the empty string is not allowed"));
         expectedTopicErrors.put(".", new ApiError(INVALID_TOPIC_EXCEPTION,
-            "Topic name cannot be \".\" or \"..\""));
+            "Topic name is invalid: '.' is not allowed"));
         assertEquals(expectedTopicErrors, topicErrors);
     }
 
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerPropertyTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerPropertyTest.java
new file mode 100644
index 00000000000..8935b92e2f9
--- /dev/null
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerPropertyTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.authorizer;
+
+import net.jqwik.api.Assume;
+import net.jqwik.api.ForAll;
+import net.jqwik.api.Property;
+import net.jqwik.api.constraints.AlphaChars;
+import net.jqwik.api.constraints.Chars;
+import net.jqwik.api.constraints.NumericChars;
+import net.jqwik.api.constraints.Size;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import static org.apache.kafka.common.security.auth.KafkaPrincipal.USER_TYPE;
+import static 
org.apache.kafka.metadata.authorizer.StandardAuthorizerTest.PLAINTEXT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Property-based tests for {@link StandardAuthorizer}, driven by jqwik.
+ *
+ * <p>Each property builds a fresh authorizer, installs one rule whose outcome is
+ * asserted together with a randomized set of PREFIXED rules of the opposite
+ * permission type, and then authorizes a WRITE on the generated topic addressed
+ * as a LITERAL resource.
+ */
+public class StandardAuthorizerPropertyTest {
+
+    /**
+     * Character constraint for generated strings: alphabetic and numeric characters
+     * plus '_', '-' and '.'. Generated topics are additionally filtered with
+     * {@link Topic#isValid(String)} inside each property.
+     */
+    @Target({ ElementType.ANNOTATION_TYPE, ElementType.PARAMETER, ElementType.TYPE_USE })
+    @Retention(RetentionPolicy.RUNTIME)
+    @AlphaChars @NumericChars @Chars({ '_', '-', '.' })
+    public @interface ValidTopicChars { }
+
+    @Property(tries = 5000)
+    public void matchingPrefixDenyOverridesAllAllowRules(
+        @ForAll Random random,
+        @ForAll @ValidTopicChars String topic,
+        @ForAll @Size(max = 10) Set<@ValidTopicChars String> randomSuffixes
+    ) throws Exception {
+        Assume.that(Topic.isValid(topic));
+        StandardAuthorizer authorizer = buildAuthorizer();
+
+        // Create one DENY rule which matches and zero or more ALLOW rules which may or
+        // may not match. Regardless of the ALLOW rules, the final result should be DENIED.
+
+        String topicPrefix = randomProperPrefix(random, topic);
+        StandardAcl denyRule = buildTopicWriteAcl(topicPrefix, PatternType.PREFIXED, AclPermissionType.DENY);
+        authorizer.addAcl(Uuid.randomUuid(), denyRule);
+        addRandomPrefixAllowAcls(authorizer, topic, randomSuffixes);
+
+        assertAuthorizationResult(
+            authorizer,
+            AuthorizationResult.DENIED,
+            AclOperation.WRITE,
+            new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+        );
+    }
+
+    @Property(tries = 5000)
+    public void matchingLiteralDenyOverridesAllAllowRules(
+        @ForAll @ValidTopicChars String topic,
+        @ForAll @Size(max = 10) Set<@ValidTopicChars String> randomSuffixes
+    ) throws Exception {
+        Assume.that(Topic.isValid(topic));
+        StandardAuthorizer authorizer = buildAuthorizer();
+
+        // Create one DENY rule which matches and zero or more ALLOW rules which may or
+        // may not match. Regardless of the ALLOW rules, the final result should be DENIED.
+
+        StandardAcl denyRule = buildTopicWriteAcl(topic, PatternType.LITERAL, AclPermissionType.DENY);
+        authorizer.addAcl(Uuid.randomUuid(), denyRule);
+        addRandomPrefixAllowAcls(authorizer, topic, randomSuffixes);
+
+        assertAuthorizationResult(
+            authorizer,
+            AuthorizationResult.DENIED,
+            AclOperation.WRITE,
+            new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+        );
+    }
+
+    @Property(tries = 5000)
+    public void matchingPrefixAllowWithNoMatchingDenyRules(
+        @ForAll Random random,
+        @ForAll @ValidTopicChars String topic,
+        @ForAll @Size(max = 10) Set<@ValidTopicChars String> randomSuffixes
+    ) throws Exception {
+        Assume.that(Topic.isValid(topic));
+        StandardAuthorizer authorizer = buildAuthorizer();
+
+        // Create one ALLOW rule which matches and zero or more DENY rules which do not
+        // match. The final result should be ALLOWED.
+
+        String topicPrefix = randomProperPrefix(random, topic);
+        StandardAcl allowRule = buildTopicWriteAcl(topicPrefix, PatternType.PREFIXED, AclPermissionType.ALLOW);
+        authorizer.addAcl(Uuid.randomUuid(), allowRule);
+
+        addRandomNonMatchingPrefixDenyAcls(authorizer, topic, randomSuffixes);
+
+        assertAuthorizationResult(
+            authorizer,
+            AuthorizationResult.ALLOWED,
+            AclOperation.WRITE,
+            new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+        );
+    }
+
+    @Property(tries = 5000)
+    public void matchingLiteralAllowWithNoMatchingDenyRules(
+        @ForAll @ValidTopicChars String topic,
+        @ForAll @Size(max = 10) Set<@ValidTopicChars String> randomSuffixes
+    ) throws Exception {
+        Assume.that(Topic.isValid(topic));
+        StandardAuthorizer authorizer = buildAuthorizer();
+
+        // Create one ALLOW rule which matches and zero or more DENY rules which do not
+        // match. The final result should be ALLOWED.
+
+        StandardAcl allowRule = buildTopicWriteAcl(topic, PatternType.LITERAL, AclPermissionType.ALLOW);
+        authorizer.addAcl(Uuid.randomUuid(), allowRule);
+
+        addRandomNonMatchingPrefixDenyAcls(authorizer, topic, randomSuffixes);
+
+        assertAuthorizationResult(
+            authorizer,
+            AuthorizationResult.ALLOWED,
+            AclOperation.WRITE,
+            new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+        );
+    }
+
+    /**
+     * Picks a random prefix of {@code topic}. Because {@code Random.nextInt(n)} is
+     * in {@code [0, n)}, the result is always strictly shorter than the topic and
+     * may be the empty string. Callers must pass a non-empty topic; the properties
+     * guarantee that through {@code Assume.that(Topic.isValid(topic))}.
+     */
+    private static String randomProperPrefix(Random random, String topic) {
+        return topic.substring(0, random.nextInt(topic.length()));
+    }
+
+    /** Creates an authorizer, starts it on a single PLAINTEXT endpoint and completes the initial load. */
+    private StandardAuthorizer buildAuthorizer() {
+        StandardAuthorizer authorizer = new StandardAuthorizer();
+        authorizer.start(new StandardAuthorizerTest.AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
+        authorizer.completeInitialLoad();
+        return authorizer;
+    }
+
+    /**
+     * Authorizes a single action and asserts its result. On a mismatch the current
+     * ACLs are printed before the assertion failure is rethrown, so that the
+     * counterexample can be reproduced from the test output.
+     */
+    private void assertAuthorizationResult(
+        StandardAuthorizer authorizer,
+        AuthorizationResult expectedResult,
+        AclOperation operation,
+        ResourcePattern pattern
+    ) throws Exception {
+        Action action = new Action(operation, pattern, 1, false, false);
+        List<AuthorizationResult> results = authorizer.authorize(
+            newRequestContext(),
+            Collections.singletonList(action)
+        );
+
+        assertEquals(1, results.size());
+        AuthorizationResult actualResult = results.get(0);
+
+        try {
+            assertEquals(expectedResult, actualResult);
+        } catch (AssertionError e) {
+            // Only an assertion failure is expected here; dump the ACL state and rethrow it unchanged.
+            printCounterExample(authorizer, operation, pattern, actualResult);
+            throw e;
+        }
+    }
+
+    /** Prints the failed operation, the resource and every ACL currently held by the authorizer. */
+    private void printCounterExample(
+        StandardAuthorizer authorizer,
+        AclOperation operation,
+        ResourcePattern resourcePattern,
+        AuthorizationResult result
+    ) {
+        System.out.println("Assertion FAILED: Operation " + operation + " on " +
+            resourcePattern + " is " + result + ". Current ACLS:");
+
+        // A filter with ANY/null in every position selects all ACL bindings.
+        Iterable<AclBinding> allAcls = authorizer.acls(new AclBindingFilter(
+            new ResourcePatternFilter(ResourceType.ANY, null, PatternType.ANY),
+            new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY)
+        ));
+
+        allAcls.forEach(System.out::println);
+    }
+
+    /** Builds a request context whose principal is the user named "user". */
+    private static AuthorizableRequestContext newRequestContext() throws Exception {
+        return new MockAuthorizableRequestContext.Builder()
+            .setPrincipal(new KafkaPrincipal(USER_TYPE, "user"))
+            .build();
+    }
+
+    /** Builds a topic WRITE ACL for principal "User:*" and host "*" with the given name, pattern and permission. */
+    private static StandardAcl buildTopicWriteAcl(
+        String resourceName,
+        PatternType patternType,
+        AclPermissionType permissionType
+    ) {
+        return new StandardAcl(
+            ResourceType.TOPIC,
+            resourceName,
+            patternType,
+            "User:*",
+            "*",
+            AclOperation.WRITE,
+            permissionType
+        );
+    }
+
+    /** Adds PREFIXED DENY rules whose patterns are non-empty and are not a prefix of {@code topic}. */
+    private void addRandomNonMatchingPrefixDenyAcls(
+        StandardAuthorizer authorizer,
+        String topic,
+        Set<String> randomSuffixes
+    ) {
+        addRandomPrefixRules(
+            authorizer,
+            topic,
+            randomSuffixes,
+            AclPermissionType.DENY,
+            pattern -> !pattern.isEmpty() && !topic.startsWith(pattern)
+        );
+    }
+
+    /** Adds PREFIXED ALLOW rules for every non-empty generated pattern, whether or not it matches {@code topic}. */
+    private void addRandomPrefixAllowAcls(
+        StandardAuthorizer authorizer,
+        String topic,
+        Set<String> randomSuffixes
+    ) {
+        addRandomPrefixRules(
+            authorizer,
+            topic,
+            randomSuffixes,
+            AclPermissionType.ALLOW,
+            pattern -> !pattern.isEmpty()
+        );
+    }
+
+    /**
+     * Generates candidate patterns by appending each random suffix to each proper
+     * prefix of {@code topic} (the empty prefix included), keeps those accepted by
+     * {@code patternFilter}, and registers one PREFIXED ACL per distinct pattern.
+     */
+    private void addRandomPrefixRules(
+        StandardAuthorizer authorizer,
+        String topic,
+        Set<String> randomSuffixes,
+        AclPermissionType permissionType,
+        Predicate<String> patternFilter
+    ) {
+        // A set, because different prefix/suffix pairs can concatenate to the same pattern.
+        Set<String> prefixPatterns = new HashSet<>();
+
+        for (int i = 0; i < topic.length(); i++) {
+            String prefix = topic.substring(0, i);
+            for (String randomSuffix : randomSuffixes) {
+                String pattern = prefix + randomSuffix;
+                if (patternFilter.test(pattern)) {
+                    prefixPatterns.add(pattern);
+                }
+            }
+        }
+
+        for (String randomResourcePattern : prefixPatterns) {
+            authorizer.addAcl(Uuid.randomUuid(), buildTopicWriteAcl(
+                randomResourcePattern,
+                PatternType.PREFIXED,
+                permissionType
+            ));
+        }
+    }
+
+}

Reply via email to