[ https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dong Lin updated KAFKA-7464:
----------------------------
    Comment: was deleted
(was: Moving this to 2.2.0 since the PR is not ready.)

> Fail to shutdown ReplicaManager during broker cleaned shutdown
> --------------------------------------------------------------
>
>                 Key: KAFKA-7464
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7464
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Zhanxiang (Patrick) Huang
>            Assignee: Zhanxiang (Patrick) Huang
>            Priority: Critical
>             Fix For: 2.0.1, 2.1.0
>
>
> In a 2.0 deployment, we saw the following log when shutting down the ReplicaManager during a broker cleaned shutdown:
> {noformat}
> 2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
> java.lang.IllegalArgumentException: null
>     at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
>     at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
>     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[?:1.8.0_121]
>     at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214) ~[kafka-clients-2.0.0.22.jar:?]
>     at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164) ~[kafka-clients-2.0.0.22.jar:?]
>     at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) ~[kafka-clients-2.0.0.22.jar:?]
>     at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) ~[kafka-clients-2.0.0.22.jar:?]
>     at org.apache.kafka.common.network.Selector.doClose(Selector.java:751) ~[kafka-clients-2.0.0.22.jar:?]
>     at org.apache.kafka.common.network.Selector.close(Selector.java:739) ~[kafka-clients-2.0.0.22.jar:?]
>     at org.apache.kafka.common.network.Selector.close(Selector.java:701) ~[kafka-clients-2.0.0.22.jar:?]
>     at org.apache.kafka.common.network.Selector.close(Selector.java:315) ~[kafka-clients-2.0.0.22.jar:?]
>     at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) ~[kafka-clients-2.0.0.22.jar:?]
>     at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) ~[scala-library-2.11.12.jar:?]
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) ~[scala-library-2.11.12.jar:?]
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) ~[scala-library-2.11.12.jar:?]
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) ~[scala-library-2.11.12.jar:?]
>     at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
> {noformat}
> After that, we noticed that some of the replica fetcher threads fail to shut down:
> {noformat}
> 2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
> java.nio.channels.ClosedChannelException: null
>     at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) ~[?:1.8.0_121]
>     at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) ~[?:1.8.0_121]
>     at org.apache.kafka.common.record.FileRecords.truncateTo(FileRecords.java:244) ~[kafka-clients-2.0.0.22.jar:?]
>     at org.apache.kafka.common.record.FileRecords.trim(FileRecords.java:206) ~[kafka-clients-2.0.0.22.jar:?]
>     at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:512) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
>     at scala.Option.foreach(Option.scala:257) ~[scala-library-2.11.12.jar:?]
>     at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1479) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.log.Log.maybeHandleIOException(Log.scala:1856) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.log.Log.roll(Log.scala:1479) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1465) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.log.Log$$anonfun$append$2.apply(Log.scala:868) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.log.Log$$anonfun$append$2.apply(Log.scala:762) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.log.Log.maybeHandleIOException(Log.scala:1856) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.log.Log.append(Log.scala:762) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.log.Log.appendAsFollower(Log.scala:743) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.cluster.Partition$$anonfun$doAppendRecordsToFollowerOrFutureReplica$1.apply(Partition.scala:601) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:588) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:608) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:130) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:43) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:188) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174) ~[kafka_2.11-2.0.0.22.jar:?]
>     at scala.Option.foreach(Option.scala:257) ~[scala-library-2.11.12.jar:?]
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.11.12.jar:?]
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) ~[scala-library-2.11.12.jar:?]
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:115) ~[kafka_2.11-2.0.0.22.jar:?]
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) ~[kafka_2.11-2.0.0.22.jar:?]
> {noformat}
> Worse still, we found that if an exception is thrown during ReplicaFetcherManager shutdown, we basically skip the purgatory shutdowns and the HW checkpoint; in our case we never saw the "Shut down completely" log:
> {code:java}
> def shutdown(checkpointHW: Boolean = true) {
>   info("Shutting down")
>   removeMetrics()
>   if (logDirFailureHandler != null)
>     logDirFailureHandler.shutdown()
>   replicaFetcherManager.shutdown()
>   replicaAlterLogDirsManager.shutdown()
>   delayedFetchPurgatory.shutdown()
>   delayedProducePurgatory.shutdown()
>   delayedDeleteRecordsPurgatory.shutdown()
>   if (checkpointHW)
>     checkpointHighWatermarks()
>   info("Shut down completely")
> }
> {code}
> The reason we see this is that after KAFKA-6051, we close leaderEndpoint in the replica fetcher thread's initiateShutdown to preempt any in-progress fetch request and accelerate replica fetcher thread shutdown. However, leaderEndpoint can throw an exception while the replica fetcher thread is still actively fetching.
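> The skip can be illustrated with a minimal sketch (hypothetical names, not the actual broker code): an unhandled exception from one shutdown step aborts every step after it, whereas wrapping each step in a swallow-style helper (in the spirit of the CoreUtils.swallow already visible in the first stack trace) lets the remaining steps run:
> {code:java}
```scala
import scala.collection.mutable.ListBuffer

// Minimal sketch (not the actual broker code): why one throwing shutdown step
// skips everything after it, and how per-step swallowing avoids that.
object ShutdownOrderSketch {
  // Mimics the CoreUtils.swallow pattern: run the action, drop (log) any error.
  def swallow(action: => Unit): Unit =
    try action catch { case _: Throwable => () /* would be logged as WARN */ }

  // Three stand-in shutdown steps; the first throws, like
  // replicaFetcherManager.shutdown() did in this ticket.
  def shutdownSteps(completed: ListBuffer[String]): Seq[() => Unit] = Seq(
    () => throw new IllegalArgumentException("null"),    // replicaFetcherManager.shutdown()
    () => completed += "delayedFetchPurgatory.shutdown", // purgatory shutdowns
    () => completed += "checkpointHighWatermarks"        // HW checkpoint
  )

  // Unprotected: the first exception propagates and the later steps never run.
  def unprotected(): List[String] = {
    val done = ListBuffer.empty[String]
    try shutdownSteps(done).foreach(step => step())
    catch { case _: Throwable => () } // outer swallow in KafkaServer.shutdown
    done.toList
  }

  // Per-step protection: each step individually swallowed, so the rest still run.
  def perStepProtected(): List[String] = {
    val done = ListBuffer.empty[String]
    shutdownSteps(done).foreach(step => swallow(step()))
    done.toList
  }
}
```
> {code}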
>
> I am wondering whether we should catch the exception thrown by "leaderEndpoint.close()" instead of letting it propagate up the call stack. In my opinion, it is safe to do so because ReplicaFetcherThread.initiateShutdown will be called when:
> # Server shutdown – In this case we will shut down the process anyway, so even if we fail to close the leader endpoint cleanly there is no harm.
> # shutdownIdleFetcherThread – In this case the fetcher thread is idle and we will not use it again anyway, so there is no harm either.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
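> For illustration, the proposal could look roughly like the following sketch (not the actual patch; BlockingSend and the warn callback are simplified stand-ins for ReplicaFetcherBlockingSend and the thread's logging):
> {code:java}
```scala
// Sketch of the proposal (hypothetical, simplified): swallow any exception from
// leaderEndpoint.close() inside initiateShutdown instead of letting it
// propagate into AbstractFetcherManager.closeAllFetchers.
object InitiateShutdownSketch {
  // Stand-in for ReplicaFetcherBlockingSend; close() may throw (e.g. the
  // IllegalArgumentException from SslTransportLayer.flush seen above) while
  // a fetch is still in flight.
  trait BlockingSend { def close(): Unit }

  def initiateShutdown(leaderEndpoint: BlockingSend, warn: String => Unit): Boolean = {
    // (super.initiateShutdown bookkeeping elided)
    try leaderEndpoint.close()
    catch {
      case t: Throwable =>
        // Safe to ignore per the two cases above: on server shutdown the
        // process exits anyway, and an idle fetcher thread never reuses
        // this endpoint.
        warn(s"Failed to close leader endpoint cleanly: ${t.getMessage}")
    }
    true // shutdown proceeds regardless
  }
}
```
> {code}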