dajac commented on a change in pull request #11295: URL: https://github.com/apache/kafka/pull/11295#discussion_r702694677
########## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ########## @@ -354,10 +354,10 @@ object KafkaConfig { // For ZooKeeper TLS client authentication to be enabled the client must (at a minimum) configure itself as using TLS // with both a client connection socket and a key store location explicitly set. - private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig) = { - getZooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).getOrElse("false") == "true" && - getZooKeeperClientProperty(zkClientConfig, ZkClientCnxnSocketProp).isDefined && - getZooKeeperClientProperty(zkClientConfig, ZkSslKeyStoreLocationProp).isDefined + private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig): Boolean = { + zooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).map(_ == "true").getOrElse(false) && Review comment: nit: We could use `exists` here. ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -95,24 +95,24 @@ object AclAuthorizer { } } - private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): Option[ZKClientConfig] = { + private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = { val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + KafkaConfig.ZkSslClientEnableProp). 
map(_.toString).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean if (!zkSslClientEnable) - None + new ZKClientConfig else { // start with the base config from the Kafka configuration // be sure to force creation since the zkSslClientEnable property in the kafkaConfig could be false val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, true) // add in any prefixed overlays - KafkaConfig.ZkSslConfigToSystemPropertyMap.foreach{ case (kafkaProp, sysProp) => { - val prefixedValue = configMap.get(AclAuthorizer.configPrefix + kafkaProp) - if (prefixedValue.isDefined) - zkClientConfig.get.setProperty(sysProp, + KafkaConfig.ZkSslConfigToSystemPropertyMap.forKeyValue { (kafkaProp, sysProp) => { Review comment: nit: We could remove the extra block defined by the last `{` on this line. ########## File path: core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala ########## @@ -1340,6 +1343,32 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertEquals(ZooDefs.Ids.READ_ACL_UNSAFE.asScala, zkClient.getAcl(mockPath)) } + @Test + def testJuteMaxBufffer(): Unit = { + // default case + assertEquals("4194304", zkClient.currentZooKeeper.getClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER)) + + // Value set directly on ZKClientConfig takes precedence over system property + System.setProperty(ZKConfig.JUTE_MAXBUFFER, (3000 * 1024).toString) + try { + val clientConfig1 = new ZKClientConfig + clientConfig1.setProperty(ZKConfig.JUTE_MAXBUFFER, (2000 * 1024).toString) + val client1 = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout, + zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClient", + zkClientConfig = clientConfig1) + try assertEquals("2048000", client1.currentZooKeeper.getClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER)) + finally client1.close() Review comment: Is it worth defining an inner helper method to avoid the repeated code here? 
Something like: `assertProperty(config: ZKClientConfig, property: String, expectedValue: String)`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org