[ https://issues.apache.org/jira/browse/KAFKA-3957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vincent Fumo resolved KAFKA-3957.
---------------------------------
    Resolution: Duplicate

> consumer timeout not being respected when kafka broker is not available
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-3957
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3957
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.1
>            Reporter: Vincent Fumo
>            Priority: Minor
>
> KafkaConsumer v0.9::
> I have a consumer set up with session.timeout.ms set to 30s. I make a call like
>     consumer.poll(10000)
> but if the kafka broker is down, that call will hang indefinitely.
> Digging into the code it seems that the timeout isn't respected.
> KafkaConsumer calls out to pollOnce() as seen below::
>
>     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
>         // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
>         coordinator.ensureCoordinatorKnown();
>
>         // ensure we have partitions assigned if we expect to
>         if (subscriptions.partitionsAutoAssigned())
>             coordinator.ensurePartitionAssignment();
>
>         // fetch positions if we have partitions we're subscribed to that we
>         // don't know the offset for
>         if (!subscriptions.hasAllFetchPositions())
>             updateFetchPositions(this.subscriptions.missingFetchPositions());
>
>         // init any new fetches (won't resend pending fetches)
>         Cluster cluster = this.metadata.fetch();
>         Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
>
>         // if data is available already, e.g. from a previous network client poll() call to commit,
>         // then just return it immediately
>         if (!records.isEmpty()) {
>             return records;
>         }
>
>         fetcher.initFetches(cluster);
>         client.poll(timeout);
>         return fetcher.fetchedRecords();
>     }
>
> and we see that we get stuck on the call to coordinator.ensureCoordinatorKnown().
> AbstractCoordinator::
>
>     public void ensureCoordinatorKnown() {
>         while (coordinatorUnknown()) {
>             RequestFuture<Void> future = sendGroupMetadataRequest();
>             client.poll(future);
>
>             if (future.failed()) {
>                 if (future.isRetriable())
>                     client.awaitMetadataUpdate();
>                 else
>                     throw future.exception();
>             }
>         }
>     }
>
> In this case the future fails (since the broker is down) and then a call to
> client.awaitMetadataUpdate() is made, which in the case of the
> ConsumerNetworkClient will block forever:
>
>     public void awaitMetadataUpdate() {
>         int version = this.metadata.requestUpdate();
>         do {
>             poll(Long.MAX_VALUE);
>         } while (this.metadata.version() == version);
>     }
>
> I feel that this is a bug. When you set a timeout on a call to a blocking
> method, that timeout should be respected and an exception should be thrown
> when it expires.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
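
The unbounded awaitMetadataUpdate() loop described in the report could be bounded by a deadline. The sketch below is illustrative only, not the actual Kafka fix; MetadataStub is a hypothetical stand-in for the real Metadata/ConsumerNetworkClient pair so the example compiles without the Kafka client on the classpath. Each inner poll is capped by the time remaining, and the loop throws instead of spinning forever:

```java
import java.util.concurrent.TimeoutException;

class DeadlineAwait {

    // Hypothetical deadline-bounded variant of the unbounded loop quoted in
    // the report: poll(remaining) replaces poll(Long.MAX_VALUE), and the loop
    // fails with TimeoutException once the deadline passes.
    static void awaitMetadataUpdate(MetadataStub metadata, long timeoutMs)
            throws TimeoutException {
        int version = metadata.requestUpdate();
        long deadline = System.currentTimeMillis() + timeoutMs;
        do {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                throw new TimeoutException(
                        "metadata not updated within " + timeoutMs + " ms");
            metadata.poll(remaining); // bounded wait, not Long.MAX_VALUE
        } while (metadata.version() == version);
    }

    // Stand-in for the real metadata/network-client pair. Simulates a down
    // broker: poll() returns without ever bumping the metadata version.
    static class MetadataStub {
        private final int version = 0;
        int requestUpdate() { return version; }
        int version() { return version; }
        void poll(long timeoutMs) { /* broker down: no update arrives */ }
    }

    public static void main(String[] args) {
        MetadataStub metadata = new MetadataStub();
        try {
            awaitMetadataUpdate(metadata, 50);
            System.out.println("updated");
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

With the broker unreachable, the call returns control to the caller via TimeoutException after roughly the requested 50 ms instead of hanging, which is the behavior the reporter asks for.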