Dale Jin created KAFKA-4086:
-------------------------------

             Summary: long processing consumer restart will stall
                 Key: KAFKA-4086
                 URL: https://issues.apache.org/jira/browse/KAFKA-4086
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.10.0.0
            Reporter: Dale Jin


[~hachikuji]
We have a consumer whose message processing takes a long time. Whenever a new 
consumer tries to join the group while the long-processing consumer is busy, 
the new consumer stalls.
If we kill the long-processing consumer and restart it, both consumers stall.
When we kill the long-processing consumer, it tries to send a LeaveGroup 
request, but the request fails, seemingly because of the client request timeout.
When we start the long-processing consumer again, it appears to send a topic 
metadata request to the broker and then issue a JoinGroup request, which 
returns a future. However, when I check the server log, I don't see the 
corresponding request in kafka-request.log.
We construct the consumer with the following code (using the kafka-python 
library):
{code}
        self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
                                      value_deserializer=deserializer,
                                      group_id=self.user_defined_sub_name,
                                      heartbeat_interval_ms=10000,
                                      session_timeout_ms=300000,
                                      enable_auto_commit=False)
{code}
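One thing that may be worth ruling out is the relationship between these timeouts. The sketch below is only an illustration under two assumptions that this report does not confirm: that the broker may hold a JoinGroup request for up to the session timeout while a rebalance is pending, and that the client's request timeout is the 40000 ms shown in the log further down. The helper name `join_group_may_time_out` is hypothetical and is not part of kafka-python.

```python
def join_group_may_time_out(session_timeout_ms, request_timeout_ms):
    """Return True if a JoinGroup held for the full session timeout
    would outlive the client-side request timeout.

    Assumption (not confirmed by this report): the coordinator can delay
    the JoinGroup response for up to session_timeout_ms during a rebalance.
    """
    return request_timeout_ms <= session_timeout_ms


# Values taken from this report: session_timeout_ms from the constructor
# above, 40000 ms from the "timed out after 40000 ms" log message.
config = {"session_timeout_ms": 300000, "request_timeout_ms": 40000}
at_risk = join_group_may_time_out(**config)
print("JoinGroup may outlive the request timeout:", at_risk)
```

If that turns out to be the cause, passing a `request_timeout_ms` larger than `session_timeout_ms` to `KafkaConsumer` might be worth trying, though we have not verified this.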
On the server side, we use 0.10.0.0 with default settings.
It looks like a `RebalanceInProgressError` is thrown:
{code}
2016-08-22 20:39:08,984 - kafka.coordinator - INFO - Discovered coordinator 100 for group v1.user.queue
2016-08-22 20:39:08,984 - kafka.coordinator.consumer - INFO - Revoking previously assigned partitions set() for group v1.user.queue
2016-08-22 20:39:08,990 - kafka.cluster - DEBUG - Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 1)
2016-08-22 20:39:08,990 - kafka.coordinator - INFO - (Re-)joining group v1.user.queue
2016-08-22 20:39:08,990 - kafka.coordinator - DEBUG - Sending JoinGroup (JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')])) to coordinator 100
2016-08-22 20:39:08,991 - kafka.conn - DEBUG - <BrokerConnection host=10.128.64.81/10.128.64.81 port=9092> Request 5: JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')])
2016-08-22 20:43:04,576 - kafka.conn - WARNING - <BrokerConnection host=10.128.64.81/10.128.64.81 port=9092> timed out after 40000 ms. Closing connection.
2016-08-22 20:43:04,576 - kafka.client - WARNING - Node 100 connection failed -- refreshing metadata
2016-08-22 20:43:04,576 - kafka.coordinator - ERROR - Error sending JoinGroupRequest_v0 to node 100 [Error 7 RequestTimedOutError: Request timed out after 40000 ms]
2016-08-22 20:43:04,576 - kafka.coordinator - WARNING - Marking the coordinator dead (node 100) for group v1.user.queue: None.
2016-08-22 20:43:04,678 - kafka.coordinator - DEBUG - Sending group coordinator request for group v1.user.queue to broker 100
{code}
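The timestamps in this log may also deserve a closer look: the JoinGroup request is logged at 20:39:08,991 and the timeout at 20:43:04,576, while the message itself reports a 40000 ms timeout. A minimal sketch for measuring that gap, assuming the timestamp format shown in the log (the helper name `elapsed_seconds` is hypothetical):

```python
from datetime import datetime

# Timestamp layout used by the client log above, e.g. "2016-08-22 20:39:08,991".
LOG_TIME_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def elapsed_seconds(start, end):
    """Return the number of seconds between two log timestamps."""
    delta = datetime.strptime(end, LOG_TIME_FORMAT) - datetime.strptime(start, LOG_TIME_FORMAT)
    return delta.total_seconds()


# Timestamps copied from the log: the "Request 5" line and the first
# "timed out after 40000 ms" line.
request_sent = "2016-08-22 20:39:08,991"
timed_out = "2016-08-22 20:43:04,576"
gap = elapsed_seconds(request_sent, timed_out)
print("Seconds between request and timeout:", gap)
```

The measured gap is considerably longer than the reported 40000 ms. We do not know why; it may simply mean the client was not polling the connection during that interval.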
FYI, we turned on the following in log4j:
{code}
log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
log4j.additivity.kafka.server.KafkaApis=true
log4j.logger.kafka.request.logger=TRACE, requestAppender
log4j.additivity.kafka.request.logger=true
log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=true
log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.additivity.state.change.logger=true
{code}
Please let us know how we can proceed to find the root cause.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
