cmccabe commented on a change in pull request #11649: URL: https://github.com/apache/kafka/pull/11649#discussion_r801175105
########## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala ########## @@ -209,6 +212,33 @@ class BrokerMetadataPublisher(conf: KafkaConfig, clientQuotaMetadataManager.update(clientQuotasDelta) } + // Apply changes to ACLs. This needs to be handled carefully because while we are + // applying these changes, the Authorizer is continuing to return authorization + // results in other threads. We never want to expose an invalid state. For example, + // if the user created a DENY ALL acl and then created an ALLOW ACL for topic foo, + // we want to apply those changes in that order, not the reverse order! Otherwise + // there could be a window during which incorrect authorization results are returned. + Option(delta.aclsDelta()).foreach( aclsDelta => + _authorizer match { + case Some(authorizer: ClusterMetadataAuthorizer) => if (aclsDelta.isSnapshotDelta()) { + // If the delta resulted from a snapshot load, we want to apply the new changes + // all at once using ClusterMetadataAuthorizer#loadSnapshot. If this is the + // first snapshot load, it will also complete the futures returned by + // Authorizer#start (which we wait for before processing RPCs). + authorizer.loadSnapshot(newImage.acls().acls()) + } else { + // Because the changes map is a LinkedHashMap, the deltas will be returned in + // the order they were performed. + aclsDelta.changes().entrySet().forEach(e => Review comment: does scala unpack the `Entry<A, B>` object into a tuple? I didn't think it did... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org