[
https://issues.apache.org/jira/browse/KAFKA-4086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ismael Juma updated KAFKA-4086:
-------------------------------
Labels: consumer reliability (was: consumer)
> long processing consumer restart will stall
> -------------------------------------------
>
> Key: KAFKA-4086
> URL: https://issues.apache.org/jira/browse/KAFKA-4086
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.10.0.0
> Reporter: Dale Jin
> Labels: consumer, reliability
>
> [~hachikuji]
> We have a long-processing consumer. Whenever a new consumer tries to join the
> group while the long-processing consumer is busy processing, the new consumer
> stalls.
> If we kill the long-processing consumer and restart it, both consumers stall.
> When we kill the long-processing consumer, it tries to issue a LeaveGroup
> request, but the request fails, seemingly due to the client request timeout.
> When we restart the long-processing consumer, it appears to send a topic
> metadata request to the broker; the subsequent JoinGroup request is issued
> and returns a future, but when I check the server log I don't see the
> corresponding request in kafka-request.log.
> When we construct the consumer, we use the following code (based on
> kafka-python library):
> {code}
> self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
>                               value_deserializer=deserializer,
>                               group_id=self.user_defined_sub_name,
>                               heartbeat_interval_ms=10000,
>                               session_timeout_ms=300000,
>                               enable_auto_commit=False)
> {code}
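Since auto-commit is disabled, our processing loop commits offsets manually after each batch. A minimal sketch of that pattern (the loop structure, function names, and per-record handler here are illustrative, not our actual code):

```python
# Sketch of the long-processing consume loop (illustrative only).
# With enable_auto_commit=False, offsets only advance via the explicit
# commit() after a polled batch has been fully processed.
def consume_loop(consumer, process, max_records=None):
    """Poll, process each record, then commit synchronously.

    `consumer` needs poll()/commit()/close() like kafka-python's
    KafkaConsumer; `process` is the (potentially long-running)
    per-record handler that blocks this loop while it works.
    """
    handled = 0
    try:
        while max_records is None or handled < max_records:
            # poll() returns a dict of {TopicPartition: [records]}
            batch = consumer.poll(timeout_ms=1000)
            for records in batch.values():
                for record in records:
                    process(record)  # may take minutes; blocks the poll loop
                    handled += 1
            if batch:
                consumer.commit()  # synchronous commit of the polled offsets
    finally:
        consumer.close()
    return handled
```

While `process()` runs, no poll or heartbeat happens from this thread, which is why we set a large session_timeout_ms.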
> On the server side, we use 0.10.0.0 with default settings.
> It looks like a `RebalanceInProgressError` is thrown:
> {code}
> 2016-08-22 20:39:08,984 - kafka.coordinator - INFO - Discovered coordinator
> 100 for group v1.user.queue
> 2016-08-22 20:39:08,984 - kafka.coordinator.consumer - INFO - Revoking
> previously assigned partitions set() for group v1.user.queue
> 2016-08-22 20:39:08,990 - kafka.cluster - DEBUG - Updated cluster metadata to
> ClusterMetadata(brokers: 1, topics: 1, groups: 1)
> 2016-08-22 20:39:08,990 - kafka.coordinator - INFO - (Re-)joining group
> v1.user.queue
> 2016-08-22 20:39:08,990 - kafka.coordinator - DEBUG - Sending JoinGroup
> (JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000,
> member_id='', protocol_type='consumer',
> group_protocols=[(protocol_name='range',
> protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'),
> (protocol_name='roundrobin',
> protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')]))
> to coordinator 100
> 2016-08-22 20:39:08,991 - kafka.conn - DEBUG - <BrokerConnection
> host=10.128.64.81/10.128.64.81 port=9092> Request 5:
> JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000,
> member_id='', protocol_type='consumer',
> group_protocols=[(protocol_name='range',
> protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'),
> (protocol_name='roundrobin',
> protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')])
> 2016-08-22 20:43:04,576 - kafka.conn - WARNING - <BrokerConnection
> host=10.128.64.81/10.128.64.81 port=9092> timed out after 40000 ms. Closing
> connection.
> 2016-08-22 20:43:04,576 - kafka.client - WARNING - Node 100 connection failed
> – refreshing metadata
> 2016-08-22 20:43:04,576 - kafka.coordinator - ERROR - Error sending
> JoinGroupRequest_v0 to node 100 [Error 7 RequestTimedOutError: Request timed
> out after 40000 ms]
> 2016-08-22 20:43:04,576 - kafka.coordinator - WARNING - Marking the
> coordinator dead (node 100) for group v1.user.queue: None.
> 2016-08-22 20:43:04,678 - kafka.coordinator - DEBUG - Sending group
> coordinator request for group v1.user.queue to broker 100
> {code}
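One observation on our side (an assumption, not a confirmed diagnosis): a JoinGroup request can legitimately block on the broker until the rebalance completes, i.e. up to session_timeout_ms (300000 ms here), while kafka-python's client-side request timeout defaults to 40000 ms, which matches the "timed out after 40000 ms" in the log above. A hedged config sketch that keeps the request timeout above the session timeout (the specific values are illustrative):

```python
# Hypothetical consumer settings (values are ours/illustrative, not a
# recommended default): keep request_timeout_ms above session_timeout_ms
# so a slow JoinGroup, which can block until the rebalance finishes,
# is not timed out by the client first.
consumer_config = dict(
    bootstrap_servers="10.128.64.81:9092",  # broker from the log above
    group_id="v1.user.queue",
    heartbeat_interval_ms=10000,
    session_timeout_ms=300000,
    request_timeout_ms=305000,  # must exceed session_timeout_ms
    enable_auto_commit=False,
)
```

If this reasoning is right, the 40000 ms timeout would explain both the closed connection and the coordinator being marked dead.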
> FYI, we turned on the following in log4j:
> {code}
> log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
> log4j.additivity.kafka.server.KafkaApis=true
> log4j.logger.kafka.request.logger=TRACE, requestAppender
> log4j.additivity.kafka.request.logger=true
> log4j.logger.kafka.controller=TRACE, controllerAppender
> log4j.additivity.kafka.controller=true
> log4j.logger.state.change.logger=TRACE, stateChangeAppender
> log4j.additivity.state.change.logger=true
> {code}
> Please let us know how we can proceed to find the root cause.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)