[ https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655709#comment-16655709 ]

ASF GitHub Bot commented on KAFKA-7464:
---------------------------------------

lindong28 closed pull request #5808: KAFKA-7464: catch exceptions in 
"leaderEndpoint.close()" when shutting down ReplicaFetcherThread
URL: https://github.com/apache/kafka/pull/5808
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index aeeaf29516a..6b119308205 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -115,7 +115,19 @@ class ReplicaFetcherThread(name: String,
   override def initiateShutdown(): Boolean = {
     val justShutdown = super.initiateShutdown()
     if (justShutdown) {
-      leaderEndpoint.close()
+      // leaderEndpoint.close can throw an exception when the replica fetcher thread is still
+      // actively fetching, because the selector can close the channel while sending a request
+      // after we initiate leaderEndpoint.close, and leaderEndpoint.close itself may then close
+      // the channel again. When this race condition happens, an exception is thrown.
+      // Throwing the exception to the caller may fail the ReplicaManager shutdown. It is safe to
+      // catch the exception here without causing a correctness issue because we are going to
+      // shut down the thread and will not re-use the leaderEndpoint anyway.
+      try {
+        leaderEndpoint.close()
+      } catch {
+        case t: Throwable =>
+          debug(s"Fail to close leader endpoint $leaderEndpoint after initiating replica fetcher thread shutdown", t)
+      }
     }
     justShutdown
   }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 4d54c81044a..c9d9b966964 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -24,16 +24,16 @@ import kafka.cluster.Partition
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
-import kafka.utils.TestUtils
+import kafka.utils.{LogCaptureAppender, TestUtils}
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRequest}
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.utils.SystemTime
+import org.apache.log4j.Level
 import org.easymock.EasyMock._
 import org.easymock.{Capture, CaptureType, IAnswer}
 import org.junit.Assert._
@@ -793,6 +793,45 @@ class ReplicaFetcherThreadTest {
     assertEquals(49, truncateToCapture.getValue)
   }
 
+  @Test
+  def shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread(): Unit = {
+    val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+    val config = KafkaConfig.fromProps(props)
+    val mockBlockingSend = createMock(classOf[BlockingSend])
+
+    expect(mockBlockingSend.close()).andThrow(new IllegalArgumentException()).once()
+    replay(mockBlockingSend)
+
+    val thread = new ReplicaFetcherThread(
+      name = "bob",
+      fetcherId = 0,
+      sourceBroker = brokerEndPoint,
+      brokerConfig = config,
+      replicaMgr = null,
+      metrics = new Metrics(),
+      time = new SystemTime(),
+      quota = null,
+      leaderEndpointBlockingSend = Some(mockBlockingSend))
+
+    val previousLevel = LogCaptureAppender.setClassLoggerLevel(thread.getClass, Level.DEBUG)
+    val logCaptureAppender = LogCaptureAppender.createAndRegister()
+
+    try {
+      thread.initiateShutdown()
+
+      val event = logCaptureAppender.getMessages.find(e => e.getLevel == Level.DEBUG
+        && e.getRenderedMessage.contains(s"Fail to close leader endpoint $mockBlockingSend after initiating replica fetcher thread shutdown")
+        && e.getThrowableInformation != null
+        && e.getThrowableInformation.getThrowable.getClass.getName.equals(new IllegalArgumentException().getClass.getName))
+      assertTrue(event.isDefined)
+
+      verify(mockBlockingSend)
+    } finally {
+      LogCaptureAppender.unregister(logCaptureAppender)
+      LogCaptureAppender.setClassLoggerLevel(thread.getClass, previousLevel)
+    }
+  }
+
   def stub(replica: Replica, partition: Partition, replicaManager: ReplicaManager) = {
     expect(replicaManager.localReplica(t1p0)).andReturn(Some(replica)).anyTimes()
     expect(replicaManager.localReplicaOrException(t1p0)).andReturn(replica).anyTimes()
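
For readers unfamiliar with the log-capture technique the new test relies on: the idea is to attach an in-memory log4j appender, run the code under test, and assert on the captured events. Below is a minimal standalone sketch of that idea using plain log4j 1.x rather than Kafka's LogCaptureAppender test utility; CapturingAppender and the logger name are hypothetical stand-ins:

{code:java}
import org.apache.log4j.{AppenderSkeleton, Level, Logger}
import org.apache.log4j.spi.LoggingEvent
import scala.collection.mutable.ListBuffer

object LogCaptureSketch extends App {
  // In-memory appender that records every event routed to it.
  class CapturingAppender extends AppenderSkeleton {
    val events = ListBuffer.empty[LoggingEvent]
    override def append(e: LoggingEvent): Unit = events.synchronized { events += e }
    override def close(): Unit = ()
    override def requiresLayout(): Boolean = false
  }

  val logger = Logger.getLogger("sketch")
  val previousLevel = logger.getLevel
  val appender = new CapturingAppender
  logger.addAppender(appender)
  logger.setLevel(Level.DEBUG) // DEBUG must be enabled, as in the test above

  try {
    // Stand-in for thread.initiateShutdown() swallowing and logging the error.
    logger.debug("Fail to close leader endpoint", new IllegalArgumentException())

    assert(appender.events.exists(e =>
      e.getLevel == Level.DEBUG &&
        e.getRenderedMessage.contains("leader endpoint") &&
        e.getThrowableInformation != null))
  } finally {
    logger.removeAppender(appender) // always restore logger state
    logger.setLevel(previousLevel)
  }
}
{code}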


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fail to shutdown ReplicaManager during broker cleaned shutdown
> --------------------------------------------------------------
>
>                 Key: KAFKA-7464
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7464
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Zhanxiang (Patrick) Huang
>            Assignee: Zhanxiang (Patrick) Huang
>            Priority: Critical
>             Fix For: 2.0.1, 2.1.0
>
>
> In a 2.0 deployment, we saw the following log when shutting down the
> ReplicaManager during a clean broker shutdown:
> {noformat}
> 2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
> java.lang.IllegalArgumentException: null
>         at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
>         at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
>         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[?:1.8.0_121]
>         at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.Selector.doClose(Selector.java:751) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.Selector.close(Selector.java:739) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.Selector.close(Selector.java:701) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.Selector.close(Selector.java:315) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) ~[kafka-clients-2.0.0.22.jar:?]
>         at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) ~[scala-library-2.11.12.jar:?]
>         at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
> {noformat}
> After that, we noticed that some of the replica fetcher threads failed to
> shut down:
> {noformat}
> 2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
> java.nio.channels.ClosedChannelException: null
>         at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) ~[?:1.8.0_121]
>         at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) ~[?:1.8.0_121]
>         at org.apache.kafka.common.record.FileRecords.truncateTo(FileRecords.java:244) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.record.FileRecords.trim(FileRecords.java:206) ~[kafka-clients-2.0.0.22.jar:?]
>         at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:512) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
>         at scala.Option.foreach(Option.scala:257) ~[scala-library-2.11.12.jar:?]
>         at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1479) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1856) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.roll(Log.scala:1479) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1465) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log$$anonfun$append$2.apply(Log.scala:868) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log$$anonfun$append$2.apply(Log.scala:762) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1856) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.append(Log.scala:762) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.appendAsFollower(Log.scala:743) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.cluster.Partition$$anonfun$doAppendRecordsToFollowerOrFutureReplica$1.apply(Partition.scala:601) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:588) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:608) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:130) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:43) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:188) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174) ~[kafka_2.11-2.0.0.22.jar:?]
>         at scala.Option.foreach(Option.scala:257) ~[scala-library-2.11.12.jar:?]
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) ~[scala-library-2.11.12.jar:?]
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:115) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) ~[kafka_2.11-2.0.0.22.jar:?]
> {noformat}
> Worse still, we found that if an exception is thrown during ReplicaFetcherManager
> shutdown, we skip the purgatory shutdowns and the HW checkpoint entirely; in our
> case we did not see the "Shut down completely" log:
> {code:java}
>  def shutdown(checkpointHW: Boolean = true) {
>     info("Shutting down")
>     removeMetrics()
>     if (logDirFailureHandler != null)
>       logDirFailureHandler.shutdown()
>     replicaFetcherManager.shutdown()
>     replicaAlterLogDirsManager.shutdown()
>     delayedFetchPurgatory.shutdown()
>     delayedProducePurgatory.shutdown()
>     delayedDeleteRecordsPurgatory.shutdown()
>     if (checkpointHW)
>       checkpointHighWatermarks()
>     info("Shut down completely")
>   }
> {code}
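>
> To make the failure mode concrete, here is a minimal standalone sketch (hypothetical names, not the Kafka code) of how a single uncaught exception in this sequential shutdown silently skips every later step, and how a swallow-style wrapper in the spirit of kafka.utils.CoreUtils.swallow keeps the remaining steps running:
> {code:java}
> object ShutdownOrderSketch extends App {
>   // Simplified stand-in for CoreUtils.swallow: log the error and keep going.
>   def swallow(action: => Unit): Unit =
>     try action catch { case t: Throwable => println(s"swallowed: $t") }
>
>   def fetcherManagerShutdown(): Unit =
>     throw new IllegalArgumentException("selector close race")
>
>   // Without swallowing, the throw above would abort shutdown() right here,
>   // and the purgatory shutdowns and HW checkpoint below would never run.
>   swallow(fetcherManagerShutdown())
>   swallow(println("purgatories shut down"))
>   swallow(println("high watermarks checkpointed"))
>   println("Shut down completely")
> }
> {code}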
> The reason we see this is that after KAFKA-6051, we close leaderEndpoint in the
> replica fetcher thread's initiateShutdown to try to preempt the in-progress
> fetch request and accelerate replica fetcher thread shutdown. However,
> leaderEndpoint.close() can throw an exception when the replica fetcher thread
> is still actively fetching.
>  
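> The underlying NIO behavior can be reproduced in isolation: closing a channel from one thread while another thread is blocked in a write on it raises an exception in the writer. A minimal standalone sketch of this race (hypothetical, using java.nio.Pipe rather than Kafka's selector):
> {code:java}
> import java.nio.ByteBuffer
> import java.nio.channels.Pipe
>
> object CloseRaceSketch extends App {
>   val pipe = Pipe.open()
>   val sink = pipe.sink()
>   val writer = new Thread(() => {
>     try {
>       // Keep writing; with no reader, the pipe fills up and write() blocks.
>       while (true) sink.write(ByteBuffer.wrap(new Array[Byte](4096)))
>     } catch {
>       // The concurrent close below typically surfaces here as
>       // AsynchronousCloseException (a subtype of ClosedChannelException).
>       case t: Throwable => println(s"writer saw: ${t.getClass.getName}")
>     }
>   })
>   writer.start()
>   Thread.sleep(100)
>   sink.close() // racing close from the "shutdown" thread
>   writer.join()
> }
> {code}
>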
> I am wondering whether we should catch the exception thrown in
> "leaderEndpoint.close()" instead of letting it propagate up the call stack. In
> my opinion, it is safe to do so because ReplicaFetcherThread.initiateShutdown
> is called when:
>  # Server shutdown – In this case we will shut down the process anyway, so
> even if we fail to close the leader endpoint cleanly there is no harm.
>  # shutdownIdleFetcherThread – In this case the fetcher thread is idle and we
> will not use it again anyway, so there is no harm either.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
