Zhanxiang (Patrick) Huang created KAFKA-7464:
------------------------------------------------

             Summary: Fail to shutdown ReplicaManager during broker cleaned shutdown
                 Key: KAFKA-7464
                 URL: https://issues.apache.org/jira/browse/KAFKA-7464
             Project: Kafka
          Issue Type: Bug
            Reporter: Zhanxiang (Patrick) Huang
            Assignee: Zhanxiang (Patrick) Huang


In a 2.0 deployment, we saw the following log when shutting down the ReplicaManager during a broker clean shutdown:
{noformat}
2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
java.lang.IllegalArgumentException: null
        at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
        at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[?:1.8.0_121]
        at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214) ~[kafka-clients-2.0.0.22.jar:?]
        at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164) ~[kafka-clients-2.0.0.22.jar:?]
        at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) ~[kafka-clients-2.0.0.22.jar:?]
        at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) ~[kafka-clients-2.0.0.22.jar:?]
        at org.apache.kafka.common.network.Selector.doClose(Selector.java:751) ~[kafka-clients-2.0.0.22.jar:?]
        at org.apache.kafka.common.network.Selector.close(Selector.java:739) ~[kafka-clients-2.0.0.22.jar:?]
        at org.apache.kafka.common.network.Selector.close(Selector.java:701) ~[kafka-clients-2.0.0.22.jar:?]
        at org.apache.kafka.common.network.Selector.close(Selector.java:315) ~[kafka-clients-2.0.0.22.jar:?]
        at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) ~[kafka-clients-2.0.0.22.jar:?]
        at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) ~[scala-library-2.11.12.jar:?]
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) ~[scala-library-2.11.12.jar:?]
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) ~[scala-library-2.11.12.jar:?]
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) ~[scala-library-2.11.12.jar:?]
        at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
{noformat}
After that, we noticed that some of the replica fetcher threads failed to shut down:
{noformat}
2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
java.nio.channels.ClosedChannelException: null
        at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) ~[?:1.8.0_121]
        at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) ~[?:1.8.0_121]
        at org.apache.kafka.common.record.FileRecords.truncateTo(FileRecords.java:244) ~[kafka-clients-2.0.0.22.jar:?]
        at org.apache.kafka.common.record.FileRecords.trim(FileRecords.java:206) ~[kafka-clients-2.0.0.22.jar:?]
        at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:512) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
        at scala.Option.foreach(Option.scala:257) ~[scala-library-2.11.12.jar:?]
        at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1479) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.log.Log.maybeHandleIOException(Log.scala:1856) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.log.Log.roll(Log.scala:1479) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1465) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.log.Log$$anonfun$append$2.apply(Log.scala:868) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.log.Log$$anonfun$append$2.apply(Log.scala:762) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.log.Log.maybeHandleIOException(Log.scala:1856) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.log.Log.append(Log.scala:762) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.log.Log.appendAsFollower(Log.scala:743) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.cluster.Partition$$anonfun$doAppendRecordsToFollowerOrFutureReplica$1.apply(Partition.scala:601) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:588) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:608) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:130) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:43) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:188) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174) ~[kafka_2.11-2.0.0.22.jar:?]
        at scala.Option.foreach(Option.scala:257) ~[scala-library-2.11.12.jar:?]
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.11.12.jar:?]
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) ~[scala-library-2.11.12.jar:?]
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:115) ~[kafka_2.11-2.0.0.22.jar:?]
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) ~[kafka_2.11-2.0.0.22.jar:?]
{noformat}
What is worse, we found that if an exception is thrown in ReplicaFetcherManager shutdown, we basically skip the purgatory shutdowns and the HW checkpoint (KafkaServer.shutdown swallows the exception via CoreUtils.swallow, which is where the WARN above comes from, but by then the rest of ReplicaManager.shutdown has already been skipped), and in our case we never saw the "Shut down completely" log:
{code:scala}
 def shutdown(checkpointHW: Boolean = true) {
    info("Shutting down")
    removeMetrics()
    if (logDirFailureHandler != null)
      logDirFailureHandler.shutdown()
    replicaFetcherManager.shutdown()
    replicaAlterLogDirsManager.shutdown()
    delayedFetchPurgatory.shutdown()
    delayedProducePurgatory.shutdown()
    delayedDeleteRecordsPurgatory.shutdown()
    if (checkpointHW)
      checkpointHighWatermarks()
    info("Shut down completely")
  }
{code}
The reason we see this is that, after KAFKA-6051, we close leaderEndpoint in the replica fetcher thread's initiateShutdown to preempt the in-progress fetch request and speed up replica fetcher thread shutdown. However, leaderEndpoint.close() can throw an exception when the replica fetcher thread is still actively fetching: the underlying network client is not meant to be used by the fetcher thread and closed by the shutdown thread at the same time, which is how we end up with the IllegalArgumentException inside SslTransportLayer.flush shown above.
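
To make the interaction concrete, here is a minimal standalone sketch of the shape implied by the first stack trace. The names BlockingSend and FetcherThreadSketch are hypothetical stubs for illustration, not Kafka classes:
{code:scala}
// Sketch only (NOT Kafka source): the shutdown thread closes the fetcher's network
// endpoint while the fetcher thread may still be blocked in a fetch on that endpoint.
trait BlockingSend {          // stands in for kafka.server.ReplicaFetcherBlockingSend
  def sendRequest(): Unit     // blocking network round trip issued by the fetcher thread
  def close(): Unit           // may throw if a send is in flight (e.g. mid SSL flush)
}

class FetcherThreadSketch(leaderEndpoint: BlockingSend) {
  @volatile protected var running = true

  // Runs on the fetcher thread: repeatedly issues blocking fetches.
  def doWork(): Unit = while (running) leaderEndpoint.sendRequest()

  // Runs on the shutdown thread (post-KAFKA-6051): close the endpoint to preempt the
  // in-flight fetch. If close() throws here, the exception propagates up through
  // closeAllFetchers -> ReplicaFetcherManager.shutdown -> ReplicaManager.shutdown.
  def initiateShutdown(): Unit = {
    running = false
    leaderEndpoint.close()
  }
}
{code}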

 

I am wondering whether we should catch the exception thrown by "leaderEndpoint.close()" instead of letting it propagate up the call stack (a rough sketch follows the list below). In my opinion, it is safe to do so because ReplicaFetcherThread.initiateShutdown will be called when:
 # Server shutdown – In this case we will shut down the process anyway, so even if we fail to close the leader endpoint cleanly there is no harm.
 # shutdownIdleFetcherThread – In this case the fetcher thread is idle and we will not use it again anyway, so there is no harm either.
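
As a rough illustration of what I mean, reusing the hypothetical BlockingSend stub from the sketch above (the real change would live in ReplicaFetcherThread.initiateShutdown and log through the existing warn/error helpers):
{code:scala}
// Same stub shape as the earlier sketch, but the close failure is swallowed inside
// initiateShutdown, so it can no longer propagate into ReplicaFetcherManager.shutdown
// and skip the purgatory shutdowns / HW checkpoint in ReplicaManager.shutdown.
class FetcherThreadWithSafeClose(leaderEndpoint: BlockingSend) {
  @volatile private var running = true

  def doWork(): Unit = while (running) leaderEndpoint.sendRequest()

  def initiateShutdown(): Unit = {
    running = false
    try leaderEndpoint.close()
    catch {
      case e: Throwable =>
        // e.g. the IllegalArgumentException from SslTransportLayer.flush in the first log;
        // the endpoint is being torn down anyway, so logging the failure should be enough.
        System.err.println(s"Swallowed exception while closing leader endpoint: $e")
    }
  }
}
{code}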


