Youjun Yuan created FLINK-9630:
----------------------------------
Summary: Kafka09PartitionDiscoverer causes connection leak on
TopicAuthorizationException
Key: FLINK-9630
URL: https://issues.apache.org/jira/browse/FLINK-9630
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 1.4.2, 1.5.0
Environment: Linux 2.6, java 8, Kafka broker 0.10.x
Reporter: Youjun Yuan
Fix For: 1.5.1
When the Kafka topic gets deleted while a task is starting up,
Kafka09PartitionDiscoverer hits a *TopicAuthorizationException* in
getAllPartitionsForTopics() and never gets a chance to close the kafkaConsumer,
resulting in a leaked TCP connection (to the Kafka broker).
*This issue can bring down the whole Flink cluster*, because in a default
setup (fixedDelay with Integer.MAX_VALUE restart attempts) the job manager will
schedule the job to any TaskManager that has a free slot, and each attempt
leaks one TCP connection on that TaskManager. Eventually almost every
TaskManager runs out of file handles, so no TaskManager can take a
snapshot or accept new jobs, effectively stopping the whole cluster.
The leak happens when StreamTask.invoke() calls openAllOperators(), and
FlinkKafkaConsumerBase.open() then calls partitionDiscoverer.discoverPartitions():
when kafkaConsumer.partitionsFor(topic) in
KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a
*TopicAuthorizationException*, nothing catches it.
Although StreamTask.open catches the exception and invokes the dispose() method of
each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), that method
does not close the kafkaConsumer held by the partitionDiscoverer. It does not even
invoke partitionDiscoverer.wakeup(), because discoveryLoopThread is null.
Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:
public void cancel() {
    // set ourselves as not running;
    // this would let the main discovery loop escape as soon as possible
    running = false;

    if (discoveryLoopThread != null) {
        if (partitionDiscoverer != null) {
            // we cannot close the discoverer here, as it is error-prone to concurrent access;
            // only wakeup the discoverer, the discovery loop will clean itself up after it escapes
            partitionDiscoverer.wakeup();
        }

        // the discovery loop may currently be sleeping in-between
        // consecutive discoveries; interrupt to shutdown faster
        discoveryLoopThread.interrupt();
    }

    // abort the fetcher, if there is one
    if (kafkaFetcher != null) {
        kafkaFetcher.cancel();
    }
}
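One possible fix is to make sure the discoverer closes its consumer before the
exception propagates. The snippet below is only a self-contained sketch of that
try/catch-close pattern; FakeConsumer and getAllPartitionsForTopic are illustrative
stand-ins, not the real Kafka or Flink API:

```java
import java.util.List;

public class LeakFixSketch {

    // Stand-in for org.apache.kafka.clients.consumer.KafkaConsumer,
    // which holds a TCP connection until close() is called
    static class FakeConsumer {
        boolean closed = false;

        List<String> partitionsFor(String topic) {
            // simulate the broker rejecting the request for a deleted topic
            throw new RuntimeException("TopicAuthorizationException: " + topic);
        }

        void close() {
            closed = true;
        }
    }

    // Mirrors getAllPartitionsForTopics(): close the consumer before rethrowing,
    // so the connection is released even when partition discovery fails
    static List<String> getAllPartitionsForTopic(FakeConsumer consumer, String topic) {
        try {
            return consumer.partitionsFor(topic);
        } catch (RuntimeException e) {
            consumer.close(); // prevent the TCP connection leak
            throw e;
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        try {
            getAllPartitionsForTopic(consumer, "deleted-topic");
        } catch (RuntimeException expected) {
            // the exception still propagates to the task, as before
        }
        // but the consumer is now closed, so no file handle is leaked
        System.out.println("consumer closed: " + consumer.closed);
    }
}
```

The exception still fails the task (and triggers the restart strategy), but each
restart attempt no longer leaks a connection.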
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)