[jira] [Commented] (KAFKA-6680) Fix config initialization in DynamicBrokerConfig

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16407769#comment-16407769
 ] 

ASF GitHub Bot commented on KAFKA-6680:
---

rajinisivaram closed pull request #4740: MINOR: Document workaround for 
KAFKA-6680 for 1.1
URL: https://github.com/apache/kafka/pull/4740
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/configuration.html b/docs/configuration.html
index df58ba76b77..7b975ba1d57 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -189,6 +189,12 @@ Adding and Removing Listeners
   Inter-broker listener must be configured using the static broker 
configuration inter.broker.listener.name
   or inter.broker.security.protocol.
 
+  Note: In Kafka version 1.1.0, thread config updates and listener 
updates are processed by a broker only if at least one other
+  broker config was configured dynamically prior to this update. To workaround 
this issue, a dummy property may be dynamically configured
+  prior to an update of thread/listener configs or the update may be retried 
after reverting the change. Dynamic update of listeners in
+  1.1.0 requires listener.security.protocol.map to be updated to 
a map containing security protocols for all the listeners,
+  even if listener name is a security protocol. These issues will be fixed in 
the next release.
+
   3.2 Topic-Level 
Configs
 
   Configurations pertinent to topics have both a server default as well an 
optional per-topic override. If no per-topic configuration is given the server 
default is used. The override can be set at topic creation time by giving one 
or more --config options. This example creates a topic named 
my-topic with a custom max message size and flush rate:


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix config initialization in DynamicBrokerConfig
> 
>
> Key: KAFKA-6680
> URL: https://issues.apache.org/jira/browse/KAFKA-6680
> Project: Kafka
>  Issue Type: Bug
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
> Fix For: 1.2.0
>
>
> Below issues observed while testing dynamic config update feature
> 1. {{kafkaConfig}} doesn't get updated during {{initialize}} if there are no 
> dynamic configs defined in ZK.
> 2.  update DynamicListenerConfig.validateReconfiguration() to check new 
> Listeners must be subset of listener map



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6680) Fix config initialization in DynamicBrokerConfig

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406439#comment-16406439
 ] 

ASF GitHub Bot commented on KAFKA-6680:
---

rajinisivaram opened a new pull request #4740: MINOR: Document workaround for 
KAFKA-6680 for 1.1
URL: https://github.com/apache/kafka/pull/4740
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix config initialization in DynamicBrokerConfig
> 
>
> Key: KAFKA-6680
> URL: https://issues.apache.org/jira/browse/KAFKA-6680
> Project: Kafka
>  Issue Type: Bug
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
> Fix For: 1.2.0
>
>
> Below issues observed while testing dynamic config update feature
> 1. {{kafkaConfig}} doesn't get updated during {{initialize}} if there are no 
> dynamic configs defined in ZK.
> 2.  update DynamicListenerConfig.validateReconfiguration() to check new 
> Listeners must be subset of listener map



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6680) Fix config initialization in DynamicBrokerConfig

2018-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16405544#comment-16405544
 ] 

ASF GitHub Bot commented on KAFKA-6680:
---

rajinisivaram closed pull request #4731:  KAFKA-6680: Fix issues related to 
Dynamic Broker configs
URL: https://github.com/apache/kafka/pull/4731
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 3236af01bbb..92fd5d73136 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -137,6 +137,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
   private val dynamicConfigPasswordEncoder = 
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
 
   private[server] def initialize(zkClient: KafkaZkClient): Unit = {
+currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
 val adminZkClient = new AdminZkClient(zkClient)
 updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, 
ConfigEntityName.Default))
 val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, 
kafkaConfig.brokerId.toString)
@@ -719,8 +720,8 @@ class DynamicListenerConfig(server: KafkaServer) extends 
BrokerReconfigurable wi
 val oldListeners = listenersToMap(oldConfig.listeners)
 if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
   throw new ConfigException(s"Advertised listeners 
'$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
-if (newListeners.keySet != newConfig.listenerSecurityProtocolMap.keySet)
-  throw new ConfigException(s"Listeners '$newListeners' and listener map 
'${newConfig.listenerSecurityProtocolMap}' don't match")
+if 
(!newListeners.keySet.subsetOf(newConfig.listenerSecurityProtocolMap.keySet))
+  throw new ConfigException(s"Listeners '$newListeners' must be subset of 
listener map '${newConfig.listenerSecurityProtocolMap}'")
 newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName 
=>
   val prefix = listenerName.configPrefix
   val newListenerProps = immutableListenerConfigs(newConfig, prefix)
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 2e3e2742dbc..bca98d2cee0 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -24,10 +24,12 @@ import kafka.utils.TestUtils
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.{ConfigException, SslConfigs}
+import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.JavaConverters._
+import scala.collection.Set
 
 class DynamicBrokerConfigTest {
 
@@ -248,4 +250,56 @@ class DynamicBrokerConfigTest {
 newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
 assertEquals("staticLoginModule required;", 
newConfigWithNewSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
   }
+
+  @Test
+  def testDynamicListenerConfig(): Unit = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 9092)
+val oldConfig =  KafkaConfig.fromProps(props)
+val kafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer])
+EasyMock.expect(kafkaServer.config).andReturn(oldConfig).anyTimes()
+EasyMock.replay(kafkaServer)
+
+props.put(KafkaConfig.ListenersProp, 
"PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093")
+val newConfig = KafkaConfig(props)
+
+val dynamicListenerConfig = new DynamicListenerConfig(kafkaServer)
+dynamicListenerConfig.validateReconfiguration(newConfig)
+  }
+
+  @Test
+  def testDynamicConfigInitializationWithoutConfigsInZK(): Unit = {
+val zkClient = EasyMock.createMock(classOf[kafka.zk.KafkaZkClient])
+EasyMock.expect(zkClient.getEntityConfigs(EasyMock.anyString(), 
EasyMock.anyString())).andReturn(new java.util.Properties()).anyTimes()
+EasyMock.replay(zkClient)
+
+val oldConfig =  KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, 
TestUtils.MockZkConnect, port = 9092))
+val dynamicBrokerConfig = new DynamicBrokerConfig(oldConfig)
+dynamicBrokerConfig.initialize(zkClient)
+dynamicBrokerConfig.addBrokerReconfigurable(new TestDynamicThreadPool)
+
+val newprops = new Properties()
+newprops.put(KafkaConfig.NumIoThreadsProp,