Rajini Sivaram created KAFKA-7576:
-------------------------------------
Summary: Dynamic update of replica fetcher threads may fail to start fetchers
Key: KAFKA-7576
URL: https://issues.apache.org/jira/browse/KAFKA-7576
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 2.0.0, 1.1.1, 1.0.2, 2.1.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
When a config update notification for `num.replica.fetchers` is processed,
partitions are migrated as necessary to increase or decrease the number of
fetcher threads. Existing fetchers are shut down before new ones are
created. This migration is performed on the thread that processes ZK change
notifications. Closing the Selector of an existing fetcher from that thread is
not safe, since the replica fetcher thread may be processing data with the same
Selector at the time.
After such a failure (see the stack trace below), another update of the config
or a broker restart is required to restart the replica fetchers.
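To illustrate the ordering problem described above, here is a minimal sketch of one common way to avoid this class of race: signal the worker, wait for it to exit, and only then release the resource it was using. This is not Kafka code and not necessarily how the issue will be fixed; `SafeShutdownSketch`, `Fetcher`, and the `resourceClosed` flag (a stand-in for the Selector) are hypothetical names introduced only for the example.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch only: Fetcher is a hypothetical stand-in, not Kafka's
// ReplicaFetcherThread, and the "resourceClosed" flag stands in for a Selector.
class SafeShutdownSketch {

    static class Fetcher extends Thread {
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final AtomicBoolean resourceClosed = new AtomicBoolean(false);
        private final AtomicBoolean usedAfterClose = new AtomicBoolean(false);

        @Override
        public void run() {
            while (running.get()) {
                // Each iteration "uses" the shared resource; record any use after close.
                if (resourceClosed.get()) {
                    usedAfterClose.set(true);
                }
                try {
                    Thread.sleep(1); // simulated network I/O
                } catch (InterruptedException e) {
                    // Woken up by shutdown(); the loop condition is re-checked.
                }
            }
        }

        // Called from a different thread, e.g. a config-change handler.
        void shutdown() {
            running.set(false);   // 1. ask the worker to stop
            interrupt();          // 2. wake it up if it is blocked
            try {
                join();           // 3. wait until the worker has exited
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;           // worker may still be running: do not close yet
            }
            resourceClosed.set(true); // 4. only now release the resource
        }

        boolean isResourceClosed() { return resourceClosed.get(); }

        boolean wasUsedAfterClose() { return usedAfterClose.get(); }
    }

    public static void main(String[] args) {
        Fetcher fetcher = new Fetcher();
        fetcher.start();
        fetcher.shutdown();
        System.out.println("closed=" + fetcher.isResourceClosed()
                + " usedAfterClose=" + fetcher.wasUsedAfterClose());
    }
}
```

Because the resource is released only after `join()` returns, the worker can never observe it in a closed state. Closing it in step 1 instead would reproduce the kind of race reported here.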
Exception stack trace:
{code:java}
java.lang.IllegalArgumentException
at java.nio.Buffer.position(Buffer.java:244)
at sun.nio.ch.IOUtil.write(IOUtil.java:68)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160)
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70)
at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
at org.apache.kafka.common.network.Selector.close(Selector.java:736)
at org.apache.kafka.common.network.Selector.close(Selector.java:698)
at org.apache.kafka.common.network.Selector.close(Selector.java:314)
at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533)
at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90)
at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86)
at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76)
at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72)
at kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88)
at kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574)
at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
at scala.collection.immutable.List.foreach(List.scala:392)
at kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410)
<SKIP>kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
{code}