Rajini Sivaram created KAFKA-7576:
-------------------------------------

             Summary: Dynamic update of replica fetcher threads may fail to start fetchers
                 Key: KAFKA-7576
                 URL: https://issues.apache.org/jira/browse/KAFKA-7576
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 2.0.0, 1.1.1, 1.0.2, 2.1.0
            Reporter: Rajini Sivaram
            Assignee: Rajini Sivaram


When a config update notification for `num.replica.fetchers` is processed, 
partitions are migrated as necessary to increase or decrease the number of 
fetcher threads. Existing fetchers are shut down before new ones are 
created. This migration is performed on the thread that processes ZK change 
notifications. Closing the Selector of an existing fetcher from that thread is 
not safe, since the replica fetcher thread may be processing data using the 
same Selector at the time.
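
A minimal Java sketch of the usual remedy for this kind of race, assuming a hypothetical worker class (`OwnedResourceWorker` and its nested `Resource` are illustrative stand-ins, not Kafka classes, and this is not necessarily how the Kafka fix is implemented): the notification thread only signals shutdown and waits, while the worker thread closes its own resource after its last use of it.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not Kafka code: only the worker thread ever touches its resource.
class OwnedResourceWorker extends Thread {

    // Stand-in for a non-thread-safe resource such as a network Selector.
    static final class Resource implements AutoCloseable {
        private boolean closed = false;

        void use() {
            if (closed) {
                throw new IllegalStateException("resource used after close");
            }
        }

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Resource resource = new Resource();

    @Override
    public void run() {
        try {
            while (running.get()) {
                resource.use(); // e.g. poll and process fetched data
                Thread.yield();
            }
        } finally {
            // Closed by the owning thread, after its last use of the resource.
            resource.close();
        }
    }

    // Safe to call from any thread: it only flips a flag and never touches the resource.
    void initiateShutdown() {
        running.set(false);
    }

    // Waits for the worker to exit; join() also makes the worker's writes visible.
    void awaitShutdown() throws InterruptedException {
        join();
    }

    boolean resourceClosed() {
        return resource.isClosed();
    }

    public static void main(String[] args) throws InterruptedException {
        OwnedResourceWorker worker = new OwnedResourceWorker();
        worker.start();
        Thread.sleep(50);
        worker.initiateShutdown(); // e.g. called from the config-change thread
        worker.awaitShutdown();
        System.out.println("closed=" + worker.resourceClosed()); // prints "closed=true"
    }
}
```

A real fetcher blocked in network I/O would also need to be woken up so it notices the flag; `java.nio.channels.Selector.wakeup()` is documented as safe to call from another thread.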

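Because the exception is thrown while existing fetchers are being shut down, 
the new fetchers are not started. After such a failure, another config update 
or a broker restart is required to restart the replica fetchers.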
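
Since existing fetchers are shut down before new ones are created, an exception from any shutdown skips the creation step entirely. A minimal sketch, assuming hypothetical `Fetcher` and `FetcherFactory` interfaces (not Kafka's `AbstractFetcherManager` API), of a resize step that records shutdown failures instead of letting them abort the restart:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Kafka's AbstractFetcherManager.
final class FetcherPoolResizer {

    interface Fetcher {
        void shutdown() throws Exception;
    }

    interface FetcherFactory {
        Fetcher create(int id);
    }

    private FetcherPoolResizer() {}

    // Shuts down every old fetcher, then creates newCount fresh ones. A failure
    // while shutting down one old fetcher is collected in `failures` instead of
    // aborting the loop, so the new fetchers are always created.
    static List<Fetcher> resize(List<Fetcher> old, int newCount,
                                FetcherFactory factory, List<Exception> failures) {
        for (Fetcher f : old) {
            try {
                f.shutdown();
            } catch (Exception e) {
                failures.add(e);
            }
        }
        List<Fetcher> fresh = new ArrayList<>();
        for (int i = 0; i < newCount; i++) {
            fresh.add(factory.create(i));
        }
        return fresh;
    }

    public static void main(String[] args) {
        List<Fetcher> old = new ArrayList<>();
        old.add(() -> { throw new IllegalArgumentException("close failed"); });
        old.add(() -> {});
        List<Exception> failures = new ArrayList<>();
        List<Fetcher> fresh = resize(old, 3, id -> () -> {}, failures);
        System.out.println(fresh.size() + " started, " + failures.size() + " failed"); // prints "3 started, 1 failed"
    }
}
```

This only illustrates keeping a shutdown failure from leaving the broker without fetchers; whether starting new fetchers is safe when an old one did not shut down cleanly is a separate question.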

Exception stack trace:
{code:java}
java.lang.IllegalArgumentException
        at java.nio.Buffer.position(Buffer.java:244)
        at sun.nio.ch.IOUtil.write(IOUtil.java:68)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
        at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
        at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160)
        at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
        at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70)
        at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
        at org.apache.kafka.common.network.Selector.close(Selector.java:736)
        at org.apache.kafka.common.network.Selector.close(Selector.java:698)
        at org.apache.kafka.common.network.Selector.close(Selector.java:314)
        at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533)
        at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
        at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90)
        at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86)
        at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76)
        at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
        at kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72)
        at kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88)
        at kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574)
        at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
        at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410)
<SKIP>kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
