[ 
https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16652394#comment-16652394
 ] 

ASF GitHub Bot commented on KAFKA-7464:
---------------------------------------

hzxa21 opened a new pull request #5808: KAFKA-7464: catch exceptions in 
"leaderEndpoint.close()" when shutting down ReplicaFetcherThread
URL: https://github.com/apache/kafka/pull/5808
 
 
   After KAFKA-6051, we close leaderEndpoint in the replica fetcher thread's 
initiateShutdown to try to preempt the in-progress fetch request and accelerate 
replica fetcher thread shutdown. However, leaderEndpoint can throw an exception 
while the replica fetcher thread is still actively fetching, which can cause 
ReplicaManager to fail to shut down cleanly. This PR catches the exceptions 
thrown in "leaderEndpoint.close()" instead of letting them propagate up the 
call stack.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fail to shutdown ReplicaManager during broker cleaned shutdown
> --------------------------------------------------------------
>
>                 Key: KAFKA-7464
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7464
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Zhanxiang (Patrick) Huang
>            Assignee: Zhanxiang (Patrick) Huang
>            Priority: Blocker
>             Fix For: 2.0.1, 2.1.0
>
>
> In 2.0 deployment, we saw the following log when shutting down the 
> ReplicaManager in broker cleaned shutdown:
> {noformat}
> 2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
> java.lang.IllegalArgumentException: null
>         at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
>         at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
>         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) 
> ~[?:1.8.0_121]
>         at 
> org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214)
>  ~[kafka-clients-2.0.0.22.jar:?]
>         at 
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164)
>  ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) 
> ~[kafka-clients-2.0.0.22.jar:?]
>         at 
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) 
> ~[kafka-clients-2.0.0.22.jar:?]
>         at 
> org.apache.kafka.common.network.Selector.doClose(Selector.java:751) 
> ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.Selector.close(Selector.java:739) 
> ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.Selector.close(Selector.java:701) 
> ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.Selector.close(Selector.java:315) 
> ~[kafka-clients-2.0.0.22.jar:?]
>         at 
> org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) 
> ~[kafka-clients-2.0.0.22.jar:?]
>         at 
> kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  ~[scala-library-2.11.12.jar:?]
>         at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
>         at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
>         at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) 
> ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) 
> ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
>         at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  ~[scala-library-2.11.12.jar:?]
>         at 
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> {noformat}
> After that, we noticed that some of the replica fetcher threads failed to 
> shut down:
> {noformat}
> 2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] 
> [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log 
> segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
> java.nio.channels.ClosedChannelException: null
>         at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) 
> ~[?:1.8.0_121]
>         at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) 
> ~[?:1.8.0_121]
>         at 
> org.apache.kafka.common.record.FileRecords.truncateTo(FileRecords.java:244) 
> ~[kafka-clients-2.0.0.22.jar:?]
>         at 
> org.apache.kafka.common.record.FileRecords.trim(FileRecords.java:206) 
> ~[kafka-clients-2.0.0.22.jar:?]
>         at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:512) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at scala.Option.foreach(Option.scala:257) 
> ~[scala-library-2.11.12.jar:?]
>         at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1493) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1479) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1856) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.roll(Log.scala:1479) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1465) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log$$anonfun$append$2.apply(Log.scala:868) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log$$anonfun$append$2.apply(Log.scala:762) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1856) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.append(Log.scala:762) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.appendAsFollower(Log.scala:743) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.cluster.Partition$$anonfun$doAppendRecordsToFollowerOrFutureReplica$1.apply(Partition.scala:601)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:588)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:608)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:130)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:43)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:188)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at scala.Option.foreach(Option.scala:257) 
> ~[scala-library-2.11.12.jar:?]
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
> ~[scala-library-2.11.12.jar:?]
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169)
>  ~[kafka_2.11-2.0.0.22.jar:?]
>         at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:115) 
> ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) 
> ~[kafka_2.11-2.0.0.22.jar:?]{noformat}
> Worse still, we found that if an exception is thrown during 
> ReplicaFetcherManager shutdown, we basically skip the purgatory shutdowns and 
> the HW checkpoint, and in our case we did not see the "Shut down completely" log:
> {code:java}
>  def shutdown(checkpointHW: Boolean = true) {
>     info("Shutting down")
>     removeMetrics()
>     if (logDirFailureHandler != null)
>       logDirFailureHandler.shutdown()
>     replicaFetcherManager.shutdown()
>     replicaAlterLogDirsManager.shutdown()
>     delayedFetchPurgatory.shutdown()
>     delayedProducePurgatory.shutdown()
>     delayedDeleteRecordsPurgatory.shutdown()
>     if (checkpointHW)
>       checkpointHighWatermarks()
>     info("Shut down completely")
>   }
> {code}
> The reason we see this is that after KAFKA-6051, we close leaderEndpoint 
> in the replica fetcher thread's initiateShutdown to try to preempt the 
> in-progress fetch request and accelerate replica fetcher thread shutdown. 
> However, leaderEndpoint can throw an exception while the replica fetcher 
> thread is still actively fetching.
>  
> I am wondering whether we should catch the exception thrown in 
> "leaderEndpoint.close()" instead of letting it propagate up the call stack. In 
> my opinion, it is safe to do so because ReplicaFetcherThread.initiateShutdown 
> will be called when:
>  # Server shutdown – In this case we will shut down the process anyway, so 
> even if we fail to close the leader endpoint cleanly there is no harm.
>  # shutdownIdleFetcherThread – In this case the fetcher thread is idle and we 
> will not use it again anyway, so there is no harm either.
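As a sketch of the proposed fix (hypothetical names; `BlockingSend` and `FetcherThreadSketch` stand in for `ReplicaFetcherBlockingSend` and `ReplicaFetcherThread`, and this is not the actual Kafka code), swallowing the exception inside `initiateShutdown` could look like this:

```scala
// Hedged sketch of the proposed fix, not the actual Kafka implementation.
trait BlockingSend {
  def close(): Unit
}

class FetcherThreadSketch(leaderEndpoint: BlockingSend) {
  def initiateShutdown(): Boolean = {
    // close() can race with an in-progress fetch and throw (e.g. the
    // IllegalArgumentException seen in the SSL transport layer above).
    // Catch it here so shutdown of the remaining components proceeds.
    try leaderEndpoint.close()
    catch {
      case t: Throwable =>
        println(s"Failed to close leader endpoint during shutdown: ${t.getMessage}")
    }
    true
  }
}

object Demo extends App {
  // An endpoint whose close() always fails, as during an active fetch.
  val failing = new BlockingSend {
    def close(): Unit = throw new IllegalArgumentException("buffer position")
  }
  // initiateShutdown still completes despite the close() failure.
  assert(new FetcherThreadSketch(failing).initiateShutdown())
}
```

With the exception contained here, ReplicaManager.shutdown proceeds through the purgatory shutdowns and the HW checkpoint instead of aborting partway.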



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
