Repository: kafka Updated Branches: refs/heads/trunk 1388ed9ba -> e9a72ceab
KAFKA-3132: URI scheme in "listeners" property should not be case-sen⦠â¦sitive Author: Grant Henke <[email protected]> Reviewers: Ismael Juma Closes #811 from granthenke/listeners-case Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e9a72cea Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e9a72cea Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e9a72cea Branch: refs/heads/trunk Commit: e9a72ceab6e0ceaf4d2125756f07154cd15a7178 Parents: 1388ed9 Author: Grant Henke <[email protected]> Authored: Tue Jan 26 16:47:26 2016 -0800 Committer: Gwen Shapira <[email protected]> Committed: Tue Jan 26 16:47:26 2016 -0800 ---------------------------------------------------------------------- .../main/java/org/apache/kafka/clients/ClientUtils.java | 2 +- .../apache/kafka/common/protocol/SecurityProtocol.java | 5 +++++ .../java/org/apache/kafka/common/network/EchoServer.java | 2 +- core/src/main/scala/kafka/cluster/EndPoint.scala | 4 ++-- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- .../test/scala/unit/kafka/server/KafkaConfigTest.scala | 10 ++++++++++ 6 files changed, 20 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e9a72cea/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index b614198..0201257 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -73,7 +73,7 @@ public class ClientUtils { * @return configured ChannelBuilder based on the configs. */ public static ChannelBuilder createChannelBuilder(Map<String, ?> configs) { - SecurityProtocol securityProtocol = SecurityProtocol.valueOf((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); + SecurityProtocol securityProtocol = SecurityProtocol.forName((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); if (!SecurityProtocol.nonTestingValues().contains(securityProtocol)) throw new ConfigException("Invalid SecurityProtocol " + securityProtocol); return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs); http://git-wip-us.apache.org/repos/asf/kafka/blob/e9a72cea/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java index cbd0c42..905c670 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java @@ -83,6 +83,11 @@ public enum SecurityProtocol { return CODE_TO_SECURITY_PROTOCOL.get(id); } + /** Case insensitive lookup by protocol name */ + public static SecurityProtocol forName(String name) { + return SecurityProtocol.valueOf(name.toUpperCase()); + } + /** * Returns the set of non-testing SecurityProtocol instances, that is, SecurityProtocol instances that are suitable * for production usage. http://git-wip-us.apache.org/repos/asf/kafka/blob/e9a72cea/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java index 9354bfe..44b5a5f 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java +++ b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java @@ -43,7 +43,7 @@ class EchoServer extends Thread { public EchoServer(Map<String, ?> configs) throws Exception { this.protocol = configs.containsKey("security.protocol") ? - SecurityProtocol.valueOf((String) configs.get("security.protocol")) : SecurityProtocol.PLAINTEXT; + SecurityProtocol.forName((String) configs.get("security.protocol")) : SecurityProtocol.PLAINTEXT; if (protocol == SecurityProtocol.SSL) { this.sslFactory = new SslFactory(Mode.SERVER); this.sslFactory.configure(configs); http://git-wip-us.apache.org/repos/asf/kafka/blob/e9a72cea/core/src/main/scala/kafka/cluster/EndPoint.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala index 76997b5..32c27ed 100644 --- a/core/src/main/scala/kafka/cluster/EndPoint.scala +++ b/core/src/main/scala/kafka/cluster/EndPoint.scala @@ -44,8 +44,8 @@ object EndPoint { def createEndPoint(connectionString: String): EndPoint = { val uriParseExp = """^(.*)://\[?([0-9a-zA-Z\-.:]*)\]?:(-?[0-9]+)""".r connectionString match { - case uriParseExp(protocol, "", port) => new EndPoint(null, port.toInt, SecurityProtocol.valueOf(protocol)) - case uriParseExp(protocol, host, port) => new EndPoint(host, port.toInt, SecurityProtocol.valueOf(protocol)) + case uriParseExp(protocol, "", port) => new EndPoint(null, port.toInt, SecurityProtocol.forName(protocol)) + case uriParseExp(protocol, host, port) => new EndPoint(host, port.toInt, SecurityProtocol.forName(protocol)) case _ => throw new KafkaException("Unable to parse " + connectionString + " to a broker endpoint") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e9a72cea/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 4911809..00bf0cb 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -800,7 +800,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp) val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp) val uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp) - val interBrokerSecurityProtocol = SecurityProtocol.valueOf(getString(KafkaConfig.InterBrokerSecurityProtocolProp)) + val interBrokerSecurityProtocol = SecurityProtocol.forName(getString(KafkaConfig.InterBrokerSecurityProtocolProp)) val interBrokerProtocolVersion = ApiVersion(getString(KafkaConfig.InterBrokerProtocolVersionProp)) /** ********* Controlled shutdown configuration ***********/ http://git-wip-us.apache.org/repos/asf/kafka/blob/e9a72cea/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 9ddc2c1..e8ffb5b 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -235,6 +235,16 @@ class KafkaConfigTest { } @Test + def testCaseInsensitiveListenerProtocol() { + val props = new Properties() + props.put(KafkaConfig.BrokerIdProp, "1") + props.put(KafkaConfig.ZkConnectProp, "localhost:2181") + props.put(KafkaConfig.ListenersProp, "plaintext://localhost:9091,SsL://localhost:9092") + + assert(isValidKafkaConfig(props)) + } + + @Test def testListenerDefaults() { val props = new Properties() props.put(KafkaConfig.BrokerIdProp, "1")
