Zhanxiang (Patrick) Huang created KAFKA-8066:
------------------------------------------------

             Summary: ReplicaFetcherThread fails to startup because of failing to register the metric.
                 Key: KAFKA-8066
                 URL: https://issues.apache.org/jira/browse/KAFKA-8066
             Project: Kafka
          Issue Type: Bug
            Reporter: Zhanxiang (Patrick) Huang
            Assignee: Zhanxiang (Patrick) Huang


After KAFKA-6051, we close leaderEndPoint in the replica fetcher thread's
initiateShutdown to preempt any in-progress fetch request and speed up
replica fetcher thread shutdown. However, the selector may fail to close a
channel and throw an exception while the replica fetcher thread is still
actively fetching. In this case, the sensors are not cleaned up: if
`close(id)` throws an exception in `Selector.close()`, then `sensors.close()`
is never called, so the sensors are never unregistered (see the code below).
{code:java}
    public void close() {
        List<String> connections = new ArrayList<>(channels.keySet());
        for (String id : connections)
            close(id);
        try {
            this.nioSelector.close();
        } catch (IOException | SecurityException e) {
            log.error("Exception closing nioSelector:", e);
        }
        sensors.close();
        channelBuilder.close();
    }
{code}
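To make the ordering problem concrete, here is a minimal, self-contained Java model of the `close()` method quoted above. It is a sketch under stated assumptions, not Kafka code: `LeakySelector`, `closeChannel`, `addChannel` and the `Set<String>` standing in for the metrics registry are all hypothetical, and a channel-close failure is simulated with a flag.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical demo driver: a failed channel close leaves the sensor registered.
class LeakDemo {
    public static void main(String[] args) {
        Set<String> registry = new HashSet<>();
        LeakySelector selector =
            new LeakySelector(registry, "connection-count{broker-id=29712, fetcher-id=3}");
        selector.addChannel("29712", true); // simulate a channel whose close fails
        try {
            selector.close();
        } catch (IllegalStateException e) {
            System.out.println("close() failed: " + e.getMessage());
        }
        System.out.println("still registered: " + registry);
    }
}

// Hypothetical, simplified model of the Selector.close() ordering quoted above.
// The Set<String> is an assumed stand-in for the metrics registry, not Kafka's Metrics class.
class LeakySelector {
    private final Set<String> registry;
    private final String sensorName;
    // channel id -> whether closing that channel should fail (simulation only)
    private final Map<String, Boolean> channels = new LinkedHashMap<>();

    LeakySelector(Set<String> registry, String sensorName) {
        this.registry = registry;
        this.sensorName = sensorName;
        registry.add(sensorName); // "register" the sensor on construction
    }

    void addChannel(String id, boolean failOnClose) {
        channels.put(id, failOnClose);
    }

    private void closeChannel(String id) {
        boolean fail = channels.remove(id);
        if (fail)
            throw new IllegalStateException("failed to close channel " + id);
    }

    void close() {
        List<String> connections = new ArrayList<>(channels.keySet());
        for (String id : connections)
            closeChannel(id);        // an exception here escapes close()...
        registry.remove(sensorName); // ...so this stand-in for sensors.close() never runs
    }
}
```

Running `LeakDemo` shows the sensor name still present in the registry after the failed `close()`, which is exactly the leftover state that later collides with a new fetcher thread.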

If this happens and the broker later needs to start a ReplicaFetcherThread
with the same fetcher id to the same destination broker (e.g. because of
leadership changes or newly created partitions), the ReplicaFetcherThread
fails to start, because the Selector constructor throws an
IllegalArgumentException when a metric with the same name is already
registered:

{noformat}
2019/02/27 10:24:26.938 ERROR [KafkaApis] [kafka-request-handler-6] [kafka-server] [] [KafkaApi-38031] Error when handling request {}
java.lang.IllegalArgumentException: A metric named 'MetricName [name=connection-count, group=replica-fetcher-metrics, description=The current number of active connections., tags={broker-id=29712, fetcher-id=3}]' already exists, can't register another one.
        at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:559) ~[kafka-clients-2.0.0.66.jar:?]
        at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:502) ~[kafka-clients-2.0.0.66.jar:?]
        at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:485) ~[kafka-clients-2.0.0.66.jar:?]
        at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:470) ~[kafka-clients-2.0.0.66.jar:?]
        at org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:963) ~[kafka-clients-2.0.0.66.jar:?]
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:170) ~[kafka-clients-2.0.0.66.jar:?]
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:188) ~[kafka-clients-2.0.0.66.jar:?]
        at kafka.server.ReplicaFetcherBlockingSend.<init>(ReplicaFetcherBlockingSend.scala:61) ~[kafka_2.11-2.0.0.66.jar:?]
        at kafka.server.ReplicaFetcherThread$$anonfun$1.apply(ReplicaFetcherThread.scala:68) ~[kafka_2.11-2.0.0.66.jar:?]
        at kafka.server.ReplicaFetcherThread$$anonfun$1.apply(ReplicaFetcherThread.scala:68) ~[kafka_2.11-2.0.0.66.jar:?]
        at scala.Option.getOrElse(Option.scala:121) ~[scala-library-2.11.12.jar:?]
        at kafka.server.ReplicaFetcherThread.<init>(ReplicaFetcherThread.scala:67) ~[kafka_2.11-2.0.0.66.jar:?]
        at kafka.server.ReplicaFetcherManager.createFetcherThread(ReplicaFetcherManager.scala:32) ~[kafka_2.11-2.0.0.66.jar:?]
        at kafka.server.AbstractFetcherManager.kafka$server$AbstractFetcherManager$$addAndStartFetcherThread$1(AbstractFetcherManager.scala:132) ~[kafka_2.11-2.0.0.66.jar:?]
        at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:146) ~[kafka_2.11-2.0.0.66.jar:?]
        at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:137) ~[kafka_2.11-2.0.0.66.jar:?]
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) ~[scala-library-2.11.12.jar:?]
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) ~[scala-library-2.11.12.jar:?]
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) ~[scala-library-2.11.12.jar:?]
        at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:137) ~[kafka_2.11-2.0.0.66.jar:?]
        at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:1333) ~[kafka_2.11-2.0.0.66.jar:?]
        at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1107) ~[kafka_2.11-2.0.0.66.jar:?]
        at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:194) ~[kafka_2.11-2.0.0.66.jar:?]
        at kafka.server.KafkaApis.handle(KafkaApis.scala:110) ~[kafka_2.11-2.0.0.66.jar:?]
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) ~[kafka_2.11-2.0.0.66.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
{noformat}
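The restart failure comes from the registry refusing a second metric with an identical name, group and tags. The sketch below models that rule with a hypothetical `MetricRegistry` class keyed by a name/group/tags string; this is an assumption for illustration only (Kafka's real `Metrics` class keys on `MetricName`). It shows that a leaked registration blocks the next fetcher with the same `broker-id` and `fetcher-id`, while removing the old entry first lets the new one register.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical demo driver: a leftover registration blocks a new one with the same key.
class DuplicateMetricDemo {
    public static void main(String[] args) {
        Map<String, String> tags = Map.of("broker-id", "29712", "fetcher-id", "3");
        String k = MetricRegistry.key("connection-count", "replica-fetcher-metrics", tags);
        MetricRegistry registry = new MetricRegistry();
        registry.register(k, "old fetcher");     // registered by the first fetcher, never removed
        try {
            registry.register(k, "new fetcher"); // the restarted fetcher collides
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        registry.remove(k);                      // what a successful sensors.close() would do
        registry.register(k, "new fetcher");     // now succeeds
        System.out.println("re-registered after removal");
    }
}

// Hypothetical registry that, like the one in the log above, rejects duplicate metric names.
class MetricRegistry {
    private final Map<String, Object> metrics = new HashMap<>();

    // Builds a key from name, group and tags (sorted so tag order does not matter).
    static String key(String name, String group, Map<String, String> tags) {
        return name + "/" + group + "/" + new TreeMap<>(tags);
    }

    void register(String key, Object metric) {
        if (metrics.containsKey(key))
            throw new IllegalArgumentException(
                "A metric named '" + key + "' already exists, can't register another one.");
        metrics.put(key, metric);
    }

    void remove(String key) {
        metrics.remove(key);
    }
}
```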


The fix is to add a try-finally block in `Selector.close()` so that
`sensors.close()` is called even if an exception is thrown.
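A minimal sketch of the proposed try-finally fix, applied to a simplified, hypothetical model of `Selector.close()`. `FixedSelector`, `closeChannel`, `addChannel` and the `Set<String>` used as a registry are illustrative stand-ins, not Kafka classes. The `finally` block guarantees that the stand-in for `sensors.close()` runs even when closing a channel throws, and the original exception still propagates to the caller.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical demo driver: the sensor is unregistered even though a channel close fails.
class FixDemo {
    public static void main(String[] args) {
        Set<String> registry = new HashSet<>();
        FixedSelector selector =
            new FixedSelector(registry, "connection-count{broker-id=29712, fetcher-id=3}");
        selector.addChannel("29712", true); // simulate a channel whose close fails
        try {
            selector.close();
        } catch (IllegalStateException e) {
            System.out.println("close() failed: " + e.getMessage());
        }
        System.out.println("registry after close: " + registry);
    }
}

// Hypothetical, simplified model of Selector.close() with the try-finally fix applied.
class FixedSelector {
    private final Set<String> registry;
    private final String sensorName;
    // channel id -> whether closing that channel should fail (simulation only)
    private final Map<String, Boolean> channels = new LinkedHashMap<>();

    FixedSelector(Set<String> registry, String sensorName) {
        this.registry = registry;
        this.sensorName = sensorName;
        registry.add(sensorName); // "register" the sensor on construction
    }

    void addChannel(String id, boolean failOnClose) {
        channels.put(id, failOnClose);
    }

    private void closeChannel(String id) {
        boolean fail = channels.remove(id);
        if (fail)
            throw new IllegalStateException("failed to close channel " + id);
    }

    void close() {
        List<String> connections = new ArrayList<>(channels.keySet());
        try {
            for (String id : connections)
                closeChannel(id);
        } finally {
            // Runs whether or not closeChannel() threw: stand-in for sensors.close().
            registry.remove(sensorName);
        }
    }
}
```

Note that in this sketch a failing channel still stops the loop, so later channels are not closed; the fix as described only guarantees that the sensors are released.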



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
