[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

2021-05-11 Thread GitBox


cmccabe commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r630598753



##
File path: tests/kafkatest/services/kafka/kafka.py
##
@@ -863,7 +878,7 @@ def kafka_configs_cmd_with_optional_security_settings(self, node, force_use_zk_c
 # configure JAAS to provide the typical client credentials
 jaas_conf_prop = KafkaService.JAAS_CONF_PROPERTY
 use_inter_broker_mechanism_for_client = False
-optional_jass_krb_system_props_prefix = "KAFKA_OPTS='-D%s -D%s' " % (jaas_conf_prop, KafkaService.KRB5_CONF)
+optional_jass_krb_system_props_prefix = "KAFKA_OPTS='-D%s -D%s' " % (jaas_conf_prop, KafkaService.KRB5_CONF) if security_protocol_to_use != "SSL" else ""

Review comment:
   I'm confused by the logic here.  If the security protocol is SSL, we do
not define the JAAS properties in KAFKA_OPTS?  That seems a bit weird: why
define them when we're using PLAINTEXT or SASL_SSL, but not when we're using
SSL?  Can you add a comment explaining how this works?
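
   For reference, a minimal sketch of what the changed line evaluates to for
each protocol. The two property strings are placeholder assumptions standing in
for `KafkaService.JAAS_CONF_PROPERTY` and `KafkaService.KRB5_CONF`, and
`jaas_krb_opts_prefix` is a hypothetical helper name, not something in the PR.
The sketch only shows the behavior; it does not explain why SSL is excluded.

```python
# Placeholder values (assumptions) for KafkaService.JAAS_CONF_PROPERTY and
# KafkaService.KRB5_CONF; the real constants live in the system test code.
JAAS_CONF_PROPERTY = "java.security.auth.login.config=/path/to/jaas.conf"
KRB5_CONF = "java.security.krb5.conf=/path/to/krb5.conf"


def jaas_krb_opts_prefix(security_protocol_to_use):
    """Hypothetical helper: build the KAFKA_OPTS prefix, or "" for SSL."""
    # The conditional expression binds more loosely than %, so this is
    # ("..." % (...)) if <condition> else "", not "..." % (<conditional>).
    return (
        "KAFKA_OPTS='-D%s -D%s' " % (JAAS_CONF_PROPERTY, KRB5_CONF)
        if security_protocol_to_use != "SSL"
        else ""
    )


if __name__ == "__main__":
    for protocol in ["PLAINTEXT", "SASL_PLAINTEXT", "SASL_SSL", "SSL"]:
        print("%-14s -> %r" % (protocol, jaas_krb_opts_prefix(protocol)))
```

   Every protocol other than SSL, including PLAINTEXT, gets the JAAS and
Kerberos system properties; only SSL yields an empty prefix.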




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

2021-05-11 Thread GitBox


cmccabe commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r630601127



##
File path: tests/kafkatest/services/security/kafka_acls.py
##
@@ -66,17 +67,46 @@ def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False, addit
 This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled
 :param additional_cluster_operations_to_grant may be set to ['Alter', 'Create'] if the cluster is secured since these are required
 to create SCRAM credentials and topics, respectively
+:param security_protocol set it to explicitly determine whether we use client or broker credentials, otherwise
+we use the client security protocol unless inter-broker security protocol is PLAINTEXT, in which case we use PLAINTEXT.
+Then we use the broker's credentials if the selected security protocol matches the inter-broker security protocol,
+otherwise we use the client's credentials.
 """
 node = kafka.nodes[0]
 
 for operation in ['ClusterAction'] + additional_cluster_operations_to_grant:
 cmd = "%(cmd_prefix)s --add --cluster --operation=%(operation)s --allow-principal=%(principal)s" % {
-'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, force_use_zk_connection),
+'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, force_use_zk_connection, security_protocol),
 'operation': operation,
 'principal': principal
 }
 kafka.run_cli_tool(node, cmd)
 
+def remove_cluster_acl(self, kafka, principal, force_use_zk_connection=False, additional_cluster_operations_to_remove = [], security_protocol=None):

Review comment:
   Since this is new code, it would be really good to avoid introducing 
`force_use_zk_connection` here if possible.  I can't see anywhere in this PR 
where it is used.  Is it really necessary?
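
   A rough sketch of what the new method could look like without the flag.
This is an assumption, not the PR's code: the body mirrors `add_cluster_acl`
with `--remove --force` in place of `--add`, `FakeKafka` is a hypothetical
stand-in for the service object so the snippet runs on its own, and the
command prefix it returns is a placeholder. The sketch also swaps the mutable
`[]` default for `None`.

```python
class FakeKafka:
    """Hypothetical stand-in for the Kafka service; records commands only."""

    def __init__(self):
        self.nodes = ["node-1"]
        self.commands = []

    def kafka_acls_cmd_with_optional_security_settings(
            self, node, force_use_zk_connection, security_protocol=None):
        # Placeholder prefix; the real method builds the full CLI invocation.
        return "kafka-acls.sh"

    def run_cli_tool(self, node, cmd):
        self.commands.append(cmd)


def remove_cluster_acl(kafka, principal,
                       additional_cluster_operations_to_remove=None,
                       security_protocol=None):
    """Sketch: remove cluster ACLs without exposing force_use_zk_connection."""
    # None instead of [] avoids sharing one default list across calls.
    extra = list(additional_cluster_operations_to_remove or [])
    node = kafka.nodes[0]
    for operation in ['ClusterAction'] + extra:
        cmd = ("%(cmd_prefix)s --remove --force --cluster "
               "--operation=%(operation)s --allow-principal=%(principal)s") % {
            # The ZK flag is fixed to False rather than surfaced to callers.
            'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(
                node, False, security_protocol),
            'operation': operation,
            'principal': principal,
        }
        kafka.run_cli_tool(node, cmd)


if __name__ == "__main__":
    kafka = FakeKafka()
    remove_cluster_acl(kafka, "User:alice", ["Alter"])
    for command in kafka.commands:
        print(command)
```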








[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

2021-05-11 Thread GitBox


cmccabe commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r630602650



##
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##
@@ -91,28 +93,42 @@ case class ZkSupport(adminManager: ZkAdminManager,
   override def controllerId: Option[Int] =  metadataCache.getControllerId
 }
 
-case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache, quotaCache: ClientQuotaCache)
+case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache, quotaCache: ClientQuotaCache, config: KafkaConfig)
 extends MetadataSupport {
+  if (config.requiresZookeeper) {
+throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
+  }
   override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
   override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
-  override def requireRaftOrThrow(createException: => Exception): RaftSupport = this
-
-  override def ensureConsistentWith(config: KafkaConfig): Unit = {
-if (config.requiresZookeeper) {
-  throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
+  override def requireZkAuthorizerOrThrow(createException: => Exception) = {
+if (!hasZkAuthorizer) {
+  throw createException
 }
   }
+  override def requireRaftOrThrow(createException: => Exception): RaftSupport = this
 
   override def maybeForward(request: RequestChannel.Request,
 handler: RequestChannel.Request => Unit,
 responseCallback: Option[AbstractResponse] => Unit): Unit = {
 if (!request.isForwarded) {
-  fwdMgr.forwardRequest(request, responseCallback)
+  request.header.apiKey match {
+case ApiKeys.CREATE_ACLS | ApiKeys.DELETE_ACLS =>
+  if (hasZkAuthorizer) {
+handler(request)

Review comment:
   Yeah, I had the same question.  It seems like the code is already in 
place.








[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

2021-05-11 Thread GitBox


cmccabe commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r630603137



##
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##
@@ -91,28 +93,42 @@ case class ZkSupport(adminManager: ZkAdminManager,
   override def controllerId: Option[Int] =  metadataCache.getControllerId
 }
 
-case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache, quotaCache: ClientQuotaCache)
+case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache, quotaCache: ClientQuotaCache, config: KafkaConfig)
 extends MetadataSupport {
+  if (config.requiresZookeeper) {
+throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
+  }
   override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
   override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
-  override def requireRaftOrThrow(createException: => Exception): RaftSupport = this
-
-  override def ensureConsistentWith(config: KafkaConfig): Unit = {
-if (config.requiresZookeeper) {
-  throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
+  override def requireZkAuthorizerOrThrow(createException: => Exception) = {

Review comment:
   I don't think we need (or want) to special-case the ZK authorizer here.  
There is a Confluent authorizer that doesn't depend on ZK, and also a Cloudera 
one.  We don't want to break them.  Just forward everything.
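
   To illustrate "forward everything", here is a toy Python model of the
forwarding decision (the real code is the Scala `maybeForward` quoted above).
`Request`, `RaftSupportModel`, and the callables are hypothetical names for
illustration, and the `else` branch that handles already-forwarded requests
locally is an assumption about code not shown in this hunk.

```python
from dataclasses import dataclass


@dataclass
class Request:
    """Hypothetical stand-in for RequestChannel.Request."""
    api_key: str
    is_forwarded: bool = False


class RaftSupportModel:
    """Toy model: no branching on API key or authorizer type."""

    def __init__(self, forward):
        # `forward` stands in for fwdMgr.forwardRequest.
        self.forward = forward

    def maybe_forward(self, request, handler, response_callback):
        if not request.is_forwarded:
            # Every request that has not been forwarded goes to the
            # controller, whichever authorizer is configured.
            self.forward(request, response_callback)
        else:
            # Assumed: a request that was already forwarded is handled here.
            handler(request)


if __name__ == "__main__":
    forwarded, handled = [], []
    support = RaftSupportModel(lambda req, cb: forwarded.append(req.api_key))
    for key in ["CREATE_ACLS", "DELETE_ACLS", "CREATE_TOPICS"]:
        support.maybe_forward(Request(key), handled.append, None)
    print("forwarded:", forwarded)
    print("handled locally:", handled)
```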








[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r633749112



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -99,6 +100,9 @@ class ControllerApis(val requestChannel: RequestChannel,
 case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
 case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
 case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
+case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)

Review comment:
   We might want the controller to process DescribeAcls for debugging 
purposes.  There's no reason to artificially prevent it from processing the 
RPC, although I agree that it will normally not be used.

##
File path: clients/src/main/resources/common/message/DeleteAclsRequest.json
##
@@ -16,7 +16,7 @@
 {
   "apiKey": 31,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker", "controller"],

Review comment:
   (See above comment)
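
   As a small illustration of what the `listeners` change does, the sketch
below checks which listener types a request spec is exposed on. The JSON is a
hand-written fragment containing only the fields visible in the hunk above, and
`exposed_on` is a hypothetical helper, not part of Kafka's message generator.

```python
import json

# Fragment with only the fields visible in the diff (not the full schema file).
BEFORE = json.loads('{"apiKey": 31, "type": "request", "listeners": ["zkBroker"]}')
AFTER = json.loads(
    '{"apiKey": 31, "type": "request",'
    ' "listeners": ["zkBroker", "broker", "controller"]}'
)


def exposed_on(spec, listener_type):
    """Hypothetical helper: is this request listed for the listener type?"""
    return listener_type in spec.get("listeners", [])


if __name__ == "__main__":
    for listener_type in ["zkBroker", "broker", "controller"]:
        print("%-10s before=%s after=%s" % (
            listener_type,
            exposed_on(BEFORE, listener_type),
            exposed_on(AFTER, listener_type),
        ))
```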



