> On Feb. 3, 2015, 7:14 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/TopicCommandHelper.scala, lines 1-17
> > <https://reviews.apache.org/r/29301/diff/7/?file=821380#file821380line1>
> >
> >     One general comment:
> >
> >     For some topic commands, why use AdminUtils to write the ZK path again
> >     instead of handling it via the controller directly? Or is this still WIP?
>
> Andrii Biletskyi wrote:
>     Not sure I understand you. Do you mean technically calling the ZK client
>     from the Controller class, not through TopicCommandHelper? If so, it's
>     just to keep KafkaApi clean and small.
>
> Guozhang Wang wrote:
>     For example, upon receiving a create-topic request, the helper class will
>     call AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(), which
>     will just write this request to the ZK admin path for it to be captured
>     by the controller. However, since only the broker with the active
>     controller will receive such requests, why don't we just hand off the
>     request from KafkaApi to the controller to handle it?
>
>     One question, though, is that we need to make sure concurrency is correct
>     for the controller handling multiple such tasks, and we have some
>     thoughts about how to deal with such cases (see Jiangjie's and my
>     comments in KAFKA-1305).
>
> Andrii Biletskyi wrote:
>     Thanks for the explanation.
>     So instead of the current workflow:
>     CreateTopicRequest -> Helper class -> AdminUtils -> zk path is created ->
>     Controller's changeTopicListener picks up the change -> topic is created
>     You propose:
>     CreateTopicRequest -> Controller directly executes logic from
>     ChangeTopicListener
>     ?
>     Very interesting idea! Can we make a separate ticket for that? I tried to
>     port TopicCommand "as is" in order to have, at least for now, a working
>     end-to-end infrastructure to handle Admin commands. I believe this is
>     more like refactoring TopicCommand (delete- and alterTopic should
>     probably be changed too). I'm a bit concerned that adding this
>     refactoring will require additional effort to test (especially taking
>     into account your note about KAFKA-1305) and time to agree on the
>     approach we will use to address this issue.

Agree.


> On Feb. 3, 2015, 7:14 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/common/requests/admin/AbstractAdminRequest.java, lines 1-28
> > <https://reviews.apache.org/r/29301/diff/7/?file=821321#file821321line1>
> >
> >     Wondering if an abstract admin request is necessary, as it does not
> >     have many common interface functions.
>
> Andrii Biletskyi wrote:
>     This is needed to avoid code duplication in admin clients. See
>     RequestDispatcher for example.
>     You need to send an admin request and get back a response of the matching
>     type. Having AbstractAdminRequest (specifically
>     createResponseCounterpart) lets you have:
>     ```
>     public <T extends AbstractAdminResponse> T sendAdminRequest(AbstractAdminRequest<T> abstractRequest) throws Exception {
>     ```
>     instead of sendCreateTopicRequest, sendAlter... etc. If there is a better
>     and cleaner way to achieve this, please let me know.
>
> Guozhang Wang wrote:
>     I see. How about changing "sendAdminRequest(AbstractAdminRequest<T>)" to
>     "sendRequest(ClientRequest)", so that a caller like
>     AlterTopicCommand.execute() will be:
>
>     AlterTopicRequest alterTopicRequest = // create the request
>     ClientRequest request = new ClientRequest(new RequestSend(...) ...)
>     dispatcher.sendRequest(request)
>
>     This way we are duplicating the second line here in every upper-level
>     class, while saving the admin interface. I actually do not know which one
>     is better..
>
> Andrii Biletskyi wrote:
>     Yes, but you will also need a typed response. Let me continue your
>     example:
>
>     AlterTopicRequest alterTopicRequest = // create the request
>     ClientRequest request = new ClientRequest(new RequestSend(...) ...)
>     __ClientResponse response = dispatcher.sendRequest(request, ApiKeys.ALTER_TOPIC)__
>     __AlterTopicResponse alterTopicResponse = new AlterTopicResponse(response.responseBody())__
>     alterTopicResponse.// now get what you need from typed response
>
>     And you will have this NetworkClient-related stuff (RequestSend,
>     ClientRequest ...) everywhere in your client code. But it looks pretty
>     strange that you can't have a generic method to send a request and
>     immediately get a response of the required type.
>
>     So really RequestDispatcher already has sendRequest() as you suggest;
>     with sendAdminRequest I tried to address the issue of getting the
>     response counterpart. But I agree that this solution might mislead
>     people, so if it isn't worth it, I'm okay with removing the intermediate
>     AbstractAdminRequest/Response.

Makes sense, I am now OK with the admin request interface.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29301/#review70790
-----------------------------------------------------------


On Jan. 14, 2015, 4:07 p.m., Andrii Biletskyi wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29301/
> -----------------------------------------------------------
>
> (Updated Jan. 14, 2015, 4:07 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1694
>     https://issues.apache.org/jira/browse/KAFKA-1694
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-1694 - introduced new type for Wire protocol, ported ClusterMetadataResponse to it
>
>
> KAFKA-1694 - Split Admin RQ/RP to separate messages
>
>
> KAFKA-1694 - Admin commands can be handled only by controller; DeleteTopicCommand NPE fix
>
>
> KAFKA-1776 - Ported ConsumerGroupOffsetChecker
>
>
> KAFKA-1776 - Ported PreferredReplicaElectionTool and ReassignPartitionsTool to CLI
>
>
> KAFKA-1694 - kafka-tools is uploaded on uploadAllArchives
>
>
> KAFKA-1694 - ReviewBoard 29301 code review fixes
>
>
> KAFKA-1694 - Data for ReassignPartitions and PreferredReplicaLeaderElection is in json string
>
>
> KAFKA-1694 - Added logging
>
>
> KAFKA-1694 - fixed misprint in schema
>
>
> KAFKA-1694 - DescribeTopicCommand supports all flags that TopicCommand does
>
>
> KAFKA-1694 - Fixed compile error for new Selector constructor
>
>
> KAFKA-1694 - Fixed ConsumerGroupChecker sends DescribeTopicResponse instead of ConsumerGroupOffsetsResponse
>
>
> KAFKA-1694 - Introduced AbstractAdminRequest/Response to avoid code duplication for sending admin requests / receiving response.
>
>
> Diffs
> -----
>
>   bin/kafka.sh PRE-CREATION
>   bin/windows/kafka.bat PRE-CREATION
>   build.gradle c9ac43378c3bf5443f0f47c8ba76067237ecb348
>   clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 109fc965e09b2ed186a073351bd037ac8af20a4c
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a
>   clients/src/main/java/org/apache/kafka/common/protocol/types/MaybeOf.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc
>   clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/AbstractAdminRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/AbstractAdminResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/AlterTopicRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/AlterTopicResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/ConsumerGroupOffsetsRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/ConsumerGroupOffsetsResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/CreateTopicRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/CreateTopicResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/DeleteTopicRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/DeleteTopicResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicOutput.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/ListTopicsOutput.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/ListTopicsRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/ListTopicsResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/PreferredReplicaLeaderElectionRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/PreferredReplicaLeaderElectionResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/ReassignPartitionsRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/ReassignPartitionsResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/TopicConfigDetails.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/TopicPartitionDetails.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyPreferredReplicaLeaderElectionRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyPreferredReplicaLeaderElectionResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyReassignPartitionsRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyReassignPartitionsResponse.java PRE-CREATION
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4
>   config/tools-log4j.properties 52f07c96019b4083fc78f62cfb0a81080327e847
>   core/src/main/scala/kafka/api/ApiUtils.scala 1f80de1638978901500df808ca5133308c9d1fca
>   core/src/main/scala/kafka/api/ClusterMetadataRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/ClusterMetadataResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/RequestKeys.scala c24c0345feedc7b9e2e9f40af11bfa1b8d328c43
>   core/src/main/scala/kafka/api/admin/AlterTopicRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/AlterTopicResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/ConsumerGroupOffsetsRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/ConsumerGroupOffsetsResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/CreateTopicRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/CreateTopicResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/DeleteTopicRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/DeleteTopicResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/DescribeTopicRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/DescribeTopicResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/ListTopicsRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/ListTopicsResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/PreferredReplicaLeaderElectionRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/PreferredReplicaLeaderElectionResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/ReassignPartitionsRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/ReassignPartitionsResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/VerifyPreferredReplicaLeaderElectionRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/VerifyPreferredReplicaLeaderElectionResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/VerifyReassignPartitionsRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/VerifyReassignPartitionsResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/common/AdminRequestFailedException.scala PRE-CREATION
>   core/src/main/scala/kafka/common/ErrorMapping.scala eedc2f5f21dd8755fba891998456351622e17047
>   core/src/main/scala/kafka/common/InvalidRequestTargetException.scala PRE-CREATION
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala eb492f00449744bc8d63f55b393e2a1659d38454
>   core/src/main/scala/kafka/controller/KafkaController.scala 66df6d2fbdbdd556da6bea0df84f93e0472c8fbf
>   core/src/main/scala/kafka/server/KafkaApis.scala c011a1b79bd6c4e832fe7d097daacb0d647d1cd4
>   core/src/main/scala/kafka/server/MetadataCache.scala bf81a1ab88c14be8697b441eedbeb28fa0112643
>   core/src/main/scala/kafka/server/TopicCommandHelper.scala PRE-CREATION
>   core/src/main/scala/kafka/tools/ConsumerOffsetCheckerHelper.scala PRE-CREATION
>   core/src/main/scala/kafka/tools/PreferredReplicaLeaderElectionHelper.scala PRE-CREATION
>   core/src/main/scala/kafka/tools/ReassignPartitionsHelper.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343
>   settings.gradle 83f764e6a4a15a5fdba232dce74a369870f26b45
>   tools/src/main/java/org/apache/kafka/cli/BaseCommandOpts.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/Boot.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/RequestDispatcher.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/Shell.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/AlterTopicCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/ClearScreenCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/Command.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/CreateTopicCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/DeleteTopicCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/DescribeTopicCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/ExitCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/ListTopicsCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/PreferredReplicaLeaderElectionCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/PrintConsumerGroupOffsetsCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/PrintHelpCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/ReassignPartitionsCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/TopicSwitchCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/util/StringUtils.java PRE-CREATION
>
> Diff: https://reviews.apache.org/r/29301/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Andrii Biletskyi
>
>
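
For reference, the typed request/response pairing discussed in the AbstractAdminRequest thread above can be sketched roughly as follows. This is a minimal, self-contained illustration and not code from the patch. Only the names AbstractAdminRequest, AbstractAdminResponse, createResponseCounterpart, sendAdminRequest, RequestDispatcher, AlterTopicRequest and AlterTopicResponse come from the review; the class bodies, the Transport interface, and the UTF-8 "wire format" are assumptions made up for the example. The signature in the review also declares `throws Exception`, which is left out here for brevity.

```java
import java.nio.charset.StandardCharsets;

// Sketch only: hypothetical stand-ins, not the classes from the patch.
class TypedDispatchSketch {

    abstract static class AbstractAdminResponse {}

    // Each request type knows which response type it pairs with.
    abstract static class AbstractAdminRequest<T extends AbstractAdminResponse> {
        abstract byte[] serialize();
        abstract T createResponseCounterpart(byte[] responseBody);
    }

    static final class AlterTopicResponse extends AbstractAdminResponse {
        final String message;
        AlterTopicResponse(byte[] body) {
            // Assumed placeholder encoding: the body is a plain UTF-8 string.
            this.message = new String(body, StandardCharsets.UTF_8);
        }
    }

    static final class AlterTopicRequest extends AbstractAdminRequest<AlterTopicResponse> {
        private final String topic;
        AlterTopicRequest(String topic) { this.topic = topic; }

        @Override
        byte[] serialize() { return topic.getBytes(StandardCharsets.UTF_8); }

        @Override
        AlterTopicResponse createResponseCounterpart(byte[] responseBody) {
            return new AlterTopicResponse(responseBody);
        }
    }

    // Hypothetical transport; stands in for the network client plumbing.
    interface Transport {
        byte[] roundTrip(byte[] requestBytes);
    }

    static final class RequestDispatcher {
        private final Transport transport;
        RequestDispatcher(Transport transport) { this.transport = transport; }

        // One generic method instead of sendCreateTopicRequest, sendAlter..., etc.
        <T extends AbstractAdminResponse> T sendAdminRequest(AbstractAdminRequest<T> request) {
            byte[] responseBody = transport.roundTrip(request.serialize());
            return request.createResponseCounterpart(responseBody);
        }
    }

    public static void main(String[] args) {
        // Fake transport that echoes the topic name back with a prefix.
        Transport fake = bytes ->
            ("altered:" + new String(bytes, StandardCharsets.UTF_8)).getBytes(StandardCharsets.UTF_8);
        RequestDispatcher dispatcher = new RequestDispatcher(fake);

        // The caller gets the typed response back without any cast.
        AlterTopicResponse response = dispatcher.sendAdminRequest(new AlterTopicRequest("events"));
        System.out.println(response.message); // prints "altered:events"
    }
}
```

The trade-off raised in the thread shows up here: callers stay free of transport details and casts, at the cost of one extra abstract base type per side.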