> On Feb. 3, 2015, 7:14 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/TopicCommandHelper.scala, lines 1-17
> > <https://reviews.apache.org/r/29301/diff/7/?file=821380#file821380line1>
> >
> >     One general comment:
> >     
> >     For some topic commands, why use AdminUtils to write ZK path again 
> > instead of handle it via the controller directly? Or this is still WIP?
> 
> Andrii Biletskyi wrote:
>     Not sure I understand you. You mean technially calling ZK client from 
> Controller class, not through TopicCommandHelper? If so - it's just to leave 
> KafkaApi clean and small.
> 
> Guozhang Wang wrote:
>     For example, upon receiving a create-topic request, the helper class will 
> call AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK() which will 
> just write this request to ZK admin path for it to be captured by controller; 
> however since only the broker with the active controller will receive such 
> requests why don't we just hand off the request from KafkaApi to the 
> controller to handle it.
>     
>     One question, though, is that we need to make sure concurrency is correct 
> for controller handling multiple such tasks, and we have some thoughts about 
> how to deal with such cases (see Jiangjie and my commnets in KAFKA-1305).

Thanks for explanation.
So instead of current workflow:
CreateTopicRequest -> Helper class -> AdminUtils -> zk path is created -> 
Controller's changeTopicListener picks up the change -> topic is created
You propose:
CreateTopicRequest -> Controller directly executes logic from 
ChangeTopicListener
?
Very interesting idea! Can we make a separate ticket for that? I tried to port 
TopicCommand "as is" in order to have at least for now working end-to-end 
infrastructure to handle Admin commands. I believe this is more like 
refactoring TopicCommand (probably delete- and alterTopic should be changed 
too). I'm a bit concerned adding this refactoring will require additional 
efforts to test (especially taking into account your note about KAFKA-1305) and 
time to agree on approach we will use to address this issue.


> On Feb. 3, 2015, 7:14 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/controller/ControllerChannelManager.scala, lines 
> > 301-310
> > <https://reviews.apache.org/r/29301/diff/7/?file=821376#file821376line301>
> >
> >     Do not understand the rationale behind this: could you add some 
> > comments? Particularly, why we want to send an empty metadata map to the 
> > brokers with forceSendBrokerInfo?
> 
> Andrii Biletskyi wrote:
>     Thanks, this is done because on startup we don't send UpdateMetadaRequest 
> (updateMetadataRequestMap is empty) and thus brokers' cache is not filled 
> with brokers and controller. This leads to ClusterMetadataRequest can't be 
> served correctly. 
>     I'm not sure this is the best way to do it, open for suggestions.
> 
> Guozhang Wang wrote:
>     In this case can we just use addUpdateMetadataRequestForBrokers() before 
> calling sendRequestsToBrokers()?

If I understood correctly - addUpdateMetadataRequestForBrokers() is already 
called, it's just nothing is added to UpdateMetadata. The steps are the 
following:
1. One broker cluster is started (no topics)
2. KafkaController.onControllerFailover() is called
3. sendUpdateMetadataRequest()
4. addUpdateMetadataRequest(): updateMetadataRequest is created foreach 
controllerContext.partitionLeadershipInfo.keySet (which is empty)
5. sendRequestsToBrokers(): we send UpdateMetadata foreach broker from 
updateMetadataRequestMap (which is empty) -> broker holding a controller's role 
doesn't receive UpdateMetadataRequest

So essentially the problem is that UpdateMetadaRequest holds data about 
controller, brokers _and_ partitionState but we send UpdateMetadaRequest only 
if there is partitionState update to be sent.


> On Feb. 3, 2015, 7:14 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/common/requests/admin/AbstractAdminRequest.java,
> >  lines 1-28
> > <https://reviews.apache.org/r/29301/diff/7/?file=821321#file821321line1>
> >
> >     Wondering if an abstract admin request is necessary, as it does not 
> > have many common interface functions.
> 
> Andrii Biletskyi wrote:
>     This is needed to avoid code dupliaction in admin clients. See 
> RequestDispatcher for example.
>     You will need to call admin request and get response of that type. Having 
> AbstractAdminRequest (specifically createResponseCounterpart) lets you have:
>     ```
>     public <T extends AbstractAdminResponse> T 
> sendAdminRequest(AbstractAdminRequest<T> abstractRequest) throws Exception {
>     ```
>     Instead of sendCreateTopicRequest, sendAlter... etc. If there is a better 
> and cleaner way to achive this - please let me know.
> 
> Guozhang Wang wrote:
>     I see. How about changing "sendAdminRequest(AbstractAdminRequest<T>)" to 
> "sendRequest(ClientRequest)" and the caller like AlterTopicCommand.execute() 
> will be:
>     
>     AlterTopicRequest alterTopicRequest = // create the request
>     ClientRequest request = new ClientRequest(new RequestSend(...) ...)
>     dispatcher.sendRequest(request)
>     
>     This way we are duplicating the second line here in every upper-level 
> class, while saving the admin interface. I actually do not know which one is 
> better..

Yes, but you will also need typed response. Let me continue your example:

AlterTopicRequest alterTopicRequest = // create the request
ClientRequest request = new ClientRequest(new RequestSend(...) ...)
__ClientResponse response = dispatcher.sendRequest(request, 
ApiKeys.ALTER_TOPIC)__
__AlterTopicResponse alterTopicResponse = new 
AlterTopicResponse(response.responseBody())__
alterTopicResponse.// now get what you need from typed response

And you will have this NetworkClient related Stuff (RequestSend, ClientRequest 
...) everywhere in you client code. But it looks pretty strange you can't have 
generic method to send request and get immidiately response of the required 
type.

So really RequestDispatcher allready has sendRequest() as you suggest, with 
sendAdminRequest I tried to address issue with getting response counterpart. 
But I agree that solution might mislead people, so if doesn't worth it - I'm 
okay to remove intermediate AbstractAdminRequest/Response.


- Andrii


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29301/#review70790
-----------------------------------------------------------


On Jan. 14, 2015, 4:07 p.m., Andrii Biletskyi wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29301/
> -----------------------------------------------------------
> 
> (Updated Jan. 14, 2015, 4:07 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1694
>     https://issues.apache.org/jira/browse/KAFKA-1694
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1694 - introduced new type for Wire protocol, ported 
> ClusterMetadataResponse to it
> 
> 
> KAFKA-1694 - Split Admin RQ/RP to separate messages
> 
> 
> KAFKA-1694 - Admin commands can be handled only by controller; 
> DeleteTopicCommand NPE fix
> 
> 
> KAFKA-1776 - Ported ConsumerGroupOffsetChecker
> 
> 
> KAFKA-1776 - Ported PreferredReplicaElectionTool and ReassignPartitionsTool 
> to CLI
> 
> 
> KAFKA-1694 - kafka-tools is uploaded on uploadAllArchives
> 
> 
> KAFKA-1694 - ReviewBoard 29301 code review fixes
> 
> 
> KAFKA-1694 - Data for ReassignPartitions and PreferredReplicaLeaderElection 
> is in json string
> 
> 
> KAFKA-1694 - Added logging
> 
> 
> KAFKA-1694 - fixed misprint in schema
> 
> 
> KAFKA-1694 - DescribeTopicCommand supports all flags that TopicCommand does
> 
> 
> KAFKA-1694 - Fixed compile error for new Selector constructor
> 
> 
> KAFKA-1694 - Fixed ConsumerGroupChecker sends DescribeTopicResponse instead 
> of ConsumerGroupOffsetsResponse
> 
> 
> KAFKA-1694 - Introduced AbstractAdminRequest/Response to avoid code 
> duplication for sending admin requests / receiving response.
> 
> 
> Diffs
> -----
> 
>   bin/kafka.sh PRE-CREATION 
>   bin/windows/kafka.bat PRE-CREATION 
>   build.gradle c9ac43378c3bf5443f0f47c8ba76067237ecb348 
>   clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
> 109fc965e09b2ed186a073351bd037ac8af20a4c 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
> 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/MaybeOf.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
> 121e880a941fcd3e6392859edba11a94236494cc 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataRequest.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataResponse.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/AbstractAdminRequest.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/AbstractAdminResponse.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/AlterTopicRequest.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/AlterTopicResponse.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/ConsumerGroupOffsetsRequest.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/ConsumerGroupOffsetsResponse.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/CreateTopicRequest.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/CreateTopicResponse.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/DeleteTopicRequest.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/DeleteTopicResponse.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicOutput.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicRequest.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicResponse.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/ListTopicsOutput.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/ListTopicsRequest.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/ListTopicsResponse.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/PreferredReplicaLeaderElectionRequest.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/PreferredReplicaLeaderElectionResponse.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/ReassignPartitionsRequest.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/ReassignPartitionsResponse.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/TopicConfigDetails.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/TopicPartitionDetails.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyPreferredReplicaLeaderElectionRequest.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyPreferredReplicaLeaderElectionResponse.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyReassignPartitionsRequest.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyReassignPartitionsResponse.java
>  PRE-CREATION 
>   
> clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
>  df37fc6d8f0db0b8192a948426af603be3444da4 
>   config/tools-log4j.properties 52f07c96019b4083fc78f62cfb0a81080327e847 
>   core/src/main/scala/kafka/api/ApiUtils.scala 
> 1f80de1638978901500df808ca5133308c9d1fca 
>   core/src/main/scala/kafka/api/ClusterMetadataRequest.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/ClusterMetadataResponse.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/RequestKeys.scala 
> c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
>   core/src/main/scala/kafka/api/admin/AlterTopicRequest.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/admin/AlterTopicResponse.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/admin/ConsumerGroupOffsetsRequest.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/api/admin/ConsumerGroupOffsetsResponse.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/api/admin/CreateTopicRequest.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/admin/CreateTopicResponse.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/admin/DeleteTopicRequest.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/admin/DeleteTopicResponse.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/admin/DescribeTopicRequest.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/admin/DescribeTopicResponse.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/api/admin/ListTopicsRequest.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/admin/ListTopicsResponse.scala PRE-CREATION 
>   
> core/src/main/scala/kafka/api/admin/PreferredReplicaLeaderElectionRequest.scala
>  PRE-CREATION 
>   
> core/src/main/scala/kafka/api/admin/PreferredReplicaLeaderElectionResponse.scala
>  PRE-CREATION 
>   core/src/main/scala/kafka/api/admin/ReassignPartitionsRequest.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/api/admin/ReassignPartitionsResponse.scala 
> PRE-CREATION 
>   
> core/src/main/scala/kafka/api/admin/VerifyPreferredReplicaLeaderElectionRequest.scala
>  PRE-CREATION 
>   
> core/src/main/scala/kafka/api/admin/VerifyPreferredReplicaLeaderElectionResponse.scala
>  PRE-CREATION 
>   core/src/main/scala/kafka/api/admin/VerifyReassignPartitionsRequest.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/api/admin/VerifyReassignPartitionsResponse.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/common/AdminRequestFailedException.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 
> eedc2f5f21dd8755fba891998456351622e17047 
>   core/src/main/scala/kafka/common/InvalidRequestTargetException.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
> eb492f00449744bc8d63f55b393e2a1659d38454 
>   core/src/main/scala/kafka/controller/KafkaController.scala 
> 66df6d2fbdbdd556da6bea0df84f93e0472c8fbf 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> c011a1b79bd6c4e832fe7d097daacb0d647d1cd4 
>   core/src/main/scala/kafka/server/MetadataCache.scala 
> bf81a1ab88c14be8697b441eedbeb28fa0112643 
>   core/src/main/scala/kafka/server/TopicCommandHelper.scala PRE-CREATION 
>   core/src/main/scala/kafka/tools/ConsumerOffsetCheckerHelper.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/tools/PreferredReplicaLeaderElectionHelper.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/tools/ReassignPartitionsHelper.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> cd16ced5465d098be7a60498326b2a98c248f343 
>   settings.gradle 83f764e6a4a15a5fdba232dce74a369870f26b45 
>   tools/src/main/java/org/apache/kafka/cli/BaseCommandOpts.java PRE-CREATION 
>   tools/src/main/java/org/apache/kafka/cli/Boot.java PRE-CREATION 
>   tools/src/main/java/org/apache/kafka/cli/RequestDispatcher.java 
> PRE-CREATION 
>   tools/src/main/java/org/apache/kafka/cli/Shell.java PRE-CREATION 
>   tools/src/main/java/org/apache/kafka/cli/command/AlterTopicCommand.java 
> PRE-CREATION 
>   tools/src/main/java/org/apache/kafka/cli/command/ClearScreenCommand.java 
> PRE-CREATION 
>   tools/src/main/java/org/apache/kafka/cli/command/Command.java PRE-CREATION 
>   tools/src/main/java/org/apache/kafka/cli/command/CreateTopicCommand.java 
> PRE-CREATION 
>   tools/src/main/java/org/apache/kafka/cli/command/DeleteTopicCommand.java 
> PRE-CREATION 
>   tools/src/main/java/org/apache/kafka/cli/command/DescribeTopicCommand.java 
> PRE-CREATION 
>   tools/src/main/java/org/apache/kafka/cli/command/ExitCommand.java 
> PRE-CREATION 
>   tools/src/main/java/org/apache/kafka/cli/command/ListTopicsCommand.java 
> PRE-CREATION 
>   
> tools/src/main/java/org/apache/kafka/cli/command/PreferredReplicaLeaderElectionCommand.java
>  PRE-CREATION 
>   
> tools/src/main/java/org/apache/kafka/cli/command/PrintConsumerGroupOffsetsCommand.java
>  PRE-CREATION 
>   tools/src/main/java/org/apache/kafka/cli/command/PrintHelpCommand.java 
> PRE-CREATION 
>   
> tools/src/main/java/org/apache/kafka/cli/command/ReassignPartitionsCommand.java
>  PRE-CREATION 
>   tools/src/main/java/org/apache/kafka/cli/command/TopicSwitchCommand.java 
> PRE-CREATION 
>   tools/src/main/java/org/apache/kafka/cli/util/StringUtils.java PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/29301/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Andrii Biletskyi
> 
>

Reply via email to