Dale Jin created KAFKA-4086:
-------------------------------
Summary: long processing consumer restart will stall
Key: KAFKA-4086
URL: https://issues.apache.org/jira/browse/KAFKA-4086
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.10.0.0
Reporter: Dale Jin
[~hachikuji]
We have a long-processing consumer. Whenever a new consumer tries to join the
group while the long-processing consumer is busy processing, the new consumer
stalls.
If we kill the long-processing consumer and restart it, both consumers stall.
When we kill the long-processing consumer, it tries to send a LeaveGroup
request, but the request fails, seemingly because of the client request timeout.
When we start the long-processing consumer again, it seems to send a topic
metadata request to the broker and then issue a JoinGroup request, which
returns a future. However, when I check the server log, I don't see the
corresponding request in kafka-request.log.
When we construct the consumer, we use the following code (based on
kafka-python library):
{code}
self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
                              value_deserializer=deserializer,
                              group_id=self.user_defined_sub_name,
                              heartbeat_interval_ms=10000,
                              session_timeout_ms=300000,
                              enable_auto_commit=False)
{code}
On the server side, we use 0.10.0.0 with default settings.
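One thing that may be worth checking is how the configured timeouts relate to each other. The following is only a minimal sketch, not a confirmed diagnosis. It assumes (a) that the client request timeout is 40000 ms, as the "timed out after 40000 ms" message in the client log in this report suggests, and (b) that the 0.10.0 broker may hold a JoinGroup request open for up to the session timeout while it waits for existing members to rejoin. The helper `check_consumer_timeouts` is hypothetical and is not part of kafka-python; `request_timeout_ms` is an existing `KafkaConsumer` option.

```python
def check_consumer_timeouts(config, assumed_request_timeout_ms=40000):
    """Return warnings about timeout settings that could make JoinGroup time out.

    `assumed_request_timeout_ms` is an assumption taken from the client log
    ("timed out after 40000 ms"); it is used when the config does not set
    request_timeout_ms explicitly.
    """
    session = config.get("session_timeout_ms")
    request = config.get("request_timeout_ms", assumed_request_timeout_ms)
    warnings = []
    # Assumption: the broker may delay the JoinGroup response for up to the
    # session timeout, so the client must be willing to wait longer than that.
    if session is not None and request <= session:
        warnings.append(
            "request_timeout_ms (%d) is not greater than session_timeout_ms (%d)"
            % (request, session)
        )
    return warnings


# Timeout-related settings as given in this report.
reported = dict(heartbeat_interval_ms=10000,
                session_timeout_ms=300000,
                enable_auto_commit=False)

# Hypothetical adjustment: allow requests to outlive the session timeout.
adjusted = dict(reported, request_timeout_ms=305000)

print(check_consumer_timeouts(reported))
print(check_consumer_timeouts(adjusted))
```

If the check flags the reported settings, passing a larger `request_timeout_ms` to `KafkaConsumer` (for example via `KafkaConsumer(..., **adjusted)`) would be one experiment to try. Whether that resolves the stall is not established here.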
It looks like a `RebalanceInProgressError` is thrown. The client log shows:
{code}
2016-08-22 20:39:08,984 - kafka.coordinator - INFO - Discovered coordinator 100
for group v1.user.queue
2016-08-22 20:39:08,984 - kafka.coordinator.consumer - INFO - Revoking
previously assigned partitions set() for group v1.user.queue
2016-08-22 20:39:08,990 - kafka.cluster - DEBUG - Updated cluster metadata to
ClusterMetadata(brokers: 1, topics: 1, groups: 1)
2016-08-22 20:39:08,990 - kafka.coordinator - INFO - (Re-)joining group
v1.user.queue
2016-08-22 20:39:08,990 - kafka.coordinator - DEBUG - Sending JoinGroup
(JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000,
member_id='', protocol_type='consumer',
group_protocols=[(protocol_name='range',
protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'),
(protocol_name='roundrobin',
protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')]))
to coordinator 100
2016-08-22 20:39:08,991 - kafka.conn - DEBUG - <BrokerConnection
host=10.128.64.81/10.128.64.81 port=9092> Request 5:
JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000,
member_id='', protocol_type='consumer',
group_protocols=[(protocol_name='range',
protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'),
(protocol_name='roundrobin',
protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')])
2016-08-22 20:43:04,576 - kafka.conn - WARNING - <BrokerConnection
host=10.128.64.81/10.128.64.81 port=9092> timed out after 40000 ms. Closing
connection.
2016-08-22 20:43:04,576 - kafka.client - WARNING - Node 100 connection failed --
refreshing metadata
2016-08-22 20:43:04,576 - kafka.coordinator - ERROR - Error sending
JoinGroupRequest_v0 to node 100 [Error 7 RequestTimedOutError: Request timed
out after 40000 ms]
2016-08-22 20:43:04,576 - kafka.coordinator - WARNING - Marking the coordinator
dead (node 100) for group v1.user.queue: None.
2016-08-22 20:43:04,678 - kafka.coordinator - DEBUG - Sending group coordinator
request for group v1.user.queue to broker 100
{code}
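To put the timing in the log above into numbers, the gap between the JoinGroup request being handed to the connection and the timeout warning can be computed from the two timestamps. This is a small sketch that assumes the log timestamps use the default Python `logging` format (`YYYY-MM-DD HH:MM:SS,mmm`); the helper name `elapsed_seconds` is hypothetical.

```python
from datetime import datetime

# Assumed timestamp layout of the client log lines (Python logging default).
LOG_TIME_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def elapsed_seconds(start, end):
    """Return the number of seconds between two log timestamps."""
    started = datetime.strptime(start, LOG_TIME_FORMAT)
    ended = datetime.strptime(end, LOG_TIME_FORMAT)
    return (ended - started).total_seconds()


# Timestamps copied from the log: the "Request 5" line and the timeout warning.
request_sent = "2016-08-22 20:39:08,991"
timeout_logged = "2016-08-22 20:43:04,576"

gap = elapsed_seconds(request_sent, timeout_logged)
print("Seconds between request and timeout warning: %.3f" % gap)
```

The gap is close to four minutes, which is much longer than the 40000 ms named in the warning. That difference may be a useful detail when looking for the root cause, although this report does not establish why it occurs.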
FYI, we enabled the following loggers in the broker's log4j configuration:
{code}
log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
log4j.additivity.kafka.server.KafkaApis=true
log4j.logger.kafka.request.logger=TRACE, requestAppender
log4j.additivity.kafka.request.logger=true
log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=true
log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.additivity.state.change.logger=true
{code}
Please let us know how we can proceed to find the root cause.