[ https://issues.apache.org/jira/browse/KAFKA-7576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7576.
-----------------------------------
    Resolution: Fixed
      Reviewer: Jason Gustafson

> Dynamic update of replica fetcher threads may fail to start/close fetchers
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-7576
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7576
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.1, 2.0.1, 2.1.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 1.1.2, 2.1.1, 2.0.2
>
>
> KAFKA-6051 moved ReplicaFetcherBlockingSend shutdown earlier in the shutdown
> sequence of ReplicaFetcherThread. As a result, shutdown of replica fetchers
> can now throw an exception, because the Selector may be closed on one thread
> while data is being written on another. KAFKA-7464 changed this behaviour for
> 2.0.1 and 2.1.0: the exception during close() is now logged and not
> propagated, to avoid exceptions during broker shutdown.
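>
> As an illustration of why the concurrent close is unsafe, here is a minimal
> sketch (not Kafka code; it substitutes a plain java.nio Pipe for Kafka's
> Selector and channels) of one thread closing an NIO channel while another is
> still writing through it:
> {code:java}
> import java.nio.ByteBuffer;
> import java.nio.channels.Pipe;
>
> public class CloseDuringWriteRace {
>     public static void main(String[] args) throws Exception {
>         Pipe pipe = Pipe.open();
>         Pipe.SinkChannel sink = pipe.sink();
>
>         Thread writer = new Thread(() -> {
>             ByteBuffer buf = ByteBuffer.allocate(1 << 20);
>             try {
>                 while (true) {
>                     buf.clear();
>                     sink.write(buf);  // the "fetcher thread": an in-flight write
>                 }
>             } catch (Exception e) {
>                 // Typically AsynchronousCloseException or ClosedChannelException
>                 // here; with Kafka's SslTransportLayer the race surfaced as the
>                 // IllegalArgumentException in the trace below.
>                 System.err.println("writer failed: " + e);
>             }
>         });
>         writer.start();
>         Thread.sleep(10);  // let the writer get mid-write
>         sink.close();      // the "config-update thread": concurrent close
>         writer.join();
>     }
> }
> {code}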
> When a config update notification for `num.replica.fetchers` is processed,
> partitions are migrated as necessary to increase or decrease the number of
> fetcher threads. Existing fetchers are shut down before new ones are
> created. This migration is performed on the thread processing the ZK change
> notification. Closing the Selector of an existing fetcher there is not safe,
> since the replica fetcher thread may be processing data using the same
> Selector at that moment.
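>
> A condensed sketch (hypothetical names, not the actual Kafka source) of this
> migration flow, showing that each fetcher's shutdown (and hence its Selector
> close) runs on the config-update thread:
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> class FetcherResizeSketch {
>     interface Fetcher {
>         List<String> partitions();  // partitions currently assigned to this fetcher
>         void shutdown();            // closes the fetcher's NetworkClient/Selector
>     }
>
>     private final List<Fetcher> fetchers = new ArrayList<>();
>
>     synchronized void resizeThreadPool(int newSize) {
>         List<String> migrated = new ArrayList<>();
>         for (Fetcher f : fetchers) {
>             migrated.addAll(f.partitions());
>             // Runs on the ZK change-notification thread; if the fetcher is
>             // mid-write, close() can throw and abort this loop before the
>             // replacement fetchers below are ever created.
>             f.shutdown();
>         }
>         fetchers.clear();
>         addFetchersFor(migrated, newSize);
>     }
>
>     private void addFetchersFor(List<String> partitions, int newSize) {
>         // spawn newSize fetcher threads and reassign the migrated partitions (elided)
>     }
> }
> {code}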
> Without the fix from KAFKA-7464, if the shutdown during a dynamic config
> update encounters an exception, another config update or a broker restart is
> required to restart the replica fetchers.
> Exception stack trace:
> {code:java}
> java.lang.IllegalArgumentException
>         at java.nio.Buffer.position(Buffer.java:244)
>         at sun.nio.ch.IOUtil.write(IOUtil.java:68)
>         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>         at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
>         at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160)
>         at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
>         at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70)
>         at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
>         at org.apache.kafka.common.network.Selector.close(Selector.java:736)
>         at org.apache.kafka.common.network.Selector.close(Selector.java:698)
>         at org.apache.kafka.common.network.Selector.close(Selector.java:314)
>         at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533)
>         at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
>         at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90)
>         at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86)
>         at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76)
>         at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>         at kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72)
>         at kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88)
>         at kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574)
>         at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
>         at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
>         at scala.collection.immutable.List.foreach(List.scala:392)
>         at kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410)
>         <SKIP>
>         at kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> The fix from KAFKA-7464 in 2.0.1 and 2.1.0 avoids the issue with the creation
> of replica fetchers during a dynamic update. But even on those branches, we
> should clean up the Selector to avoid a resource leak in the dynamic config
> update case (discarding the exception may be sufficient when the broker is
> being shut down).
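>
> A hedged sketch (illustrative only, not the committed patch) of that cleanup:
> swallow and log the exception from close() so one failing fetcher neither
> aborts the migration loop nor leaks its Selector:
> {code:java}
> import java.io.Closeable;
>
> class ShutdownSketch {
>     // Stands in for the fetcher's ReplicaFetcherBlockingSend, whose close()
>     // tears down the underlying NetworkClient and Selector.
>     private final Closeable blockingSend;
>
>     ShutdownSketch(Closeable blockingSend) {
>         this.blockingSend = blockingSend;
>     }
>
>     void initiateShutdown() {
>         try {
>             blockingSend.close();  // may race with an in-flight write on the fetcher thread
>         } catch (Throwable t) {
>             // Log and continue: propagating the exception would abort the
>             // migration loop before the replacement fetchers are created,
>             // while still attempting close() keeps the Selector's resources
>             // from leaking on the dynamic-resize path.
>             System.err.println("Failed to close blocking send during shutdown: " + t);
>         }
>     }
> }
> {code}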


