[GitHub] [kafka] mumrah commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


mumrah commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971306933


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##
@@ -138,6 +139,15 @@ public List authorize(
 return results;
 }
 
+@Override
+public AuthorizationResult authorizeByResourceType(

Review Comment:
   If I understand correctly, this implementation was added to take advantage 
of the new binary search approach in the ACL array. IOW, an optimization over 
the default `authorizeByResourceType` impl?



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java:
##
@@ -76,14 +78,15 @@ public interface ClusterMetadataAuthorizer extends 
Authorizer {
 void loadSnapshot(Map acls);
 
 /**
- * Add a new ACL. Any ACL with the same ID will be replaced.
- */
-void addAcl(Uuid id, StandardAcl acl);
-
-/**
- * Remove the ACL with the given ID.
+ * Add or remove ACLs.
+ *
+ * @param newAcls   The ACLs to add.
+ * @param removedAclIds The ACL IDs to remove.
  */
-void removeAcl(Uuid id);
+void applyAclChanges(

Review Comment:
   We should document that this method does not expect duplicates or allow 
replacing ACL by ID. 



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
 loadingComplete,
 newSuperUsers,
 newDefaultResult,
-aclsByResource,
-aclsById);
+acls);
 }
 
-StandardAuthorizerData copyWithNewAcls(Collection> aclEntries) {
-StandardAuthorizerData newData = new StandardAuthorizerData(
-log,
-aclMutator,
-loadingComplete,
-superUsers,
-defaultRule.result,
-new ConcurrentSkipListSet<>(),
-new ConcurrentHashMap<>());
-for (Entry entry : aclEntries) {
-newData.addAcl(entry.getKey(), entry.getValue());
-}
-log.info("Applied {} acl(s) from image.", aclEntries.size());
-return newData;
+StandardAuthorizerData copyWithAllNewAcls(
+Collection> newAclEntries
+) {
+return copyWithNewAcls(EMPTY_ACLS, newAclEntries, 
Collections.emptySet());
 }
 
-void addAcl(Uuid id, StandardAcl acl) {
-try {
-StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-if (prevAcl != null) {
-throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
-}
-if (!aclsByResource.add(acl)) {
-aclsById.remove(id);
-throw new RuntimeException("Unable to add the ACL with ID " + 
id +
-" to aclsByResource");
-}
-log.trace("Added ACL {}: {}", id, acl);
-} catch (Throwable e) {
-log.error("addAcl error", e);
-throw e;
-}
+StandardAuthorizerData copyWithAclChanges(
+Collection> newAclEntries,
+Set removedAclIds
+) {
+return copyWithNewAcls(acls, newAclEntries, removedAclIds);
 }
 
-void removeAcl(Uuid id) {
-try {
-StandardAcl acl = aclsById.remove(id);
-if (acl == null) {
-throw new RuntimeException("ID " + id + " not found in 
aclsById.");
+StandardAuthorizerData copyWithNewAcls(
+StandardAclWithId[] existingAcls,
+Collection> newAclEntries,
+Set removedAclIds
+) {
+int newSize = existingAcls.length + newAclEntries.size() - 
removedAclIds.size();
+StandardAclWithId[] newAcls = new StandardAclWithId[newSize];
+int numRemoved = 0, j = 0;
+for (int i = 0; i < existingAcls.length; i++) {
+StandardAclWithId aclWithId = existingAcls[i];
+if (removedAclIds.contains(aclWithId.id())) {
+numRemoved++;
+} else {
+newAcls[j++] = aclWithId;
 }
-if (!aclsByResource.remove(acl)) {
-throw new RuntimeException("Unable to remove the ACL with ID " 
+ id +
-" from aclsByResource");
+}
+if (numRemoved < removedAclIds.size()) {
+throw new RuntimeException("Only located " + numRemoved + " out of 
" +
+removedAclIds.size() + " removed ACL ID(s). removedAclIds = " +
+removedAclIds.stream().map(a -> 
a.toString()).collect(Collectors.joining(", ")));
+}
+if (!newAclEntries.isEmpty()) {
+int i = 0;
+for (Entry entry : newAclEntries) {
+newAcls[existingAcls.length + i] = new 
StandardAclWithId(entry.getKey(), entry.getValue());

Review Comment:
   Should this index be offset by the number we removed?



##

[GitHub] [kafka] mumrah commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


mumrah commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971116340


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
 loadingComplete,
 newSuperUsers,
 newDefaultResult,
-aclsByResource,
-aclsById);
+acls);
 }
 
-StandardAuthorizerData copyWithNewAcls(Collection> aclEntries) {
-StandardAuthorizerData newData = new StandardAuthorizerData(
-log,
-aclMutator,
-loadingComplete,
-superUsers,
-defaultRule.result,
-new ConcurrentSkipListSet<>(),
-new ConcurrentHashMap<>());
-for (Entry entry : aclEntries) {
-newData.addAcl(entry.getKey(), entry.getValue());
-}
-log.info("Applied {} acl(s) from image.", aclEntries.size());
-return newData;
+StandardAuthorizerData copyWithAllNewAcls(
+Collection> newAclEntries
+) {
+return copyWithNewAcls(EMPTY_ACLS, newAclEntries, 
Collections.emptySet());
 }
 
-void addAcl(Uuid id, StandardAcl acl) {
-try {
-StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-if (prevAcl != null) {
-throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
-}
-if (!aclsByResource.add(acl)) {
-aclsById.remove(id);
-throw new RuntimeException("Unable to add the ACL with ID " + 
id +
-" to aclsByResource");
-}

Review Comment:
   I think we lost this existing ID and duplicate ACL check in the new array 
code. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


mumrah commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971108745


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##
@@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) {
 }
 
 @Override
-public void addAcl(Uuid id, StandardAcl acl) {
-data.addAcl(id, acl);
-}
-
-@Override
-public void removeAcl(Uuid id) {
-data.removeAcl(id);
+public synchronized void loadSnapshot(Map acls) {
+data = data.copyWithAllNewAcls(acls.entrySet());
 }
 
 @Override
-public synchronized void loadSnapshot(Map acls) {
-data = data.copyWithNewAcls(acls.entrySet());
+public synchronized void applyAclChanges(
+Collection> newAcls,

Review Comment:
   Oh, i must have been looking at StandardAclWithId :), this looks good as-is



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


mumrah commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r970977191


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -397,18 +387,34 @@ private MatchingAclRule findAclRule(
 return matchingAclBuilder.build();
 }
 
+/**
+ * Use a binary search to find the index of the first ACL which is greater 
than or
+ * equal to the given ACL. This may be equal to the end of the array if 
there are
+ * no such ACLs.
+ */
+private int indexOfFirstAclGreaterThanOrEqualTo(StandardAcl exemplar) {
+int i = Arrays.binarySearch(acls,
+new StandardAclWithId(Uuid.ZERO_UUID, exemplar),
+StandardAclWithId.ACL_COMPARATOR);
+// Arrays.binarySearch returns a positive number if it found an exact 
match, and
+// a negative number otherwise.

Review Comment:
   Might comment about what the negative return value indicates. It helps L404 
make more sense :)



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -109,14 +110,9 @@ public class StandardAuthorizerData {
 private final DefaultRule defaultRule;
 
 /**
- * Contains all of the current ACLs sorted by (resource type, resource 
name).
+ * An immutable array of all the current ACLs sorted by (resource type, 
resource name).
  */
-private final ConcurrentSkipListSet aclsByResource;

Review Comment:
   I see we are replacing the skip-list set with a sorted array. Don't we need 
to guard against duplicates in the array?
   
   If we used a TreeSet here, it would be closer to the current implementation 
and I think it should have linear time when copying from another TreeSet



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##
@@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) {
 }
 
 @Override
-public void addAcl(Uuid id, StandardAcl acl) {
-data.addAcl(id, acl);
-}
-
-@Override
-public void removeAcl(Uuid id) {
-data.removeAcl(id);
+public synchronized void loadSnapshot(Map acls) {
+data = data.copyWithAllNewAcls(acls.entrySet());
 }
 
 @Override
-public synchronized void loadSnapshot(Map acls) {
-data = data.copyWithNewAcls(acls.entrySet());
+public synchronized void applyAclChanges(
+Collection> newAcls,

Review Comment:
   Could we just take a `Collection` since the acl has the ID as a 
property?



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -109,14 +110,9 @@ public class StandardAuthorizerData {
 private final DefaultRule defaultRule;
 
 /**
- * Contains all of the current ACLs sorted by (resource type, resource 
name).
+ * An immutable array of all the current ACLs sorted by (resource type, 
resource name).
  */
-private final ConcurrentSkipListSet aclsByResource;
-
-/**
- * Contains all of the current ACLs indexed by UUID.
- */
-private final ConcurrentHashMap aclsById;

Review Comment:
   Guess we don't need this anymore if we are removing `addAcl` and `removeAcl`?



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -182,59 +173,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
 loadingComplete,
 newSuperUsers,
 newDefaultResult,
-aclsByResource,
-aclsById);
+acls);
 }
 
-StandardAuthorizerData copyWithNewAcls(Collection> aclEntries) {
-StandardAuthorizerData newData = new StandardAuthorizerData(
-log,
-aclMutator,
-loadingComplete,
-superUsers,
-defaultRule.result,
-new ConcurrentSkipListSet<>(),
-new ConcurrentHashMap<>());
-for (Entry entry : aclEntries) {
-newData.addAcl(entry.getKey(), entry.getValue());
-}
-log.info("Applied {} acl(s) from image.", aclEntries.size());
-return newData;
+StandardAuthorizerData copyWithAllNewAcls(
+Collection> newAclEntries
+) {
+return copyWithNewAcls(EMPTY_ACLS, newAclEntries, 
Collections.emptySet());
 }
 
-void addAcl(Uuid id, StandardAcl acl) {
-try {
-StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-if (prevAcl != null) {
-throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
-}
-if (!aclsByResource.add(acl)) {
-aclsById.remove(id);
-throw new RuntimeException("Unable to add the ACL with ID " + 
id +
-" to aclsByResource");
-}
-log.trace("Added ACL {}: {}", id, acl);
-} catch (Throwable e) {
-