[ https://issues.apache.org/jira/browse/KAFKA-1910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14336985#comment-14336985 ]

Guozhang Wang edited comment on KAFKA-1910 at 3/3/15 12:45 AM:
---------------------------------------------------------------

The uploaded patch contains multiple fixes for the related JIRAs as well as a
refactoring of the new consumer itself. I will summarize them here instead of
in the RB (Review Board):

1. Fix ConsumerTest.testXXXwithBrokerFailure: in RestartDeadBroker we need to
call startup() on the old brokers instead of creating new ones, since the
latter approach messes up the metadata and causes the test to hang
(KAFKA-1948). Also make sure the "test" topic is created with the correct
replication factor, to avoid hanging when the only replica broker is shut
down. Also run the broker bouncing in a background thread so that the bounced
brokers are eventually restarted.

2. Fix ConsumerTest's __consumer_offsets topic: when we call partitionFor(),
the __consumer_offsets topic may be created with a replication factor of
min(offsetTopicReplicationFactor, aliveBrokers.size); see KAFKA-1864 for
details (KAFKA-1975).

3. Add the IllegalGeneration logic in the coordinator, as it is important for
consumers to rebalance after rediscovering the coordinator. The current stub
always returns OK, and hence consumers migrating to the new coordinator will
not trigger a rebalance (KAFKA-1964).

4. Create the Coordinator and the FetchManager modules as KafkaConsumer
internals. Coordinator is responsible for assigning partitions (joining
groups), committing offsets, and fetching offsets from the coordinator;
FetchManager is responsible for handling fetch requests / responses.

4.1 After the refactoring it is easier to detect and fix a bug where response
callbacks were triggered multiple times, causing the coordinator NPE
(KAFKA-1969).

4.2 Avoid always trying to fetch offsets from the coordinator whenever the
consumer decides to update fetch positions; introduce a few new variables /
APIs in SubscriptionState accordingly.

4.3 Move serializer / deserializer configs / constructors to AbstractConfig.

4.4 Add missing error handling in commit offset / heartbeat responses. In
general, I think we should document the possible error codes for each
response type to help with writing error handling logic; I have filed
KAFKA-1985 for that.
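
The broker bouncing described in item 1 can be sketched as below. This is a
minimal sketch under assumptions: Broker, FakeBroker and BrokerBouncer are
hypothetical stand-ins, not the harness classes used by ConsumerTest. What it
illustrates is that the background thread restarts a dead broker by calling
startup() on the same instance rather than constructing a replacement.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical broker abstraction for this sketch; the real test uses Kafka's own server class.
interface Broker {
    void startup();
    void shutdown();
    boolean isAlive();
}

// Hypothetical in-memory broker that counts how often it was (re)started.
class FakeBroker implements Broker {
    private volatile boolean alive = true;
    final AtomicInteger startups = new AtomicInteger();

    public void startup() { alive = true; startups.incrementAndGet(); }
    public void shutdown() { alive = false; }
    public boolean isAlive() { return alive; }
}

// Bounces brokers round-robin in a background thread. A real test would pause
// between shutdown and restart while consumers keep polling; omitted here.
class BrokerBouncer extends Thread {
    private final List<? extends Broker> brokers;
    private final int rounds;

    BrokerBouncer(List<? extends Broker> brokers, int rounds) {
        super("broker-bouncer");
        this.brokers = brokers;
        this.rounds = rounds;
        setDaemon(true);
    }

    @Override
    public void run() {
        for (int i = 0; i < rounds; i++) {
            Broker victim = brokers.get(i % brokers.size());
            victim.shutdown();
            restartDeadBrokers();
        }
    }

    // Restart by calling startup() on the existing objects instead of creating new ones.
    void restartDeadBrokers() {
        for (Broker b : brokers) {
            if (!b.isAlive()) {
                b.startup();
            }
        }
    }
}
```

In a test, the main thread would keep consuming while the bouncer runs, then
join() it and check that every broker is alive again.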
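
The replication rule referenced in item 2 reduces to one line of arithmetic.
The helper below is hypothetical (only the min(...) expression comes from the
comment above); it shows what a test calling partitionFor() on
__consumer_offsets may expect, instead of assuming the configured factor.

```java
// Hypothetical helper: the offsets topic may be created with the configured
// replication factor capped by the number of brokers alive at creation time
// (see KAFKA-1864).
final class OffsetsTopicReplication {
    private OffsetsTopicReplication() {}

    static int expectedReplicationFactor(int configuredFactor, int aliveBrokers) {
        if (configuredFactor < 1 || aliveBrokers < 1) {
            throw new IllegalArgumentException("factor and broker count must be positive");
        }
        return Math.min(configuredFactor, aliveBrokers);
    }
}
```

For example, with a configured factor of 3 and only 2 brokers alive, the test
should expect 2 replicas per partition.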
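
A sketch of the IllegalGeneration check from item 3, assuming a simplified
in-memory coordinator. StubCoordinator, HeartbeatResult and the rule that an
unknown group counts as generation 0 are assumptions made for illustration,
not the coordinator code in the patch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical result codes for this sketch only (not Kafka's wire-level error codes).
enum HeartbeatResult { OK, ILLEGAL_GENERATION }

class StubCoordinator {
    private final Map<String, Integer> generations = new HashMap<>();

    // A completed rebalance bumps the group's generation id and returns the new value.
    synchronized int rebalance(String groupId) {
        return generations.merge(groupId, 1, Integer::sum);
    }

    // Rather than always answering OK, reject a heartbeat whose generation id does
    // not match the group's current one. In this sketch a coordinator with no state
    // for the group (e.g. one the consumer has just migrated to) treats the
    // generation as 0, so any previously issued id is rejected.
    synchronized HeartbeatResult heartbeat(String groupId, int generationId) {
        int current = generations.getOrDefault(groupId, 0);
        return current == generationId ? HeartbeatResult.OK : HeartbeatResult.ILLEGAL_GENERATION;
    }
}
```

A consumer receiving ILLEGAL_GENERATION would rejoin the group, which is the
rebalance that the always-OK stub never triggers.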
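
The layering described in item 4 might look roughly like this. The
interfaces, their method signatures and LayeredConsumer are hypothetical;
partitions and records are plain strings to keep the sketch self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical interfaces for the two modules named in item 4; the real
// Coordinator and FetchManager classes in the patch have different signatures.
interface Coordinator {
    Set<String> assignPartitions(String groupId);            // join the group, get partitions
    void commitOffsets(Map<String, Long> offsets);           // commit to the coordinator
    Map<String, Long> fetchOffsets(Set<String> partitions);  // read committed offsets
}

interface FetchManager {
    // Send fetch requests from the given positions; return records per partition.
    Map<String, List<String>> fetch(Map<String, Long> positions);
}

// The consumer shrinks to a thin layer that wires the two modules together.
class LayeredConsumer {
    private final String groupId;
    private final Coordinator coordinator;
    private final FetchManager fetchManager;
    private final Map<String, Long> positions = new HashMap<>();

    LayeredConsumer(String groupId, Coordinator coordinator, FetchManager fetchManager) {
        this.groupId = groupId;
        this.coordinator = coordinator;
        this.fetchManager = fetchManager;
    }

    List<String> poll() {
        if (positions.isEmpty()) {
            // Group membership and offset lookup are delegated to the Coordinator.
            Set<String> assigned = coordinator.assignPartitions(groupId);
            positions.putAll(coordinator.fetchOffsets(assigned));
        }
        // Fetch request / response handling is delegated to the FetchManager.
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : fetchManager.fetch(positions).entrySet()) {
            positions.merge(e.getKey(), (long) e.getValue().size(), Long::sum);
            out.addAll(e.getValue());
        }
        return out;
    }

    void commit() {
        coordinator.commitOffsets(new HashMap<>(positions));
    }
}
```

Since the consumer only routes calls between the two modules, each module can
be exercised against an in-memory stub in isolation.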
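
Item 4.1 concerns response callbacks firing more than once. One defensive
pattern, not necessarily the fix in the patch, is to guard each callback so
its body runs at most once; OnceCallback below is a hypothetical name for such
a wrapper.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical wrapper: the delegate runs at most once, even if the same
// response is delivered twice.
final class OnceCallback<T> {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final Consumer<T> delegate;

    OnceCallback(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    /** Returns true if the delegate ran, false if this was a duplicate completion. */
    boolean onComplete(T response) {
        // compareAndSet lets exactly one caller flip the flag, even across threads.
        if (!completed.compareAndSet(false, true)) {
            return false;
        }
        delegate.accept(response);
        return true;
    }
}
```

A duplicate delivery then becomes a no-op the caller can log, instead of
re-running handler code whose state was already cleared by the first call.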
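
Item 4.2 can be illustrated with a simplified subscription state that
remembers committed offsets, so that updating fetch positions only contacts
the coordinator for partitions it knows nothing about. SubscriptionStateSketch
and its methods are hypothetical, and the coordinator round trip is modeled as
a plain java.util.function.Function.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical, simplified per-partition state (partitions are plain strings);
// the real SubscriptionState tracks more and uses different method names.
final class SubscriptionStateSketch {
    private final Map<String, Long> committed = new HashMap<>(); // last known committed offsets
    private final Map<String, Long> positions = new HashMap<>(); // current fetch positions

    void updateCommitted(String partition, long offset) { committed.put(partition, offset); }

    Long position(String partition) { return positions.get(partition); }

    // Drop the fetch position, e.g. after a reassignment.
    void clearPosition(String partition) { positions.remove(partition); }

    // Fill in missing fetch positions. Only partitions with neither a position nor
    // a locally known committed offset cause a round trip to the coordinator.
    void updateFetchPositions(Set<String> partitions,
                              Function<Set<String>, Map<String, Long>> fetchCommittedOffsets) {
        Set<String> unknown = new HashSet<>();
        for (String p : partitions) {
            if (positions.containsKey(p)) continue;
            Long c = committed.get(p);
            if (c != null) positions.put(p, c);
            else unknown.add(p);
        }
        if (unknown.isEmpty()) return; // nothing to ask the coordinator
        for (Map.Entry<String, Long> e : fetchCommittedOffsets.apply(unknown).entrySet()) {
            committed.put(e.getKey(), e.getValue());
            positions.put(e.getKey(), e.getValue());
        }
    }
}
```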
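
For item 4.4, one way to keep error handling explicit is a single table from
response error to recovery action. The error names, the Recovery actions and
the mapping are all assumptions for this sketch, not Kafka's error classes or
codes. Here the same table serves heartbeat and commit-offset responses; a
real client may need one per response type, which is what documenting the
possible errors per response (KAFKA-1985) would settle.

```java
// Hypothetical error names for this sketch; not Kafka's error classes or wire codes.
enum CoordinatorError { NONE, COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR, ILLEGAL_GENERATION, UNKNOWN_CONSUMER, OTHER }

// Hypothetical recovery actions a consumer could take.
enum Recovery { DONE, REDISCOVER_COORDINATOR_AND_RETRY, REJOIN_GROUP, RAISE_TO_CALLER }

final class CoordinatorResponses {
    private CoordinatorResponses() {}

    // Assumed mapping, applied to both heartbeat and commit-offset responses here.
    static Recovery recoveryFor(CoordinatorError error) {
        switch (error) {
            case NONE:
                return Recovery.DONE;
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                return Recovery.REDISCOVER_COORDINATOR_AND_RETRY;
            case ILLEGAL_GENERATION:
            case UNKNOWN_CONSUMER:
                return Recovery.REJOIN_GROUP;
            default:
                // Anything not explicitly handled is surfaced rather than silently ignored.
                return Recovery.RAISE_TO_CALLER;
        }
    }
}
```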


> Refactor KafkaConsumer
> ----------------------
>
>                 Key: KAFKA-1910
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1910
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>
> KafkaConsumer now contains all the logic on the consumer side, making it a
> very large class file; it would be better to refactor it into multiple
> layers on top of KafkaClient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
