Repository: kafka Updated Branches: refs/heads/trunk 9f21837e9 -> 30d3cc631
KAFKA-3100; Broker.createBroker should work if json is version > 2 and still compatible Author: Ismael Juma <[email protected]> Reviewers: Grant Henke <[email protected]>, Jun Rao <[email protected]> Closes #773 from ijuma/kafka-3100-create-broker-version-check Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/30d3cc63 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/30d3cc63 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/30d3cc63 Branch: refs/heads/trunk Commit: 30d3cc6314bf896ea37d01c5a1d6b21d69a7053f Parents: 9f21837 Author: Ismael Juma <[email protected]> Authored: Mon Jan 25 15:12:37 2016 -0800 Committer: Jun Rao <[email protected]> Committed: Mon Jan 25 15:12:37 2016 -0800 ---------------------------------------------------------------------- core/src/main/scala/kafka/cluster/Broker.scala | 27 +++++++++++------- .../unit/kafka/cluster/BrokerEndPointTest.scala | 30 ++++++++++++++++---- 2 files changed, 40 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/30d3cc63/core/src/main/scala/kafka/cluster/Broker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index 42b76cd..b56cae9 100755 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -52,32 +52,37 @@ object Broker { * "SSL://host1:9093"] */ def createBroker(id: Int, brokerInfoString: String): Broker = { - if(brokerInfoString == null) - throw new BrokerNotAvailableException("Broker id %s does not exist".format(id)) + if (brokerInfoString == null) + throw new BrokerNotAvailableException(s"Broker id $id does not exist") try { Json.parseFull(brokerInfoString) match { case Some(m) => val brokerInfo = m.asInstanceOf[Map[String, Any]] val version = brokerInfo("version").asInstanceOf[Int] - val endpoints = version match { - case 1 => + val endpoints = + if (version < 1) + throw new KafkaException(s"Unsupported version of broker registration: $brokerInfoString") + else if (version == 1) { val host = brokerInfo("host").asInstanceOf[String] val port = brokerInfo("port").asInstanceOf[Int] Map(SecurityProtocol.PLAINTEXT -> new EndPoint(host, port, SecurityProtocol.PLAINTEXT)) - case 2 => + } + else { val listeners = brokerInfo("endpoints").asInstanceOf[List[String]] - listeners.map(listener => { + listeners.map { listener => val ep = EndPoint.createEndPoint(listener) (ep.protocolType, ep) - }).toMap - case _ => throw new KafkaException("Unknown version of broker registration. Only versions 1 and 2 are supported." + brokerInfoString) - } + }.toMap + } + + new Broker(id, endpoints) case None => - throw new BrokerNotAvailableException("Broker id %d does not exist".format(id)) + throw new BrokerNotAvailableException(s"Broker id $id does not exist") } } catch { - case t: Throwable => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t) + case t: Throwable => + throw new KafkaException(s"Failed to parse the broker info from zookeeper: $brokerInfoString", t) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/30d3cc63/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala index 7b8bf4b..905612c 100644 --- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala @@ -28,7 +28,7 @@ import scala.collection.mutable class BrokerEndPointTest extends Logging { @Test - def testSerDe() = { + def testSerDe() { val endpoint = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT) val listEndPoints = Map(SecurityProtocol.PLAINTEXT -> endpoint) @@ -42,7 +42,7 @@ class BrokerEndPointTest extends Logging { } @Test - def testHashAndEquals() = { + def testHashAndEquals() { val endpoint1 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT) val endpoint2 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT) val endpoint3 = new EndPoint("myhost", 1111, SecurityProtocol.PLAINTEXT) @@ -65,7 +65,25 @@ class BrokerEndPointTest extends Logging { } @Test - def testFromJSON() = { + def testFromJsonFutureVersion() { + // `createBroker` should support future compatible versions, we use a hypothetical future version here + val brokerInfoStr = """{ + "foo":"bar", + "version":100, + "host":"localhost", + "port":9092, + "jmx_port":9999, + "timestamp":"1416974968782", + "endpoints":["SSL://localhost:9093"] + }""" + val broker = Broker.createBroker(1, brokerInfoStr) + assert(broker.id == 1) + assert(broker.getBrokerEndPoint(SecurityProtocol.SSL).host == "localhost") + assert(broker.getBrokerEndPoint(SecurityProtocol.SSL).port == 9093) + } + + @Test + def testFromJsonV2 { val brokerInfoStr = "{\"version\":2," + "\"host\":\"localhost\"," + "\"port\":9092," + @@ -79,7 +97,7 @@ class BrokerEndPointTest extends Logging { } @Test - def testFromOldJSON() = { + def testFromJsonV1() = { val brokerInfoStr = "{\"jmx_port\":-1,\"timestamp\":\"1420485325400\",\"host\":\"172.16.8.243\",\"version\":1,\"port\":9091}" val broker = Broker.createBroker(1, brokerInfoStr) assert(broker.id == 1) @@ -88,7 +106,7 @@ class BrokerEndPointTest extends Logging { } @Test - def testBrokerEndpointFromURI() = { + def testBrokerEndpointFromUri() { var connectionString = "localhost:9092" var endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString) assert(endpoint.host == "localhost") @@ -106,7 +124,7 @@ class BrokerEndPointTest extends Logging { } @Test - def testEndpointFromURI() = { + def testEndpointFromUri() { var connectionString = "PLAINTEXT://localhost:9092" var endpoint = EndPoint.createEndPoint(connectionString) assert(endpoint.host == "localhost")
