[GitHub] [kafka] hachikuji commented on a change in pull request #11649: KAFKA-13646: Implement KIP-801: KRaft authorizer

2022-02-08 Thread GitBox


hachikuji commented on a change in pull request #11649:
URL: https://github.com/apache/kafka/pull/11649#discussion_r802061270



##
File path: 
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
+
+
+/**
+ * A class which encapsulates the configuration and the ACL data owned by 
StandardAuthorizer.
+ *
+ * The methods in this class support lockless concurrent access.
+ */
+public class StandardAuthorizerData {
+/**
+ * The host or name string used in ACLs that match any host or name.
+ */
+public static final String WILDCARD = "*";
+
+/**
+ * The principal entry used in ACLs that match any principal.
+ */
+public static final String WILDCARD_PRINCIPAL = "User:*";
+
+/**
+ * The logger to use.
+ */
+final Logger log;
+
+/**
+ * The current AclMutator.
+ */
+final AclMutator aclMutator;
+
+/**
+ * A statically configured set of users that are authorized to do anything.
+ */
+private final Set superUsers;
+
+/**
+ * The result to return if no ACLs match.
+ */
+private final AuthorizationResult defaultResult;
+
+/**
+ * Contains all of the current ACLs sorted by (resource type, pattern 
type, resource name).
+ */
+private final ConcurrentSkipListSet aclsByResource;
+
+/**
+ * Contains all of the current ACLs indexed by UUID.
+ */
+private final ConcurrentHashMap aclsById;
+
+private static Logger createLogger(int nodeId) {
+return new LogContext("[StandardAuthorizer " + nodeId + "] 
").logger(StandardAuthorizerData.class);
+}
+
+static StandardAuthorizerData createEmpty() {
+return new StandardAuthorizerData(createLogger(-1),
+null,
+Collections.emptySet(),
+DENIED,
+new ConcurrentSkipListSet<>(), new ConcurrentHashMap<>());
+}
+
+private StandardAuthorizerData(Logger log,
+   AclMutator aclMutator,
+   Set superUsers,
+   AuthorizationResult defaultResult,
+   ConcurrentSkipListSet 
aclsByResource,
+   ConcurrentHashMap 
aclsById) {
+this.log = log;
+this.aclMutator = aclMutator;
+this.superUsers = superUsers;
+this.defaultResult = 

[GitHub] [kafka] hachikuji commented on a change in pull request #11649: KAFKA-13646: Implement KIP-801: KRaft authorizer

2022-02-08 Thread GitBox


hachikuji commented on a change in pull request #11649:
URL: https://github.com/apache/kafka/pull/11649#discussion_r802060134



##
File path: 
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
##
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
+
+
+/**
+ * The standard authorizer which is used in KRaft-based clusters if no other 
authorizer is
+ * configured.
+ */
+public class StandardAuthorizer implements ClusterMetadataAuthorizer {
+public final static String SUPER_USERS_CONFIG = "super.users";
+
+public final static String ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG = 
"allow.everyone.if.no.acl.found";
+
+/**
+ * A future which is completed once we have loaded a snapshot.
+ * TODO: complete this when we reach the high water mark.
+ */
+private final CompletableFuture initialLoadFuture = 
CompletableFuture.completedFuture(null);
+
+/**
+ * The current data. Can be read without a lock. Must be written while 
holding the object lock.

Review comment:
   Fair enough.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11649: KAFKA-13646: Implement KIP-801: KRaft authorizer

2022-02-07 Thread GitBox


hachikuji commented on a change in pull request #11649:
URL: https://github.com/apache/kafka/pull/11649#discussion_r801214042



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
##
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+import org.apache.kafka.metadata.authorizer.StandardAclRecordIterator;
+import org.apache.kafka.metadata.authorizer.StandardAclWithId;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import 
org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+
+/**
+ * The AclControlManager manages any ACLs that are stored in the 
__cluster_metadata topic.
+ * If the ACLs are stored externally (such as in ZooKeeper) then there will be 
nothing for
+ * this manager to do, and the authorizer field will always be Optional.empty.
+ *
+ * Because the Authorizer is being concurrently used by other threads, we need 
to be
+ * careful about snapshots. We don't want the Authorizer to act based on 
partial state
+ * during the loading process. Therefore, unlike most of the other managers,
+ * AclControlManager needs to receive callbacks when we start loading a 
snapshot and when
+ * we finish. The prepareForSnapshotLoad callback clears the authorizer field, 
preventing
+ * any changes from affecting the authorizer until completeSnapshotLoad is 
called.
+ * Note that the Authorizer's start() method will block until the first 
snapshot load has
+ * completed, which is another reason the prepare / complete callbacks are 
needed.
+ */
+public class AclControlManager {
+private final TimelineHashMap idToAcl;
+private final TimelineHashSet existingAcls;
+private final Optional authorizer;

Review comment:
   I guess my expectation was that we would only construct 
`AclControlManager` if there was a `ClusterMetadataAuthorizer` implementation. 
Why would we need it otherwise?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11649: KAFKA-13646: Implement KIP-801: KRaft authorizer

2022-02-07 Thread GitBox


hachikuji commented on a change in pull request #11649:
URL: https://github.com/apache/kafka/pull/11649#discussion_r801157654



##
File path: core/src/main/scala/kafka/server/ControllerServer.scala
##
@@ -173,8 +174,12 @@ class ControllerServer(
 setMetrics(new 
QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry())).
 setCreateTopicPolicy(createTopicPolicy.asJava).
 setAlterConfigPolicy(alterConfigPolicy.asJava).
-setConfigurationValidator(new ControllerConfigurationValidator()).
-build()
+setConfigurationValidator(new ControllerConfigurationValidator())
+  authorizer match {

Review comment:
   Makes sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11649: KAFKA-13646: Implement KIP-801: KRaft authorizer

2022-02-07 Thread GitBox


hachikuji commented on a change in pull request #11649:
URL: https://github.com/apache/kafka/pull/11649#discussion_r801154082



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3366,7 +3366,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 })
   }
 
-  def handleEnvelope(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
+  def handleala(request: RequestChannel.Request, requestLocal: RequestLocal): 
Unit = {

Review comment:
   This looks unintentional




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11649: KAFKA-13646: Implement KIP-801: KRaft authorizer

2022-02-07 Thread GitBox


hachikuji commented on a change in pull request #11649:
URL: https://github.com/apache/kafka/pull/11649#discussion_r800941876



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -689,7 +689,7 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleCreatePartitions(request: RequestChannel.Request): Unit = {
 val future = createPartitions(request.body[CreatePartitionsRequest].data,
   authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME),
-  names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, 
names)(n => n))
+  names => authHelper.filterByAuthorized(request.context, ALTER, TOPIC, 
names)(n => n))

Review comment:
   I was going to suggest pulling this fix out so that we could backport 
more easily, but I guess it doesn't matter since KRaft never had an authorizer 
before.

##
File path: core/src/main/scala/kafka/server/ControllerServer.scala
##
@@ -173,8 +174,12 @@ class ControllerServer(
 setMetrics(new 
QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry())).
 setCreateTopicPolicy(createTopicPolicy.asJava).
 setAlterConfigPolicy(alterConfigPolicy.asJava).
-setConfigurationValidator(new ControllerConfigurationValidator()).
-build()
+setConfigurationValidator(new ControllerConfigurationValidator())
+  authorizer match {

Review comment:
   nit: can be simplified. Something like this:
   ```scala
   authorizer.foreach(controllerBuilder.setAuthorizer)
   ```

##
File path: 
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##
@@ -209,6 +212,33 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
 clientQuotaMetadataManager.update(clientQuotasDelta)
   }
 
+  // Apply changes to ACLs. This needs to be handled carefully because 
while we are
+  // applying these changes, the Authorizer is continuing to return 
authorization
+  // results in other threads. We never want to expose an invalid state. 
For example,
+  // if the user created a DENY ALL acl and then created an ALLOW ACL for 
topic foo,
+  // we want to apply those changes in that order, not the reverse order! 
Otherwise
+  // there could be a window during which incorrect authorization results 
are returned.
+  Option(delta.aclsDelta()).foreach( aclsDelta =>
+_authorizer match {
+  case Some(authorizer: ClusterMetadataAuthorizer) => if 
(aclsDelta.isSnapshotDelta()) {
+// If the delta resulted from a snapshot load, we want to apply 
the new changes
+// all at once using ClusterMetadataAuthorizer#loadSnapshot. If 
this is the
+// first snapshot load, it will also complete the futures returned 
by
+   // Authorizer#start (which we wait for before processing RPCs).
+authorizer.loadSnapshot(newImage.acls().acls())
+  } else {
+// Because the changes map is a LinkedHashMap, the deltas will be 
returned in
+// the order they were performed.
+aclsDelta.changes().entrySet().forEach(e =>
+  if (e.getValue().isPresent()) {
+authorizer.addAcl(e.getKey(), e.getValue().get())
+  } else {
+authorizer.removeAcl(e.getKey())
+  })
+  }
+  case _ => // No ClusterMetadataAuthorizer is configured. There is 
nothing to do.

Review comment:
   Should we log some kind of warning in this case? It may be a 
misconfiguration, I guess. Or maybe the user has changed authorizers?

##
File path: 
metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.requests.ApiError;
+import