Repository: kafka
Updated Branches:
  refs/heads/trunk 05357b703 -> cbef33f3d


KAFKA-5864; ReplicaFetcherThread should not die due to replica in offline log directory

Author: Dong Lin <lindon...@gmail.com>

Reviewers: Jun Rao <jun...@gmail.com>, Ismael Juma <ism...@juma.me.uk>

Closes #3820 from lindong28/KAFKA-5864


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cbef33f3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cbef33f3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cbef33f3

Branch: refs/heads/trunk
Commit: cbef33f3d0bc686e1258352cd5e13143b3f09d78
Parents: 05357b7
Author: Dong Lin <lindon...@gmail.com>
Authored: Tue Oct 3 17:11:21 2017 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Tue Oct 3 17:11:21 2017 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherThread.scala  | 18 +++--
 core/src/main/scala/kafka/log/Log.scala         | 35 +++++----
 core/src/main/scala/kafka/log/LogManager.scala  |  8 +-
 .../kafka/server/AbstractFetcherManager.scala   |  2 +-
 .../kafka/server/AbstractFetcherThread.scala    | 41 +++++-----
 .../kafka/server/ReplicaFetcherThread.scala     | 82 ++++++++++++--------
 .../scala/kafka/server/ReplicaManager.scala     |  5 ++
 .../scala/kafka/utils/ShutdownableThread.scala  |  2 +-
 .../kafka/api/LogDirFailureTest.scala           | 54 ++++++++++---
 .../integration/UncleanLeaderElectionTest.scala | 14 ++--
 .../server/AbstractFetcherThreadTest.scala      | 16 ++--
 .../kafka/server/ReplicaFetcherThreadTest.scala |  9 ++-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  4 +-
 13 files changed, 181 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
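As a reader's aid (not part of the patch): the core of this change is that fetcher threads now report per-partition storage errors back to the caller instead of dying on them, using the ResultWithPartitions wrapper added to AbstractFetcherThread in the diff below. The following is a minimal standalone Scala sketch of that pattern only; TopicPartition is simplified to a String and all names here are illustrative, not the actual Kafka classes.

object ResultWithPartitionsSketch {
  type TopicPartition = String

  // Pair a computed result with the partitions that hit a storage error,
  // so the caller can back them off instead of letting the fetcher thread die.
  case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition])

  // Build fetch offsets, routing offline partitions into partitionsWithError.
  def buildFetchOffsets(states: Seq[(TopicPartition, Long)],
                        offline: Set[TopicPartition]): ResultWithPartitions[Map[TopicPartition, Long]] = {
    val (bad, good) = states.partition { case (tp, _) => offline(tp) }
    ResultWithPartitions(good.toMap, bad.map(_._1).toSet)
  }

  def main(args: Array[String]): Unit = {
    val ResultWithPartitions(request, withError) =
      buildFetchOffsets(Seq("topic-0" -> 42L, "topic-1" -> 7L), offline = Set("topic-1"))
    println(s"fetch: $request, delay and retry: $withError")
  }
}

In the actual patch the partitions-with-error sets are handed to handlePartitionsWithErrors, which delays them by replicaFetchBackoffMs rather than letting the ReplicaFetcherThread exit.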


http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 4f14570..4c7c227 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -97,17 +97,19 @@ class ConsumerFetcherThread(name: String,
 
   // any logic for partitions whose leader has changed
   def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) {
-    removePartitions(partitions.toSet)
-    consumerFetcherManager.addPartitionsWithError(partitions)
+    if (partitions.nonEmpty) {
+      removePartitions(partitions.toSet)
+      consumerFetcherManager.addPartitionsWithError(partitions)
+    }
   }
 
-  protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
+  protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[FetchRequest] = {
     partitionMap.foreach { case ((topicPartition, partitionFetchState)) =>
       if (partitionFetchState.isReadyForFetch)
         fetchRequestBuilder.addFetch(topicPartition.topic, topicPartition.partition, partitionFetchState.fetchOffset, fetchSize)
     }
 
-    new FetchRequest(fetchRequestBuilder.build())
+    ResultWithPartitions(new FetchRequest(fetchRequestBuilder.build()), Set())
   }
 
   protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] =
@@ -115,11 +117,15 @@ class ConsumerFetcherThread(name: String,
       new TopicPartition(t, p) -> new PartitionData(value)
     }
 
-  override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): Map[TopicPartition, Int] = { Map() }
+  override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[Map[TopicPartition, Int]] = {
+    ResultWithPartitions(Map(), Set())
+  }
 
   override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { Map() }
 
-  override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long] = { Map() }
+  override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = {
+    ResultWithPartitions(Map(), Set())
+  }
 }
 
 @deprecated("This object has been deprecated and will be removed in a future 
release. " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e6c774a..37137ec 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1412,28 +1412,31 @@ class Log(@volatile var dir: File,
    * Truncate this log so that it ends with the greatest offset < targetOffset.
    *
    * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
+   * @return True iff targetOffset < logEndOffset
    */
-  private[log] def truncateTo(targetOffset: Long) {
+  private[log] def truncateTo(targetOffset: Long): Boolean = {
     maybeHandleIOException(s"Error while truncating log to offset 
$targetOffset for $topicPartition in dir ${dir.getParent}") {
       if (targetOffset < 0)
         throw new IllegalArgumentException("Cannot truncate to a negative 
offset (%d).".format(targetOffset))
       if (targetOffset >= logEndOffset) {
         info("Truncating %s to %d has no effect as the largest offset in the 
log is %d.".format(name, targetOffset, logEndOffset - 1))
-        return
-      }
-      info("Truncating log %s to offset %d.".format(name, targetOffset))
-      lock synchronized {
-        if (segments.firstEntry.getValue.baseOffset > targetOffset) {
-          truncateFullyAndStartAt(targetOffset)
-        } else {
-          val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
-          deletable.foreach(deleteSegment)
-          activeSegment.truncateTo(targetOffset)
-          updateLogEndOffset(targetOffset)
-          this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
-          this.logStartOffset = math.min(targetOffset, this.logStartOffset)
-          leaderEpochCache.clearAndFlushLatest(targetOffset)
-          loadProducerState(targetOffset, reloadFromCleanShutdown = false)
+        false
+      } else {
+        info("Truncating log %s to offset %d.".format(name, targetOffset))
+        lock synchronized {
+          if (segments.firstEntry.getValue.baseOffset > targetOffset) {
+            truncateFullyAndStartAt(targetOffset)
+          } else {
+            val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
+            deletable.foreach(deleteSegment)
+            activeSegment.truncateTo(targetOffset)
+            updateLogEndOffset(targetOffset)
+            this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
+            this.logStartOffset = math.min(targetOffset, this.logStartOffset)
+            leaderEpochCache.clearAndFlushLatest(targetOffset)
+            loadProducerState(targetOffset, reloadFromCleanShutdown = false)
+          }
+          true
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 690f52a..f4bd8a2 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -437,6 +437,7 @@ class LogManager(logDirs: Array[File],
    * @param partitionOffsets Partition logs that need to be truncated
    */
   def truncateTo(partitionOffsets: Map[TopicPartition, Long]) {
+    var truncated = false
     for ((topicPartition, truncateOffset) <- partitionOffsets) {
       val log = logs.get(topicPartition)
       // If the log does not exist, skip it
@@ -446,7 +447,8 @@ class LogManager(logDirs: Array[File],
         if (needToStopCleaner)
           cleaner.abortAndPauseCleaning(topicPartition)
         try {
-          log.truncateTo(truncateOffset)
+          if (log.truncateTo(truncateOffset))
+            truncated = true
           if (needToStopCleaner)
           cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
         } finally {
@@ -455,7 +457,9 @@ class LogManager(logDirs: Array[File],
         }
       }
     }
-    checkpointLogRecoveryOffsets()
+
+    if (truncated)
+      checkpointLogRecoveryOffsets()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 2b2aa7b..0d7806c 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils
 abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1)
   extends Logging with KafkaMetricsGroup {
   // map of (source broker_id, fetcher_id per source broker) => fetcher
-  private val fetcherThreadMap = new mutable.HashMap[BrokerIdAndFetcherId, AbstractFetcherThread]
+  val fetcherThreadMap = new mutable.HashMap[BrokerIdAndFetcherId, AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = "[" + name + "] "
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index fd26e11..e772ac3 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import java.util.concurrent.locks.ReentrantLock
+
 import kafka.cluster.BrokerEndPoint
 import kafka.utils.{DelayedItem, Pool, ShutdownableThread}
 import org.apache.kafka.common.errors.KafkaStorageException
@@ -27,10 +28,12 @@ import kafka.utils.CoreUtils.inLock
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.protocol.Errors
 import AbstractFetcherThread._
+
 import scala.collection.{Map, Set, mutable}
 import scala.collection.JavaConverters._
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
+
 import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
@@ -71,13 +74,15 @@ abstract class AbstractFetcherThread(name: String,
   // deal with partitions with errors, potentially due to leadership changes
   protected def handlePartitionsWithErrors(partitions: Iterable[TopicPartition])
 
-  protected def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): Map[TopicPartition, Int]
+  protected def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[Map[TopicPartition, Int]]
 
   protected def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset]
 
-  protected def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long]
+  protected def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]]
 
-  protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): REQ
+  protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[REQ]
+
+  case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition])
 
   protected def fetch(fetchRequest: REQ): Seq[(TopicPartition, PD)]
 
@@ -98,11 +103,12 @@ abstract class AbstractFetcherThread(name: String,
   override def doWork() {
     maybeTruncate()
     val fetchRequest = inLock(partitionMapLock) {
-      val fetchRequest = buildFetchRequest(states)
+      val ResultWithPartitions(fetchRequest, partitionsWithError) = buildFetchRequest(states)
       if (fetchRequest.isEmpty) {
         trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
         partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
       }
+      handlePartitionsWithErrors(partitionsWithError)
       fetchRequest
     }
     if (!fetchRequest.isEmpty)
@@ -119,7 +125,8 @@ abstract class AbstractFetcherThread(name: String,
     *   occur during truncation.
     */
   def maybeTruncate(): Unit = {
-    val epochRequests = inLock(partitionMapLock) { buildLeaderEpochRequest(states) }
+    val ResultWithPartitions(epochRequests, partitionsWithError) = inLock(partitionMapLock) { buildLeaderEpochRequest(states) }
+    handlePartitionsWithErrors(partitionsWithError)
 
     if (epochRequests.nonEmpty) {
       val fetchedEpochs = fetchEpochsFromLeader(epochRequests)
@@ -127,7 +134,8 @@ abstract class AbstractFetcherThread(name: String,
       inLock(partitionMapLock) {
         //Check no leadership changes happened whilst we were unlocked, fetching epochs
         val leaderEpochs = fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) }
-        val truncationPoints = maybeTruncate(leaderEpochs)
+        val ResultWithPartitions(truncationPoints, partitionsWithError) = maybeTruncate(leaderEpochs)
+        handlePartitionsWithErrors(partitionsWithError)
         markTruncationComplete(truncationPoints)
       }
     }
@@ -136,11 +144,6 @@ abstract class AbstractFetcherThread(name: String,
   private def processFetchRequest(fetchRequest: REQ) {
     val partitionsWithError = mutable.Set[TopicPartition]()
 
-    def updatePartitionsWithError(partition: TopicPartition): Unit = {
-      partitionsWithError += partition
-      partitionStates.moveToEnd(partition)
-    }
-
     var responseData: Seq[(TopicPartition, PD)] = Seq.empty
 
     try {
@@ -151,7 +154,7 @@ abstract class AbstractFetcherThread(name: String,
         if (isRunning.get) {
           warn(s"Error in fetch to broker ${sourceBroker.id}, request 
$fetchRequest", t)
           inLock(partitionMapLock) {
-            
partitionStates.partitionSet.asScala.foreach(updatePartitionsWithError)
+            partitionsWithError ++= partitionStates.partitionSet.asScala
             // there is an error occurred while fetching partitions, sleep a 
while
             // note that `ReplicaFetcherThread.handlePartitionsWithError` will 
also introduce the same delay for every
             // partition with error effectively doubling the delay. It would 
be good to improve this.
@@ -195,10 +198,10 @@ abstract class AbstractFetcherThread(name: String,
                      // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and should get fixed in the subsequent fetches
                      logger.error(s"Found invalid messages during fetch for partition $topicPartition offset ${currentPartitionFetchState.fetchOffset} error ${ime.getMessage}")
-                      updatePartitionsWithError(topicPartition)
+                      partitionsWithError += topicPartition
                     case e: KafkaStorageException =>
                       logger.error(s"Error while processing data for partition 
$topicPartition", e)
-                      updatePartitionsWithError(topicPartition)
+                      partitionsWithError += topicPartition
                     case e: Throwable =>
                       throw new KafkaException(s"Error processing data for 
partition $topicPartition " +
                         s"offset ${currentPartitionFetchState.fetchOffset}", e)
@@ -212,12 +215,12 @@ abstract class AbstractFetcherThread(name: String,
                     case e: FatalExitError => throw e
                     case e: Throwable =>
                       error(s"Error getting offset for partition 
$topicPartition to broker ${sourceBroker.id}", e)
-                      updatePartitionsWithError(topicPartition)
+                      partitionsWithError += topicPartition
                   }
                 case _ =>
                   if (isRunning.get) {
                     error(s"Error for partition $topicPartition to broker 
%${sourceBroker.id}:${partitionData.exception.get}")
-                    updatePartitionsWithError(topicPartition)
+                    partitionsWithError += topicPartition
                   }
               }
             })
@@ -225,10 +228,9 @@ abstract class AbstractFetcherThread(name: String,
       }
     }
 
-    if (partitionsWithError.nonEmpty) {
+    if (partitionsWithError.nonEmpty)
       debug("handling partitions with error for 
%s".format(partitionsWithError))
-      handlePartitionsWithErrors(partitionsWithError)
-    }
+    handlePartitionsWithErrors(partitionsWithError)
   }
 
   def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]) {
@@ -253,6 +255,7 @@ abstract class AbstractFetcherThread(name: String,
 
   /**
     * Loop through all partitions, marking them as truncation complete and applying the correct offset
+    *
     * @param partitions the partitions to mark truncation complete
     */
   private def markTruncationComplete(partitions: Map[TopicPartition, Long]) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index cf652d6..b90e9e8 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import java.util
+
 import kafka.admin.AdminUtils
 import kafka.api.{FetchRequest => _, _}
 import kafka.cluster.{BrokerEndPoint, Replica}
@@ -26,6 +27,7 @@ import kafka.server.ReplicaFetcherThread._
 import kafka.server.epoch.LeaderEpochCache
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
@@ -74,7 +76,7 @@ class ReplicaFetcherThread(name: String,
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
   private val shouldSendLeaderEpochRequest: Boolean = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
 
-  private def epochCache(tp: TopicPartition): LeaderEpochCache =  replicaMgr.getReplica(tp).get.epochs.get
+  private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] =  replicaMgr.getReplica(tp).map(_.epochs.get)
 
   override def shutdown(): Unit = {
     super.shutdown()
@@ -83,7 +85,7 @@ class ReplicaFetcherThread(name: String,
 
   // process fetched data
   def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
-    val replica = replicaMgr.getReplica(topicPartition).get
+    val replica = replicaMgr.getReplicaOrException(topicPartition)
     val records = partitionData.toRecords
 
     maybeWarnIfOversizedRecords(records, topicPartition)
@@ -128,7 +130,7 @@ class ReplicaFetcherThread(name: String,
    * Handle a partition whose offset is out of range and return a new fetch offset.
    */
   def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
-    val replica = replicaMgr.getReplica(topicPartition).get
+    val replica = replicaMgr.getReplicaOrException(topicPartition)
 
     /**
      * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
@@ -194,7 +196,8 @@ class ReplicaFetcherThread(name: String,
 
   // any logic for partitions whose leader has changed
   def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) {
-    delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)
+    if (partitions.nonEmpty)
+      delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)
   }
 
   protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
@@ -226,20 +229,26 @@ class ReplicaFetcherThread(name: String,
     }
   }
 
-  override def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
+  override def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[FetchRequest] = {
     val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
+    val partitionsWithError = mutable.Set[TopicPartition]()
 
     partitionMap.foreach { case (topicPartition, partitionFetchState) =>
       // We will not include a replica in the fetch request if it should be throttled.
       if (partitionFetchState.isReadyForFetch && !shouldFollowerThrottle(quota, topicPartition)) {
-        val logStartOffset = replicaMgr.getReplicaOrException(topicPartition).logStartOffset
-        requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize))
+        try {
+          val logStartOffset = replicaMgr.getReplicaOrException(topicPartition).logStartOffset
+          requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize))
+        } catch {
+          case e: KafkaStorageException =>
+            partitionsWithError += topicPartition
+        }
       }
     }
 
     val requestBuilder = JFetchRequest.Builder.forReplica(fetchRequestVersion, replicaId, maxWait, minBytes, requestMap)
       .setMaxBytes(maxBytes)
-    new FetchRequest(requestBuilder)
+    ResultWithPartitions(new FetchRequest(requestBuilder), partitionsWithError)
   }
 
   /**
@@ -248,44 +257,49 @@ class ReplicaFetcherThread(name: String,
     *   otherwise we truncate to the leaders offset.
     * - If the leader replied with undefined epoch offset we must use the high watermark
     */
-  override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long] = {
+  override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = {
     val truncationPoints = scala.collection.mutable.HashMap.empty[TopicPartition, Long]
     val partitionsWithError = mutable.Set[TopicPartition]()
 
     fetchedEpochs.foreach { case (tp, epochOffset) =>
-      val replica = replicaMgr.getReplica(tp).get
-
-      if (epochOffset.hasError) {
-        info(s"Retrying leaderEpoch request for partition 
${replica.topicPartition} as the leader reported an error: 
${epochOffset.error}")
-        partitionsWithError += tp
-      } else {
-        val truncationOffset =
-          if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET)
-            highWatermark(replica, epochOffset)
-          else if (epochOffset.endOffset >= replica.logEndOffset.messageOffset)
-            logEndOffset(replica, epochOffset)
-          else
-            epochOffset.endOffset
-
-        truncationPoints.put(tp, truncationOffset)
+      try {
+        val replica = replicaMgr.getReplicaOrException(tp)
+
+        if (epochOffset.hasError) {
+          info(s"Retrying leaderEpoch request for partition 
${replica.topicPartition} as the leader reported an error: 
${epochOffset.error}")
+          partitionsWithError += tp
+        } else {
+          val truncationOffset =
+            if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET)
+              highWatermark(replica, epochOffset)
+            else if (epochOffset.endOffset >= replica.logEndOffset.messageOffset)
+              logEndOffset(replica, epochOffset)
+            else
+              epochOffset.endOffset
+
+          replicaMgr.logManager.truncateTo(Map(tp -> truncationOffset))
+          truncationPoints.put(tp, truncationOffset)
+        }
+      } catch {
+        case e: KafkaStorageException =>
+          info(s"Failed to truncate $tp", e)
+          partitionsWithError += tp
       }
     }
-    replicaMgr.logManager.truncateTo(truncationPoints)
 
-    // For partitions that encountered an error, delay them a bit before retrying the leader epoch request
-    delayPartitions(partitionsWithError, brokerConfig.replicaFetchBackoffMs.toLong)
-
-    truncationPoints
+    ResultWithPartitions(truncationPoints, partitionsWithError)
   }
 
-  override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): Map[TopicPartition, Int] = {
-    val result = allPartitions
+  override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[Map[TopicPartition, Int]] = {
+    val partitionEpochOpts = allPartitions
       .filter { case (_, state) => state.isTruncatingLog }
-      .map { case (tp, _) => tp -> epochCache(tp).latestEpoch }.toMap
+      .map { case (tp, _) => tp -> epochCacheOpt(tp) }.toMap
 
-    debug(s"Build leaderEpoch request $result")
+    val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (tp, epochCacheOpt) => epochCacheOpt.nonEmpty }
 
-    result
+    debug(s"Build leaderEpoch request $partitionsWithEpoch")
+    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
+    ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
   }
 
   override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index a361e16..3a4ecef 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1349,6 +1349,11 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  // Used only by test
+  def markPartitionOffline(tp: TopicPartition) {
+    allPartitions.put(tp, ReplicaManager.OfflinePartition)
+  }
+
   // logDir should be an absolute path
   def handleLogDirFailure(dir: String) {
     if (!logManager.isLogDirOnline(dir))

http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/main/scala/kafka/utils/ShutdownableThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
index 59ebc54..6ed0968 100644
--- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
+++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
@@ -27,7 +27,7 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
   this.setDaemon(false)
   this.logIdent = "[" + name + "]: "
   val isRunning: AtomicBoolean = new AtomicBoolean(true)
-  private val shutdownLatch = new CountDownLatch(1)
+  val shutdownLatch = new CountDownLatch(1)
 
   def shutdown(): Unit = {
     initiateShutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
index 6749a57..b1ac47b 100644
--- a/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
@@ -29,6 +29,9 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderForPartitionException}
 import org.junit.{Before, Test}
 import org.junit.Assert.assertTrue
+import org.junit.Assert.assertEquals
+
+import scala.collection.JavaConverters._
 
 /**
   * Test whether clients can producer and consume when there is log directory failure
@@ -41,37 +44,66 @@ class LogDirFailureTest extends IntegrationTestHarness {
   val consumerCount: Int = 1
   val serverCount: Int = 2
   private val topic = "topic"
+  private val partitionNum = 12
 
-  this.logDirCount = 2
+  this.logDirCount = 3
   this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
   this.producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
   this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp, "60000")
-
+  this.serverConfig.setProperty(KafkaConfig.NumReplicaFetchersProp, "1")
 
   @Before
   override def setUp() {
     super.setUp()
-    TestUtils.createTopic(zkUtils, topic, 1, 2, servers = servers)
+    TestUtils.createTopic(zkUtils, topic, partitionNum, serverCount, servers = servers)
   }
 
   @Test
   def testIOExceptionDuringLogRoll() {
-    testProduceAfterLogDirFailure(Roll)
+    testProduceAfterLogDirFailureOnLeader(Roll)
   }
 
   @Test
   def testIOExceptionDuringCheckpoint() {
-    testProduceAfterLogDirFailure(Checkpoint)
+    testProduceAfterLogDirFailureOnLeader(Checkpoint)
+  }
+
+  @Test
+  def testReplicaFetcherThreadAfterLogDirFailureOnFollower() {
+    val producer = producers.head
+    val partition = new TopicPartition(topic, 0)
+
+    val partitionInfo = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get
+    val leaderServerId = partitionInfo.leader().id()
+    val leaderServer = servers.find(_.config.brokerId == leaderServerId).get
+    val followerServerId = partitionInfo.replicas().map(_.id()).find(_ != leaderServerId).get
+    val followerServer = servers.find(_.config.brokerId == followerServerId).get
+
+    followerServer.replicaManager.markPartitionOffline(partition)
+    // Send a message to another partition whose leader is the same as partition 0
+    // so that ReplicaFetcherThread on the follower will get response from leader immediately
+    val anotherPartitionWithTheSameLeader = (1 until partitionNum).find { i =>
+      leaderServer.replicaManager.getPartition(new TopicPartition(topic, i)).flatMap(_.leaderReplicaIfLocal).isDefined
+    }.get
+    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, anotherPartitionWithTheSameLeader, topic.getBytes, "message".getBytes)
+    // When producer.send(...).get returns, it is guaranteed that ReplicaFetcherThread on the follower
+    // has fetched from the leader and attempts to append to the offline replica.
+    producer.send(record).get
+
+    assertEquals(serverCount, leaderServer.replicaManager.getPartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)).get.inSyncReplicas.size)
+    followerServer.replicaManager.replicaFetcherManager.fetcherThreadMap.values.foreach { thread =>
+      assertTrue("ReplicaFetcherThread should still be working if its partition count > 0", thread.shutdownLatch.getCount > 0)
+    }
   }
 
-  def testProduceAfterLogDirFailure(failureType: LogDirFailureType) {
+  def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType) {
     val consumer = consumers.head
     subscribeAndWaitForAssignment(topic, consumer)
     val producer = producers.head
     val partition = new TopicPartition(topic, 0)
     val record = new ProducerRecord(topic, 0, s"key".getBytes, s"value".getBytes)
 
-    val leaderServerId = producer.partitionsFor(topic).get(0).leader().id()
+    val leaderServerId = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id()
     val leaderServer = servers.find(_.config.brokerId == leaderServerId).get
 
     // The first send() should succeed
@@ -81,8 +113,8 @@ class LogDirFailureTest extends IntegrationTestHarness {
     }, "Expected the first message", 3000L)
 
     // Make log directory of the partition on the leader broker inaccessible by replacing it with a file
-    val replica = leaderServer.replicaManager.getReplica(partition)
-    val logDir = replica.get.log.get.dir.getParentFile
+    val replica = leaderServer.replicaManager.getReplicaOrException(partition)
+    val logDir = replica.log.get.dir.getParentFile
     CoreUtils.swallow(Utils.delete(logDir))
     logDir.createNewFile()
     assertTrue(logDir.isFile)
@@ -99,7 +131,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
     }
 
     // Wait for ReplicaHighWatermarkCheckpoint to happen so that the log directory of the topic will be offline
-    TestUtils.waitUntilTrue(() => !leaderServer.logManager.liveLogDirs.contains(logDir), "Expected log directory offline", 3000L)
+    TestUtils.waitUntilTrue(() => !leaderServer.logManager.isLogDirOnline(logDir.getAbsolutePath), "Expected log directory offline", 3000L)
     assertTrue(leaderServer.replicaManager.getReplica(partition).isEmpty)
 
     // The second send() should fail due to either KafkaStorageException or NotLeaderForPartitionException
@@ -120,7 +152,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
     TestUtils.waitUntilTrue(() => {
       // ProduceResponse may contain KafkaStorageException and trigger metadata update
       producer.send(record)
-      producer.partitionsFor(topic).get(0).leader().id() != leaderServerId
+      producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id() != leaderServerId
     }, "Expected new leader for the partition", 6000L)
 
     // Consumer should receive some messages

http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 24421d0..0af0a04 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -182,14 +182,14 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
     debug("Follower for " + topic  + " is: %s".format(followerId))
 
-    produceMessage(servers, topic, "first")
+    produceMessage(servers, topic, null, "first")
     waitUntilMetadataIsPropagated(servers, topic, partitionId)
     assertEquals(List("first"), consumeAllMessages(topic))
 
     // shutdown follower server
     servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
 
-    produceMessage(servers, topic, "second")
+    produceMessage(servers, topic, null, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic))
 
     // shutdown leader and then restart follower
@@ -199,7 +199,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     // wait until new leader is (uncleanly) elected
     waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(followerId))
 
-    produceMessage(servers, topic, "third")
+    produceMessage(servers, topic, null, "third")
 
     // second message was lost due to unclean election
     assertEquals(List("first", "third"), consumeAllMessages(topic))
@@ -215,14 +215,14 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
     debug("Follower for " + topic  + " is: %s".format(followerId))
 
-    produceMessage(servers, topic, "first")
+    produceMessage(servers, topic, null, "first")
     waitUntilMetadataIsPropagated(servers, topic, partitionId)
     assertEquals(List("first"), consumeAllMessages(topic))
 
     // shutdown follower server
     servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
 
-    produceMessage(servers, topic, "second")
+    produceMessage(servers, topic, null, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic))
 
     // shutdown leader and then restart follower
@@ -234,7 +234,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
     // message production and consumption should both fail while leader is down
     try {
-      produceMessage(servers, topic, "third")
+      produceMessage(servers, topic, null, "third")
       fail("Message produced while leader is down should fail, but it 
succeeded")
     } catch {
       case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected
@@ -246,7 +246,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup())
     waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(leaderId))
 
-    produceMessage(servers, topic, "third")
+    produceMessage(servers, topic, null, "third")
     waitUntilMetadataIsPropagated(servers, topic, partitionId)
     servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 00cda21..fc49d8c 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -121,14 +121,18 @@ class AbstractFetcherThreadTest {
     override protected def fetch(fetchRequest: DummyFetchRequest): Seq[(TopicPartition, TestPartitionData)] =
       fetchRequest.offsets.mapValues(_ => new TestPartitionData()).toSeq
 
-    override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest =
-      new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.fetchOffset) }.toMap)
+    override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[DummyFetchRequest] =
+      ResultWithPartitions(new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.fetchOffset) }.toMap), Set())
 
-    override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): Map[TopicPartition, Int] = { Map() }
+    override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[Map[TopicPartition, Int]] = {
+      ResultWithPartitions(Map(), Set())
+    }
 
     override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { Map() }
 
-    override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long] = { Map() }
+    override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = {
+      ResultWithPartitions(Map(), Set())
+    }
   }
 
 
@@ -201,14 +205,14 @@ class AbstractFetcherThreadTest {
       }
     }
 
-    override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest = {
+    override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[DummyFetchRequest] = {
       val requestMap = new mutable.HashMap[TopicPartition, Long]
       partitionMap.foreach { case (topicPartition, partitionFetchState) =>
         // Add backoff delay check
         if (partitionFetchState.isReadyForFetch)
           requestMap.put(topicPartition, partitionFetchState.fetchOffset)
       }
-      new DummyFetchRequest(requestMap)
+      ResultWithPartitions(new DummyFetchRequest(requestMap), Set())
     }
 
     override def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) = delayPartitions(partitions, fetchBackOffMs.toLong)

http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 231b180..90d0346 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -122,7 +122,7 @@ class ReplicaFetcherThreadTest {
 
 
     //Expectations
-    expect(logManager.truncateTo(anyObject())).once
+    expect(logManager.truncateTo(anyObject())).times(2)
 
     replay(leaderEpochs, replicaManager, logManager, quota, replica)
 
@@ -171,7 +171,7 @@ class ReplicaFetcherThreadTest {
     val initialLEO = 200
 
     //Stubs
-    expect(logManager.truncateTo(capture(truncateToCapture))).once
+    expect(logManager.truncateTo(capture(truncateToCapture))).times(2)
     expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes()
     expect(leaderEpochs.latestEpoch).andReturn(5).anyTimes()
@@ -194,8 +194,9 @@ class ReplicaFetcherThreadTest {
     thread.doWork()
 
     //We should have truncated to the offsets in the response
-    assertEquals(156, truncateToCapture.getValue.get(t1p0).get)
-    assertEquals(172, truncateToCapture.getValue.get(t2p1).get)
+    val truncationPoints = truncateToCapture.getValues.asScala.flatMap(_.toSeq).toMap
+    assertEquals(156, truncationPoints(t1p0))
+    assertEquals(172, truncationPoints(t2p1))
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 902d1c3..abf0540 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1110,13 +1110,13 @@ object TestUtils extends Logging {
     values
   }
 
-  def produceMessage(servers: Seq[KafkaServer], topic: String, message: String) {
+  def produceMessage(servers: Seq[KafkaServer], topic: String, partition: Integer, message: String) {
     val producer = createNewProducer(
       TestUtils.getBrokerListStrFromServers(servers),
       retries = 5,
       requestTimeoutMs = 2000
     )
-    producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get
+    producer.send(new ProducerRecord(topic, partition, topic.getBytes, message.getBytes)).get
     producer.close()
   }
 
