[ https://issues.apache.org/jira/browse/KAFKA-2584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14943694#comment-14943694 ]

Joel Koshy commented on KAFKA-2584:
-----------------------------------

Here is a quick summary of the patch we ended up applying at LinkedIn:
* In the implementation of {{EndPoint.createEndPoint(listener)}}: catch 
{{IllegalArgumentException}} (due to enum validation) and throw a new 
{{UnknownEndpointException}}.
* {{Broker.createBroker}} catches {{UnknownEndpointException}} and just filters 
out those endpoints.
* Config validation will still fail on unknown endpoints (since we don't catch 
the exception there).

We felt the enum validation is useful for config validation. Hopefully the 
above is clear enough, but we can upload a small PR as well. It is not 
necessarily the best/only option, but we can discuss alternatives.
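The approach above can be sketched in simplified form. This is a hypothetical illustration, not the actual Kafka patch: `UnknownEndpointException`, the `knownProtocols` set, and the stripped-down `EndPoint`/`createBroker` shapes are stand-ins for the real `kafka.cluster` classes.

```scala
object EndpointFilterSketch {
  // Stand-in for the SecurityProtocol enum known to this (older) build.
  val knownProtocols = Set("PLAINTEXT", "SSL")

  // New exception type thrown instead of letting the enum's
  // IllegalArgumentException escape.
  class UnknownEndpointException(msg: String) extends RuntimeException(msg)

  case class EndPoint(protocol: String, host: String, port: Int)

  // Parses "PROTOCOL://host:port"; throws UnknownEndpointException for
  // protocols this build does not recognize (mirroring the patched
  // EndPoint.createEndPoint catching the enum validation failure).
  def createEndPoint(listener: String): EndPoint = {
    val Array(proto, rest) = listener.split("://", 2)
    if (!knownProtocols.contains(proto))
      throw new UnknownEndpointException(s"Unknown protocol: $proto")
    val idx = rest.lastIndexOf(':')
    EndPoint(proto, rest.substring(0, idx), rest.substring(idx + 1).toInt)
  }

  // Broker creation silently drops endpoints it cannot understand, so an
  // old client can still use the protocols it does know about. Config
  // validation would call createEndPoint directly and still fail fast.
  def createBroker(listeners: Seq[String]): Seq[EndPoint] =
    listeners.flatMap { l =>
      try Some(createEndPoint(l))
      catch { case _: UnknownEndpointException => None }
    }
}
```

With this sketch, a registration advertising both `PLAINTEXT://host:9092` and an unrecognized `KERBEROS://host:9094` endpoint would yield a broker with only the PLAINTEXT endpoint instead of failing outright.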

> SecurityProtocol enum validation should be removed or relaxed for non-config 
> usages
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-2584
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2584
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Joel Koshy
>             Fix For: 0.9.0.0
>
>
> While deploying SSL to our clusters, we had to roll back due to another 
> compatibility issue similar to what we mentioned in passing in other 
> threads/KIP hangouts, i.e., picking up jars between official releases. 
> Fortunately, there is an easy server-side hot-fix we can do internally to 
> work around it. However, I would classify the issue below as a bug, since 
> there is little point in doing endpoint type validation outside of config 
> validation.
> What happened here is that some (old) consumers (that do not care about SSL) 
> picked up a Kafka jar that understood multiple endpoints but did not have the 
> SSL feature. The rebalance fails because while creating the Broker objects we 
> are forced to validate all the endpoints.
> Yes, the old consumer is going away, but this would affect tools as well. The 
> same issue could also happen on the brokers if we were to upgrade them to 
> include (say) a Kerberos endpoint: the old brokers would not be able to 
> read the registration of newly upgraded brokers. You could get around 
> that by doing two rounds of deployment (one to get the new code, and another 
> to expose the Kerberos endpoint), but that's inconvenient and, I think, 
> unnecessary. Although validation makes sense for configs, the current 
> validate-everywhere approach is overkill. (I.e., an old consumer, tool, or 
> broker should not complain because another broker can speak more protocols.)
> {noformat}
> kafka.common.KafkaException: Failed to parse the broker info from zookeeper: 
> {"jmx_port":-1,"timestamp":"1442952770627","endpoints":["PLAINTEXT://<host>:<plaintextport>","SSL://<host>:<sslport>"],"host":"<host>","version":2,"port":<port>}
>         at kafka.cluster.Broker$.createBroker(Broker.scala:61)
>         at kafka.utils.ZkUtils$$anonfun$getCluster$1.apply(ZkUtils.scala:520)
>         at kafka.utils.ZkUtils$$anonfun$getCluster$1.apply(ZkUtils.scala:518)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:518)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener
> ...
> Caused by: java.lang.IllegalArgumentException: No enum constant 
> org.apache.kafka.common.protocol.SecurityProtocol.SSL
>         at java.lang.Enum.valueOf(Enum.java:238)
>         at 
> org.apache.kafka.common.protocol.SecurityProtocol.valueOf(SecurityProtocol.java:24)
>         at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:48)
>         at kafka.cluster.Broker$$anonfun$1.apply(Broker.scala:74)
>         at kafka.cluster.Broker$$anonfun$1.apply(Broker.scala:73)
>         at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at kafka.cluster.Broker$.createBroker(Broker.scala:73)
>         ... 70 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
