[
https://issues.apache.org/jira/browse/KAFKA-8066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jason Gustafson resolved KAFKA-8066.
------------------------------------
Resolution: Fixed
Fix Version/s: 2.2.1
               2.1.2
               2.0.2
> ReplicaFetcherThread fails to startup because of failing to register the
> metric.
> --------------------------------------------------------------------------------
>
> Key: KAFKA-8066
> URL: https://issues.apache.org/jira/browse/KAFKA-8066
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 1.1.0, 1.1.1, 1.1.2, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1,
> 2.0.2, 2.1.2
> Reporter: Zhanxiang (Patrick) Huang
> Assignee: Zhanxiang (Patrick) Huang
> Priority: Major
> Fix For: 2.0.2, 2.1.2, 2.2.1
>
>
> After KAFKA-6051, we close leaderEndPoint in the replica fetcher thread's
> initiateShutdown() to preempt the in-progress fetch request and speed up
> replica fetcher thread shutdown. However, the selector may fail to close a
> channel and throw an exception while the replica fetcher thread is still
> actively fetching, and in that case the sensors are not cleaned up.
> Specifically, if `close(id)` throws an exception in `Selector.close()`, then
> `sensors.close()` is never called and the sensors are never unregistered
> (see the code below).
> {code:java}
> public void close() {
>     List<String> connections = new ArrayList<>(channels.keySet());
>     for (String id : connections)
>         close(id);
>     try {
>         this.nioSelector.close();
>     } catch (IOException | SecurityException e) {
>         log.error("Exception closing nioSelector:", e);
>     }
>     sensors.close();
>     channelBuilder.close();
> }
> {code}
> When this happens and the broker later tries to start a ReplicaFetcherThread
> with the same fetcher id to the same destination broker (e.g. due to
> leadership changes or newly created partitions), the ReplicaFetcherThread
> fails to start up because the selector throws an IllegalArgumentException
> when a metric with the same name already exists:
> {noformat}
> 2019/02/27 10:24:26.938 ERROR [KafkaApis] [kafka-request-handler-6] [kafka-server] [] [KafkaApi-38031] Error when handling request {}
> java.lang.IllegalArgumentException: A metric named 'MetricName [name=connection-count, group=replica-fetcher-metrics, description=The current number of active connections., tags={broker-id=29712, fetcher-id=3}]' already exists, can't register another one.
>         at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:559) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:502) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:485) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:470) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:963) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.network.Selector.<init>(Selector.java:170) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.network.Selector.<init>(Selector.java:188) ~[kafka-clients-2.0.0.66.jar:?]
>         at kafka.server.ReplicaFetcherBlockingSend.<init>(ReplicaFetcherBlockingSend.scala:61) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.ReplicaFetcherThread$$anonfun$1.apply(ReplicaFetcherThread.scala:68) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.ReplicaFetcherThread$$anonfun$1.apply(ReplicaFetcherThread.scala:68) ~[kafka_2.11-2.0.0.66.jar:?]
>         at scala.Option.getOrElse(Option.scala:121) ~[scala-library-2.11.12.jar:?]
>         at kafka.server.ReplicaFetcherThread.<init>(ReplicaFetcherThread.scala:67) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.ReplicaFetcherManager.createFetcherThread(ReplicaFetcherManager.scala:32) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.AbstractFetcherManager.kafka$server$AbstractFetcherManager$$addAndStartFetcherThread$1(AbstractFetcherManager.scala:132) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:146) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:137) ~[kafka_2.11-2.0.0.66.jar:?]
>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) ~[scala-library-2.11.12.jar:?]
>         at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:137) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:1333) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1107) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:194) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:110) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) ~[kafka_2.11-2.0.0.66.jar:?]
>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> {noformat}
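> For illustration, the following minimal sketch (not part of the original
> report) reproduces the same IllegalArgumentException with the kafka-clients
> Metrics API by registering two metrics with an identical name, group, and
> tags, which is effectively what the new selector attempts when the old
> sensors were never unregistered:
> {code:java}
> import java.util.Collections;
> import java.util.Map;
>
> import org.apache.kafka.common.MetricName;
> import org.apache.kafka.common.metrics.Metrics;
>
> public class DuplicateMetricDemo {
>     public static void main(String[] args) {
>         try (Metrics metrics = new Metrics()) {
>             Map<String, String> tags = Collections.singletonMap("fetcher-id", "3");
>             MetricName name = metrics.metricName(
>                 "connection-count", "replica-fetcher-metrics",
>                 "The current number of active connections.", tags);
>
>             // First registration succeeds.
>             metrics.addMetric(name, (config, now) -> 0.0);
>
>             // Second registration of the same MetricName throws
>             // IllegalArgumentException, just like the stack trace above.
>             metrics.addMetric(name, (config, now) -> 0.0);
>         } catch (IllegalArgumentException e) {
>             System.out.println(e.getMessage());
>         }
>     }
> }
> {code}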
> The fix should be to add a try/finally block in Selector.close() so that
> sensors.close() is called even if closing a channel throws an exception.
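> A minimal sketch of that change, assuming the shape of Selector.close() shown
> above (an illustration, not the exact committed patch):
> {code:java}
> public void close() {
>     List<String> connections = new ArrayList<>(channels.keySet());
>     try {
>         for (String id : connections)
>             close(id);
>     } finally {
>         // Always release the nioSelector, the sensors, and the channel builder,
>         // even if close(id) threw for one of the channels above.
>         try {
>             this.nioSelector.close();
>         } catch (IOException | SecurityException e) {
>             log.error("Exception closing nioSelector:", e);
>         }
>         sensors.close();
>         channelBuilder.close();
>     }
> }
> {code}
> With this structure an exception from closing an individual channel still
> propagates, but the metrics registered by this selector are always removed,
> so a later ReplicaFetcherThread with the same fetcher id can register them
> again.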
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)