----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/28769/#review69281 -----------------------------------------------------------
Thanks for the patch. Looks promising. Some comments. 1. I overlooked this when I suggested the new broker format in ZK. This means that we will need to upgrade all consumer clients before we can turn on the flag of using the new protocol on the brokers, which may not be convenient. Now, I think your earlier approach is probably better because of this? clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java <https://reviews.apache.org/r/28769/#comment113925> This is probably not intended. clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java <https://reviews.apache.org/r/28769/#comment113940> I thought the security protocol is a seperate config in the client and not part of the broker.list. Do we need to specify the protocol here? core/src/main/scala/kafka/cluster/Broker.scala <https://reviews.apache.org/r/28769/#comment113963> channel -> protocol? core/src/main/scala/kafka/cluster/Broker.scala <https://reviews.apache.org/r/28769/#comment113975> The implementation seems to represent the endpoints as a single string in csv, not a map? core/src/main/scala/kafka/javaapi/TopicMetadata.scala <https://reviews.apache.org/r/28769/#comment113978> Technically, this is an api change since it's used in javaapi.SimpleConsumer. The caller will now get a different type in the response. An alternative is to leave Broker as it is and create sth like BrokerProfile to include all endpoints. Perhaps, we need to discuss this in WIP a bit, whether it's better to break the api in order to use a more meaningingful class name, or not break the api and stick with a lousy name. core/src/main/scala/kafka/network/SocketServer.scala <https://reviews.apache.org/r/28769/#comment113981> KAFKA-1683 core/src/main/scala/kafka/network/SocketServer.scala <https://reviews.apache.org/r/28769/#comment113983> It seems that we should always be able to find the port in the map. So, perhaps we should just do portToProtocol(port). core/src/main/scala/kafka/server/KafkaConfig.scala <https://reviews.apache.org/r/28769/#comment113985> Perhaps we can give an example URI. core/src/main/scala/kafka/server/KafkaConfig.scala <https://reviews.apache.org/r/28769/#comment113969> Since this is also used for communication btw the controller and the brokers, perhaps it's better named as sth like "intra.broker.security.protocol"? core/src/main/scala/kafka/server/KafkaConfig.scala <https://reviews.apache.org/r/28769/#comment113949> I am thinking about how we should name this field. Since this is only needed for internal communication among brokers, perhaps we should name it as sth like use.new.intra.broker.wire.protocol. My next question is what happens if we have intra broker protocol changes in 2 releases. Do we want to use different names so that we can enable each change independantly? An alternative is to have the same property name and the meaning is to turn on intra broker changes introduced in this release only. The latter implies that one can't skip the upgrading of the intermediate release. So, my feeling is that probably the former will be better? Perhaps we can bring this up in the WIP discussion. core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala <https://reviews.apache.org/r/28769/#comment113990> This can be just endpoints(SecurityProtocol.PLAINTEXT). kafka-patch-review.py <https://reviews.apache.org/r/28769/#comment113920> Are the changes in this file intended? system_test/utils/kafka_system_test_utils.py <https://reviews.apache.org/r/28769/#comment113996> I thought protocol is specified separately, and not in broker.list? - Jun Rao On Jan. 14, 2015, 2:16 a.m., Gwen Shapira wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/28769/ > ----------------------------------------------------------- > > (Updated Jan. 14, 2015, 2:16 a.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1809 > https://issues.apache.org/jira/browse/KAFKA-1809 > > > Repository: kafka > > > Description > ------- > > trivial change to add byte serializer to ProducerPerformance; patched by Jun > Rao > > > first commit of refactoring. > > > changed topicmetadata to include brokerendpoints and fixed few unit tests > > > fixing systest and support for binding to default address > > > fixed system tests > > > fix default address binding and ipv6 support > > > fix some issues regarding endpoint parsing. Also, larger segments for systest > make the validation much faster > > > added link to security wiki in doc > > > fixing unit test after rename of ProtocolType to SecurityProtocol > > > Following Joe's advice, added security protocol enum on client side, and > modified protocol to use ID instead of string. > > > validate producer config against enum > > > add a second protocol for testing and modify SocketServerTests to check on > multi-ports > > > Reverted the metadata request changes and removed the explicit security > protocol argument. Instead the socketserver will determine the protocol based > on the port and add this to the request > > > bump version for UpdateMetadataRequest and added support for rolling upgrades > with new config > > > following tests - fixed LeaderAndISR protocol and ZK registration for > backward compatibility > > > cleaned up some changes that were actually not necessary. hopefully making > this patch slightly easier to review > > > undoing some changes that don't belong here > > > bring back config lost in cleanup > > > fixes neccessary for an all non-plaintext cluster to work > > > minor modifications following comments by Jun > > > added missing license > > > formatting > > > clean up imports > > > cleaned up V2 to not include host+port field. Using use.new.protocol flag to > decide which version to serialize > > > change endpoints collection in Broker to Map[protocol, endpoint], mostly to > be clear that we intend to have one endpoint per protocol > > > validate that listeners and advertised listeners have unique ports and > protocols > > > support legacy configs > > > some fixes following rebase > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java > 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75 > > clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java > 1b828007975ef8893717b425ed96874d4ef7053f > > clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/utils/Utils.java > 527dd0f9c47fce7310b7a37a9b95bf87f1b9c292 > clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java > 6e37ea553f73d9c584641c48c56dbf6e62ba5f88 > clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java > a39fab532f73148316a56c0f8e9197f38ea66f79 > config/server.properties b0e4496a8ca736b6abe965a430e8ce87b0e8287f > core/src/main/scala/kafka/admin/AdminUtils.scala > 28b12c7b89a56c113b665fbde1b95f873f8624a3 > core/src/main/scala/kafka/admin/TopicCommand.scala > 285c0333ff43543d3e46444c1cd9374bb883bb59 > core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala > 84f60178f6ebae735c8aa3e79ed93fe21ac4aea7 > core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala > 4ff7e8f8cc695551dd5d2fe65c74f6b6c571e340 > core/src/main/scala/kafka/api/TopicMetadata.scala > 0190076df0adf906ecd332284f222ff974b315fc > core/src/main/scala/kafka/api/TopicMetadataResponse.scala > 92ac4e687be22e4800199c0666bfac5e0059e5bb > core/src/main/scala/kafka/api/UpdateMetadataRequest.scala > 530982e36b17934b8cc5fb668075a5342e142c59 > core/src/main/scala/kafka/client/ClientUtils.scala > ebba87f0566684c796c26cb76c64b4640a5ccfde > core/src/main/scala/kafka/cluster/Broker.scala > 0060add008bb3bc4b0092f2173c469fce0120be6 > core/src/main/scala/kafka/cluster/BrokerEndPoint.scala PRE-CREATION > core/src/main/scala/kafka/cluster/EndPoint.scala PRE-CREATION > core/src/main/scala/kafka/cluster/SecurityProtocol.scala PRE-CREATION > core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala > PRE-CREATION > core/src/main/scala/kafka/consumer/ConsumerConfig.scala > 9ebbee6c16dc83767297c729d2d74ebbd063a993 > core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala > b9e2bea7b442a19bcebd1b350d39541a8c9dd068 > core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala > ee6139c901082358382c5ef892281386bf6fc91b > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala > 191a8677444e53b043e9ad6e94c5a9191c32599e > core/src/main/scala/kafka/controller/ControllerChannelManager.scala > eb492f00449744bc8d63f55b393e2a1659d38454 > core/src/main/scala/kafka/controller/KafkaController.scala > 66df6d2fbdbdd556da6bea0df84f93e0472c8fbf > core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala > 1b28861cdf7dfb30fc696b54f8f8f05f730f4ece > core/src/main/scala/kafka/javaapi/TopicMetadata.scala > f384e04678df10a5b46a439f475c63371bf8e32b > core/src/main/scala/kafka/network/RequestChannel.scala > 7b1db3dbbb2c0676f166890f566c14aa248467ab > core/src/main/scala/kafka/network/SocketServer.scala > 39b1651b680b2995cedfde95d74c086d9c6219ef > core/src/main/scala/kafka/producer/ProducerPool.scala > 43df70bb461dd3e385e6b20396adef3c4016a3fc > core/src/main/scala/kafka/server/AbstractFetcherManager.scala > 20c00cb8cc2351950edbc8cb1752905a0c26e79f > core/src/main/scala/kafka/server/AbstractFetcherThread.scala > 8c281d4668f92eff95a4a5df3c03c4b5b20e7095 > core/src/main/scala/kafka/server/KafkaApis.scala > c011a1b79bd6c4e832fe7d097daacb0d647d1cd4 > core/src/main/scala/kafka/server/KafkaConfig.scala > bbd3fd75e83961c971e50cf83d48db79a3da8791 > core/src/main/scala/kafka/server/KafkaHealthcheck.scala > 4acdd70fe9c1ee78d6510741006c2ece65450671 > core/src/main/scala/kafka/server/KafkaServer.scala > a069eb9272c92ef62387304b60de1fe473d7ff49 > core/src/main/scala/kafka/server/MetadataCache.scala > bf81a1ab88c14be8697b441eedbeb28fa0112643 > core/src/main/scala/kafka/server/ReplicaFetcherManager.scala > 351dbbad3bdb709937943b336a5b0a9e0162a5e2 > core/src/main/scala/kafka/server/ReplicaFetcherThread.scala > 6879e730282185bda3d6bc3659cb15af0672cecf > core/src/main/scala/kafka/server/ReplicaManager.scala > e58fbb922e93b0c31dff04f187fcadb4ece986d7 > core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala > d1e7c434e77859d746b8dc68dd5d5a3740425e79 > core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala > ba6ddd7a909df79a0f7d45e8b4a2af94ea0fceb6 > core/src/main/scala/kafka/tools/SimpleConsumerShell.scala > b4f903b6c7c3bb725cac7c05eb1f885906413c4d > core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala > 111c9a8b94ce45d95551482e9fd3f8c1cccbf548 > core/src/main/scala/kafka/utils/Utils.scala > 738c1af9ef5de16fdf5130daab69757a14c48b5c > core/src/main/scala/kafka/utils/ZkUtils.scala > c14bd455b6642f5e6eb254670bef9f57ae41d6cb > core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala > 5ec613cdb50b93bfe4477e89554dfc3768759b18 > core/src/test/scala/integration/kafka/api/ProducerSendTest.scala > b15237b76def3b234924280fa3fdb25dbb0cc0dc > core/src/test/scala/other/kafka/TestOffsetManager.scala > 41f334d48897b3027ed54c58bbf4811487d3b191 > core/src/test/scala/unit/kafka/KafkaConfigTest.scala > 4d36b8b1173f60d43463c13c9d8c1275a84c8c28 > core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala > 1bf2667f47853585bc33ffb3e28256ec5f24ae84 > core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala > cd16ced5465d098be7a60498326b2a98c248f343 > core/src/test/scala/unit/kafka/cluster/BrokerTest.scala PRE-CREATION > core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala > c0355cc0135c6af2e346b4715659353a31723b86 > core/src/test/scala/unit/kafka/integration/FetcherTest.scala > 25845abbcad2e79f56f729e59239b738d3ddbc9d > core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala > 35dc071b1056e775326981573c9618d8046e601d > core/src/test/scala/unit/kafka/log/LogTest.scala > c2dd8eb69da8c0982a0dd20231c6f8bd58eb623e > core/src/test/scala/unit/kafka/network/SocketServerTest.scala > 78b431f9c88cca1bc5e430ffd41083d0e92b7e75 > core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala > 1db6ac329f7b54e600802c8a623f80d159d4e69b > core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala > d60d8e0f49443f4dc8bc2cad6e2f951eda28f5cb > core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala > f0c4a56b61b4f081cf4bee799c6e9c523ff45e19 > core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala > 2377abe4933e065d037828a214c3a87e1773a8ef > core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala > c2ba07c5fdbaf0e65ca033b2e4d88f45a8a15b2e > core/src/test/scala/unit/kafka/server/LogOffsetTest.scala > c06ee756bf0fe07e5d3c92823a476c960b37afd6 > core/src/test/scala/unit/kafka/utils/TestUtils.scala > ac15d34425795d5be20c51b01fa1108bdcd66583 > kafka-patch-review.py b7f132f9d210b8648859ab8f9c89f30ec128ab38 > system_test/replication_testsuite/testcase_1/testcase_1_properties.json > 0c6d7a316cc6b51ac0755ca03558507db0706c31 > system_test/utils/kafka_system_test_utils.py > 41d511cbc310fa87e0f2cd2f772e479e8e3ae4e2 > > Diff: https://reviews.apache.org/r/28769/diff/ > > > Testing > ------- > > > Thanks, > > Gwen Shapira > >