Rajini Sivaram created KAFKA-7576:
-------------------------------------

             Summary: Dynamic update of replica fetcher threads may fail to start fetchers
                 Key: KAFKA-7576
                 URL: https://issues.apache.org/jira/browse/KAFKA-7576
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 2.0.0, 1.1.1, 1.0.2, 2.1.0
            Reporter: Rajini Sivaram
            Assignee: Rajini Sivaram

When a config update notification for `num.replica.fetchers` is processed, partitions are migrated as necessary to increase or decrease the number of fetcher threads. Existing fetchers are shut down before new ones are created. This migration is performed on the thread processing the ZK change notification. Closing the Selector of an existing fetcher from this thread is not safe, since the replica fetcher thread may be processing data with the same Selector at the time. After the failure, another update of the config or a broker restart is required to restart the replica fetchers.

Exception stack trace:

{code:java}
java.lang.IllegalArgumentException
    at java.nio.Buffer.position(Buffer.java:244)
    at sun.nio.ch.IOUtil.write(IOUtil.java:68)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
    at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160)
    at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70)
    at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
    at org.apache.kafka.common.network.Selector.close(Selector.java:736)
    at org.apache.kafka.common.network.Selector.close(Selector.java:698)
    at org.apache.kafka.common.network.Selector.close(Selector.java:314)
    at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533)
    at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
    at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90)
    at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86)
    at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76)
    at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
    at kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72)
    at kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88)
    at kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574)
    at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
    at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410)
    <SKIP>kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
{code}
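
For illustration only, here is a minimal sketch of one way the race described above could be avoided: the thread requesting shutdown only signals the fetcher, waits for it to exit, and closes the network client afterwards. This is not the Kafka code and not necessarily the fix applied for this issue. `SafeFetcherShutdown`, `FakeClient` and `FetcherThread` are hypothetical names, and `FakeClient` merely stands in for a client that is not safe to close while another thread is using it.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; none of these classes exist in Kafka.
final class SafeFetcherShutdown {

    /** Stand-in for a network client that must not be closed while in use. */
    static final class FakeClient {
        private final AtomicBoolean inUse = new AtomicBoolean(false);
        private volatile boolean closed = false;
        private volatile boolean closedWhileInUse = false;

        void poll() throws InterruptedException {
            inUse.set(true);
            try {
                Thread.sleep(5); // simulate blocking network I/O
            } finally {
                inUse.set(false);
            }
        }

        void close() {
            // Record whether close() raced with an in-flight poll().
            if (inUse.get()) {
                closedWhileInUse = true;
            }
            closed = true;
        }
    }

    /** Stand-in for a fetcher thread that owns its client. */
    static final class FetcherThread extends Thread {
        final FakeClient client = new FakeClient();
        private volatile boolean running = true;

        @Override
        public void run() {
            try {
                while (running) {
                    client.poll();
                }
            } catch (InterruptedException e) {
                // Interrupted by initiateShutdown(); fall through and exit.
            }
        }

        /** Only signals the thread; does not touch the client. */
        void initiateShutdown() {
            running = false;
            interrupt();
        }

        /** Waits for the thread to exit, then closes the client. */
        void awaitShutdown() throws InterruptedException {
            join();
            client.close();
        }
    }

    /** Returns true if the client was closed and never closed while in use. */
    static boolean demo() {
        FetcherThread fetcher = new FetcherThread();
        fetcher.start();
        try {
            Thread.sleep(20); // let the fetcher do some work
            fetcher.initiateShutdown();
            fetcher.awaitShutdown();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return fetcher.client.closed && !fetcher.client.closedWhileInUse;
    }

    public static void main(String[] args) {
        System.out.println("closed safely: " + demo());
    }
}
```

The point of the sketch is the ordering: `close()` runs only after `join()` returns, so the client is never closed underneath the thread that is still using it.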