Repository: kafka
Updated Branches:
  refs/heads/trunk 05357b703 -> cbef33f3d
KAFKA-5864; ReplicaFetcherThread should not die due to replica in offline log directory

Author: Dong Lin <lindon...@gmail.com>

Reviewers: Jun Rao <jun...@gmail.com>, Ismael Juma <ism...@juma.me.uk>

Closes #3820 from lindong28/KAFKA-5864

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cbef33f3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cbef33f3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cbef33f3

Branch: refs/heads/trunk
Commit: cbef33f3d0bc686e1258352cd5e13143b3f09d78
Parents: 05357b7
Author: Dong Lin <lindon...@gmail.com>
Authored: Tue Oct 3 17:11:21 2017 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Tue Oct 3 17:11:21 2017 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherThread.scala  | 18 +++--
 core/src/main/scala/kafka/log/Log.scala         | 35 +++++----
 core/src/main/scala/kafka/log/LogManager.scala  |  8 +-
 .../kafka/server/AbstractFetcherManager.scala   |  2 +-
 .../kafka/server/AbstractFetcherThread.scala    | 41 +++++-----
 .../kafka/server/ReplicaFetcherThread.scala     | 82 ++++++++++++--------
 .../scala/kafka/server/ReplicaManager.scala     |  5 ++
 .../scala/kafka/utils/ShutdownableThread.scala  |  2 +-
 .../kafka/api/LogDirFailureTest.scala           | 54 ++++++++++---
 .../integration/UncleanLeaderElectionTest.scala | 14 ++--
 .../server/AbstractFetcherThreadTest.scala      | 16 ++--
 .../kafka/server/ReplicaFetcherThreadTest.scala |  9 ++-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  4 +-
 13 files changed, 181 insertions(+), 109 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 4f14570..4c7c227 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -97,17 +97,19 @@ class ConsumerFetcherThread(name: String,

   // any logic for partitions whose leader has changed
   def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) {
-    removePartitions(partitions.toSet)
-    consumerFetcherManager.addPartitionsWithError(partitions)
+    if (partitions.nonEmpty) {
+      removePartitions(partitions.toSet)
+      consumerFetcherManager.addPartitionsWithError(partitions)
+    }
   }

-  protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
+  protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[FetchRequest] = {
     partitionMap.foreach { case ((topicPartition, partitionFetchState)) =>
       if (partitionFetchState.isReadyForFetch)
         fetchRequestBuilder.addFetch(topicPartition.topic, topicPartition.partition, partitionFetchState.fetchOffset, fetchSize)
     }

-    new FetchRequest(fetchRequestBuilder.build())
+    ResultWithPartitions(new FetchRequest(fetchRequestBuilder.build()), Set())
   }

   protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] =
@@ -115,11 +117,15 @@ class ConsumerFetcherThread(name: String,
       new TopicPartition(t, p) -> new PartitionData(value)
     }

-  override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): Map[TopicPartition, Int] = {
Map() } + override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[Map[TopicPartition, Int]] = { + ResultWithPartitions(Map(), Set()) + } override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { Map() } - override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long] = { Map() } + override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = { + ResultWithPartitions(Map(), Set()) + } } @deprecated("This object has been deprecated and will be removed in a future release. " + http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index e6c774a..37137ec 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1412,28 +1412,31 @@ class Log(@volatile var dir: File, * Truncate this log so that it ends with the greatest offset < targetOffset. * * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete. + * @return True iff targetOffset < logEndOffset */ - private[log] def truncateTo(targetOffset: Long) { + private[log] def truncateTo(targetOffset: Long): Boolean = { maybeHandleIOException(s"Error while truncating log to offset $targetOffset for $topicPartition in dir ${dir.getParent}") { if (targetOffset < 0) throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset)) if (targetOffset >= logEndOffset) { info("Truncating %s to %d has no effect as the largest offset in the log is %d.".format(name, targetOffset, logEndOffset - 1)) - return - } - info("Truncating log %s to offset %d.".format(name, targetOffset)) - lock synchronized { - if (segments.firstEntry.getValue.baseOffset > targetOffset) { - truncateFullyAndStartAt(targetOffset) - } else { - val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset) - deletable.foreach(deleteSegment) - activeSegment.truncateTo(targetOffset) - updateLogEndOffset(targetOffset) - this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) - this.logStartOffset = math.min(targetOffset, this.logStartOffset) - leaderEpochCache.clearAndFlushLatest(targetOffset) - loadProducerState(targetOffset, reloadFromCleanShutdown = false) + false + } else { + info("Truncating log %s to offset %d.".format(name, targetOffset)) + lock synchronized { + if (segments.firstEntry.getValue.baseOffset > targetOffset) { + truncateFullyAndStartAt(targetOffset) + } else { + val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset) + deletable.foreach(deleteSegment) + activeSegment.truncateTo(targetOffset) + updateLogEndOffset(targetOffset) + this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) + this.logStartOffset = math.min(targetOffset, this.logStartOffset) + leaderEpochCache.clearAndFlushLatest(targetOffset) + loadProducerState(targetOffset, reloadFromCleanShutdown = false) + } + true } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala index 690f52a..f4bd8a2 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -437,6 +437,7 @@ class LogManager(logDirs: Array[File], * @param partitionOffsets Partition logs that need to be truncated */ def truncateTo(partitionOffsets: Map[TopicPartition, Long]) { + var truncated = false for ((topicPartition, truncateOffset) <- partitionOffsets) { val log = logs.get(topicPartition) // If the log does not exist, skip it @@ -446,7 +447,8 @@ class LogManager(logDirs: Array[File], if (needToStopCleaner) cleaner.abortAndPauseCleaning(topicPartition) try { - log.truncateTo(truncateOffset) + if (log.truncateTo(truncateOffset)) + truncated = true if (needToStopCleaner) cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset) } finally { @@ -455,7 +457,9 @@ class LogManager(logDirs: Array[File], } } } - checkpointLogRecoveryOffsets() + + if (truncated) + checkpointLogRecoveryOffsets() } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/main/scala/kafka/server/AbstractFetcherManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala index 2b2aa7b..0d7806c 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1) extends Logging with KafkaMetricsGroup { // map of (source broker_id, fetcher_id per source broker) => fetcher - private val fetcherThreadMap = new mutable.HashMap[BrokerIdAndFetcherId, AbstractFetcherThread] + val fetcherThreadMap = new mutable.HashMap[BrokerIdAndFetcherId, AbstractFetcherThread] private val mapLock = new Object this.logIdent = "[" + name + "] " http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/main/scala/kafka/server/AbstractFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index fd26e11..e772ac3 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -18,6 +18,7 @@ package kafka.server import java.util.concurrent.locks.ReentrantLock + import kafka.cluster.BrokerEndPoint import kafka.utils.{DelayedItem, Pool, ShutdownableThread} import org.apache.kafka.common.errors.KafkaStorageException @@ -27,10 +28,12 @@ import kafka.utils.CoreUtils.inLock import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.protocol.Errors import AbstractFetcherThread._ + import scala.collection.{Map, Set, mutable} import scala.collection.JavaConverters._ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong + import com.yammer.metrics.core.Gauge import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.{FatalExitError, PartitionStates} @@ -71,13 +74,15 @@ abstract class AbstractFetcherThread(name: String, // deal with partitions with errors, potentially due to leadership changes protected def handlePartitionsWithErrors(partitions: 
Iterable[TopicPartition]) - protected def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): Map[TopicPartition, Int] + protected def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[Map[TopicPartition, Int]] protected def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] - protected def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long] + protected def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] - protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): REQ + protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[REQ] + + case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition]) protected def fetch(fetchRequest: REQ): Seq[(TopicPartition, PD)] @@ -98,11 +103,12 @@ abstract class AbstractFetcherThread(name: String, override def doWork() { maybeTruncate() val fetchRequest = inLock(partitionMapLock) { - val fetchRequest = buildFetchRequest(states) + val ResultWithPartitions(fetchRequest, partitionsWithError) = buildFetchRequest(states) if (fetchRequest.isEmpty) { trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs)) partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) } + handlePartitionsWithErrors(partitionsWithError) fetchRequest } if (!fetchRequest.isEmpty) @@ -119,7 +125,8 @@ abstract class AbstractFetcherThread(name: String, * occur during truncation. */ def maybeTruncate(): Unit = { - val epochRequests = inLock(partitionMapLock) { buildLeaderEpochRequest(states) } + val ResultWithPartitions(epochRequests, partitionsWithError) = inLock(partitionMapLock) { buildLeaderEpochRequest(states) } + handlePartitionsWithErrors(partitionsWithError) if (epochRequests.nonEmpty) { val fetchedEpochs = fetchEpochsFromLeader(epochRequests) @@ -127,7 +134,8 @@ abstract class AbstractFetcherThread(name: String, inLock(partitionMapLock) { //Check no leadership changes happened whilst we were unlocked, fetching epochs val leaderEpochs = fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) } - val truncationPoints = maybeTruncate(leaderEpochs) + val ResultWithPartitions(truncationPoints, partitionsWithError) = maybeTruncate(leaderEpochs) + handlePartitionsWithErrors(partitionsWithError) markTruncationComplete(truncationPoints) } } @@ -136,11 +144,6 @@ abstract class AbstractFetcherThread(name: String, private def processFetchRequest(fetchRequest: REQ) { val partitionsWithError = mutable.Set[TopicPartition]() - def updatePartitionsWithError(partition: TopicPartition): Unit = { - partitionsWithError += partition - partitionStates.moveToEnd(partition) - } - var responseData: Seq[(TopicPartition, PD)] = Seq.empty try { @@ -151,7 +154,7 @@ abstract class AbstractFetcherThread(name: String, if (isRunning.get) { warn(s"Error in fetch to broker ${sourceBroker.id}, request $fetchRequest", t) inLock(partitionMapLock) { - partitionStates.partitionSet.asScala.foreach(updatePartitionsWithError) + partitionsWithError ++= partitionStates.partitionSet.asScala // there is an error occurred while fetching partitions, sleep a while // note that `ReplicaFetcherThread.handlePartitionsWithError` will also introduce the same delay for every // partition with error effectively doubling 
the delay. It would be good to improve this. @@ -195,10 +198,10 @@ abstract class AbstractFetcherThread(name: String, // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and // should get fixed in the subsequent fetches logger.error(s"Found invalid messages during fetch for partition $topicPartition offset ${currentPartitionFetchState.fetchOffset} error ${ime.getMessage}") - updatePartitionsWithError(topicPartition) + partitionsWithError += topicPartition case e: KafkaStorageException => logger.error(s"Error while processing data for partition $topicPartition", e) - updatePartitionsWithError(topicPartition) + partitionsWithError += topicPartition case e: Throwable => throw new KafkaException(s"Error processing data for partition $topicPartition " + s"offset ${currentPartitionFetchState.fetchOffset}", e) @@ -212,12 +215,12 @@ abstract class AbstractFetcherThread(name: String, case e: FatalExitError => throw e case e: Throwable => error(s"Error getting offset for partition $topicPartition to broker ${sourceBroker.id}", e) - updatePartitionsWithError(topicPartition) + partitionsWithError += topicPartition } case _ => if (isRunning.get) { error(s"Error for partition $topicPartition to broker %${sourceBroker.id}:${partitionData.exception.get}") - updatePartitionsWithError(topicPartition) + partitionsWithError += topicPartition } } }) @@ -225,10 +228,9 @@ abstract class AbstractFetcherThread(name: String, } } - if (partitionsWithError.nonEmpty) { + if (partitionsWithError.nonEmpty) debug("handling partitions with error for %s".format(partitionsWithError)) - handlePartitionsWithErrors(partitionsWithError) - } + handlePartitionsWithErrors(partitionsWithError) } def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]) { @@ -253,6 +255,7 @@ abstract class AbstractFetcherThread(name: String, /** * Loop through all partitions, marking them as truncation complete and applying the correct offset + * * @param partitions the partitions to mark truncation complete */ private def markTruncationComplete(partitions: Map[TopicPartition, Long]) { http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index cf652d6..b90e9e8 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -18,6 +18,7 @@ package kafka.server import java.util + import kafka.admin.AdminUtils import kafka.api.{FetchRequest => _, _} import kafka.cluster.{BrokerEndPoint, Replica} @@ -26,6 +27,7 @@ import kafka.server.ReplicaFetcherThread._ import kafka.server.epoch.LeaderEpochCache import org.apache.kafka.common.requests.EpochEndOffset._ import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors @@ -74,7 +76,7 @@ class ReplicaFetcherThread(name: String, private val fetchSize = brokerConfig.replicaFetchMaxBytes private val shouldSendLeaderEpochRequest: Boolean = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2 - private def epochCache(tp: TopicPartition): LeaderEpochCache = 
replicaMgr.getReplica(tp).get.epochs.get + private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] = replicaMgr.getReplica(tp).map(_.epochs.get) override def shutdown(): Unit = { super.shutdown() @@ -83,7 +85,7 @@ class ReplicaFetcherThread(name: String, // process fetched data def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) { - val replica = replicaMgr.getReplica(topicPartition).get + val replica = replicaMgr.getReplicaOrException(topicPartition) val records = partitionData.toRecords maybeWarnIfOversizedRecords(records, topicPartition) @@ -128,7 +130,7 @@ class ReplicaFetcherThread(name: String, * Handle a partition whose offset is out of range and return a new fetch offset. */ def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = { - val replica = replicaMgr.getReplica(topicPartition).get + val replica = replicaMgr.getReplicaOrException(topicPartition) /** * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up @@ -194,7 +196,8 @@ class ReplicaFetcherThread(name: String, // any logic for partitions whose leader has changed def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) { - delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong) + if (partitions.nonEmpty) + delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong) } protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = { @@ -226,20 +229,26 @@ class ReplicaFetcherThread(name: String, } } - override def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = { + override def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[FetchRequest] = { val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData] + val partitionsWithError = mutable.Set[TopicPartition]() partitionMap.foreach { case (topicPartition, partitionFetchState) => // We will not include a replica in the fetch request if it should be throttled. if (partitionFetchState.isReadyForFetch && !shouldFollowerThrottle(quota, topicPartition)) { - val logStartOffset = replicaMgr.getReplicaOrException(topicPartition).logStartOffset - requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize)) + try { + val logStartOffset = replicaMgr.getReplicaOrException(topicPartition).logStartOffset + requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize)) + } catch { + case e: KafkaStorageException => + partitionsWithError += topicPartition + } } } val requestBuilder = JFetchRequest.Builder.forReplica(fetchRequestVersion, replicaId, maxWait, minBytes, requestMap) .setMaxBytes(maxBytes) - new FetchRequest(requestBuilder) + ResultWithPartitions(new FetchRequest(requestBuilder), partitionsWithError) } /** @@ -248,44 +257,49 @@ class ReplicaFetcherThread(name: String, * otherwise we truncate to the leaders offset. 
* - If the leader replied with undefined epoch offset we must use the high watermark */ - override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long] = { + override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = { val truncationPoints = scala.collection.mutable.HashMap.empty[TopicPartition, Long] val partitionsWithError = mutable.Set[TopicPartition]() fetchedEpochs.foreach { case (tp, epochOffset) => - val replica = replicaMgr.getReplica(tp).get - - if (epochOffset.hasError) { - info(s"Retrying leaderEpoch request for partition ${replica.topicPartition} as the leader reported an error: ${epochOffset.error}") - partitionsWithError += tp - } else { - val truncationOffset = - if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) - highWatermark(replica, epochOffset) - else if (epochOffset.endOffset >= replica.logEndOffset.messageOffset) - logEndOffset(replica, epochOffset) - else - epochOffset.endOffset - - truncationPoints.put(tp, truncationOffset) + try { + val replica = replicaMgr.getReplicaOrException(tp) + + if (epochOffset.hasError) { + info(s"Retrying leaderEpoch request for partition ${replica.topicPartition} as the leader reported an error: ${epochOffset.error}") + partitionsWithError += tp + } else { + val truncationOffset = + if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) + highWatermark(replica, epochOffset) + else if (epochOffset.endOffset >= replica.logEndOffset.messageOffset) + logEndOffset(replica, epochOffset) + else + epochOffset.endOffset + + replicaMgr.logManager.truncateTo(Map(tp -> truncationOffset)) + truncationPoints.put(tp, truncationOffset) + } + } catch { + case e: KafkaStorageException => + info(s"Failed to truncate $tp", e) + partitionsWithError += tp } } - replicaMgr.logManager.truncateTo(truncationPoints) - // For partitions that encountered an error, delay them a bit before retrying the leader epoch request - delayPartitions(partitionsWithError, brokerConfig.replicaFetchBackoffMs.toLong) - - truncationPoints + ResultWithPartitions(truncationPoints, partitionsWithError) } - override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): Map[TopicPartition, Int] = { - val result = allPartitions + override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[Map[TopicPartition, Int]] = { + val partitionEpochOpts = allPartitions .filter { case (_, state) => state.isTruncatingLog } - .map { case (tp, _) => tp -> epochCache(tp).latestEpoch }.toMap + .map { case (tp, _) => tp -> epochCacheOpt(tp) }.toMap - debug(s"Build leaderEpoch request $result") + val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (tp, epochCacheOpt) => epochCacheOpt.nonEmpty } - result + debug(s"Build leaderEpoch request $partitionsWithEpoch") + val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() } + ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet) } override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 
a361e16..3a4ecef 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1349,6 +1349,11 @@ class ReplicaManager(val config: KafkaConfig, } } + // Used only by test + def markPartitionOffline(tp: TopicPartition) { + allPartitions.put(tp, ReplicaManager.OfflinePartition) + } + // logDir should be an absolute path def handleLogDirFailure(dir: String) { if (!logManager.isLogDirOnline(dir)) http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/main/scala/kafka/utils/ShutdownableThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala index 59ebc54..6ed0968 100644 --- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala +++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala @@ -27,7 +27,7 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean this.setDaemon(false) this.logIdent = "[" + name + "]: " val isRunning: AtomicBoolean = new AtomicBoolean(true) - private val shutdownLatch = new CountDownLatch(1) + val shutdownLatch = new CountDownLatch(1) def shutdown(): Unit = { initiateShutdown() http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala index 6749a57..b1ac47b 100644 --- a/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala +++ b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala @@ -29,6 +29,9 @@ import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderForPartitionException} import org.junit.{Before, Test} import org.junit.Assert.assertTrue +import org.junit.Assert.assertEquals + +import scala.collection.JavaConverters._ /** * Test whether clients can producer and consume when there is log directory failure @@ -41,37 +44,66 @@ class LogDirFailureTest extends IntegrationTestHarness { val consumerCount: Int = 1 val serverCount: Int = 2 private val topic = "topic" + private val partitionNum = 12 - this.logDirCount = 2 + this.logDirCount = 3 this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0") this.producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100") this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp, "60000") - + this.serverConfig.setProperty(KafkaConfig.NumReplicaFetchersProp, "1") @Before override def setUp() { super.setUp() - TestUtils.createTopic(zkUtils, topic, 1, 2, servers = servers) + TestUtils.createTopic(zkUtils, topic, partitionNum, serverCount, servers = servers) } @Test def testIOExceptionDuringLogRoll() { - testProduceAfterLogDirFailure(Roll) + testProduceAfterLogDirFailureOnLeader(Roll) } @Test def testIOExceptionDuringCheckpoint() { - testProduceAfterLogDirFailure(Checkpoint) + testProduceAfterLogDirFailureOnLeader(Checkpoint) + } + + @Test + def testReplicaFetcherThreadAfterLogDirFailureOnFollower() { + val producer = producers.head + val partition = new TopicPartition(topic, 0) + + val partitionInfo = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get + val leaderServerId = partitionInfo.leader().id() + val leaderServer = 
servers.find(_.config.brokerId == leaderServerId).get + val followerServerId = partitionInfo.replicas().map(_.id()).find(_ != leaderServerId).get + val followerServer = servers.find(_.config.brokerId == followerServerId).get + + followerServer.replicaManager.markPartitionOffline(partition) + // Send a message to another partition whose leader is the same as partition 0 + // so that ReplicaFetcherThread on the follower will get response from leader immediately + val anotherPartitionWithTheSameLeader = (1 until partitionNum).find { i => + leaderServer.replicaManager.getPartition(new TopicPartition(topic, i)).flatMap(_.leaderReplicaIfLocal).isDefined + }.get + val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, anotherPartitionWithTheSameLeader, topic.getBytes, "message".getBytes) + // When producer.send(...).get returns, it is guaranteed that ReplicaFetcherThread on the follower + // has fetched from the leader and attempts to append to the offline replica. + producer.send(record).get + + assertEquals(serverCount, leaderServer.replicaManager.getPartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)).get.inSyncReplicas.size) + followerServer.replicaManager.replicaFetcherManager.fetcherThreadMap.values.foreach { thread => + assertTrue("ReplicaFetcherThread should still be working if its partition count > 0", thread.shutdownLatch.getCount > 0) + } } - def testProduceAfterLogDirFailure(failureType: LogDirFailureType) { + def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType) { val consumer = consumers.head subscribeAndWaitForAssignment(topic, consumer) val producer = producers.head val partition = new TopicPartition(topic, 0) val record = new ProducerRecord(topic, 0, s"key".getBytes, s"value".getBytes) - val leaderServerId = producer.partitionsFor(topic).get(0).leader().id() + val leaderServerId = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id() val leaderServer = servers.find(_.config.brokerId == leaderServerId).get // The first send() should succeed @@ -81,8 +113,8 @@ class LogDirFailureTest extends IntegrationTestHarness { }, "Expected the first message", 3000L) // Make log directory of the partition on the leader broker inaccessible by replacing it with a file - val replica = leaderServer.replicaManager.getReplica(partition) - val logDir = replica.get.log.get.dir.getParentFile + val replica = leaderServer.replicaManager.getReplicaOrException(partition) + val logDir = replica.log.get.dir.getParentFile CoreUtils.swallow(Utils.delete(logDir)) logDir.createNewFile() assertTrue(logDir.isFile) @@ -99,7 +131,7 @@ class LogDirFailureTest extends IntegrationTestHarness { } // Wait for ReplicaHighWatermarkCheckpoint to happen so that the log directory of the topic will be offline - TestUtils.waitUntilTrue(() => !leaderServer.logManager.liveLogDirs.contains(logDir), "Expected log directory offline", 3000L) + TestUtils.waitUntilTrue(() => !leaderServer.logManager.isLogDirOnline(logDir.getAbsolutePath), "Expected log directory offline", 3000L) assertTrue(leaderServer.replicaManager.getReplica(partition).isEmpty) // The second send() should fail due to either KafkaStorageException or NotLeaderForPartitionException @@ -120,7 +152,7 @@ class LogDirFailureTest extends IntegrationTestHarness { TestUtils.waitUntilTrue(() => { // ProduceResponse may contain KafkaStorageException and trigger metadata update producer.send(record) - producer.partitionsFor(topic).get(0).leader().id() != leaderServerId + 
producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id() != leaderServerId }, "Expected new leader for the partition", 6000L) // Consumer should receive some messages http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 24421d0..0af0a04 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -182,14 +182,14 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 debug("Follower for " + topic + " is: %s".format(followerId)) - produceMessage(servers, topic, "first") + produceMessage(servers, topic, null, "first") waitUntilMetadataIsPropagated(servers, topic, partitionId) assertEquals(List("first"), consumeAllMessages(topic)) // shutdown follower server servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) - produceMessage(servers, topic, "second") + produceMessage(servers, topic, null, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) // shutdown leader and then restart follower @@ -199,7 +199,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { // wait until new leader is (uncleanly) elected waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(followerId)) - produceMessage(servers, topic, "third") + produceMessage(servers, topic, null, "third") // second message was lost due to unclean election assertEquals(List("first", "third"), consumeAllMessages(topic)) @@ -215,14 +215,14 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 debug("Follower for " + topic + " is: %s".format(followerId)) - produceMessage(servers, topic, "first") + produceMessage(servers, topic, null, "first") waitUntilMetadataIsPropagated(servers, topic, partitionId) assertEquals(List("first"), consumeAllMessages(topic)) // shutdown follower server servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) - produceMessage(servers, topic, "second") + produceMessage(servers, topic, null, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) // shutdown leader and then restart follower @@ -234,7 +234,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { // message production and consumption should both fail while leader is down try { - produceMessage(servers, topic, "third") + produceMessage(servers, topic, null, "third") fail("Message produced while leader is down should fail, but it succeeded") } catch { case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected @@ -246,7 +246,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup()) waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(leaderId)) - produceMessage(servers, topic, "third") + produceMessage(servers, topic, null, "third") waitUntilMetadataIsPropagated(servers, topic, partitionId) 
servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala index 00cda21..fc49d8c 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala @@ -121,14 +121,18 @@ class AbstractFetcherThreadTest { override protected def fetch(fetchRequest: DummyFetchRequest): Seq[(TopicPartition, TestPartitionData)] = fetchRequest.offsets.mapValues(_ => new TestPartitionData()).toSeq - override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest = - new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.fetchOffset) }.toMap) + override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[DummyFetchRequest] = + ResultWithPartitions(new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.fetchOffset) }.toMap), Set()) - override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): Map[TopicPartition, Int] = { Map() } + override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[Map[TopicPartition, Int]] = { + ResultWithPartitions(Map(), Set()) + } override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { Map() } - override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long] = { Map() } + override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = { + ResultWithPartitions(Map(), Set()) + } } @@ -201,14 +205,14 @@ class AbstractFetcherThreadTest { } } - override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest = { + override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[DummyFetchRequest] = { val requestMap = new mutable.HashMap[TopicPartition, Long] partitionMap.foreach { case (topicPartition, partitionFetchState) => // Add backoff delay check if (partitionFetchState.isReadyForFetch) requestMap.put(topicPartition, partitionFetchState.fetchOffset) } - new DummyFetchRequest(requestMap) + ResultWithPartitions(new DummyFetchRequest(requestMap), Set()) } override def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) = delayPartitions(partitions, fetchBackOffMs.toLong) http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index 231b180..90d0346 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -122,7 +122,7 @@ class ReplicaFetcherThreadTest { //Expectations - 
expect(logManager.truncateTo(anyObject())).once + expect(logManager.truncateTo(anyObject())).times(2) replay(leaderEpochs, replicaManager, logManager, quota, replica) @@ -171,7 +171,7 @@ class ReplicaFetcherThreadTest { val initialLEO = 200 //Stubs - expect(logManager.truncateTo(capture(truncateToCapture))).once + expect(logManager.truncateTo(capture(truncateToCapture))).times(2) expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes() expect(leaderEpochs.latestEpoch).andReturn(5).anyTimes() @@ -194,8 +194,9 @@ class ReplicaFetcherThreadTest { thread.doWork() //We should have truncated to the offsets in the response - assertEquals(156, truncateToCapture.getValue.get(t1p0).get) - assertEquals(172, truncateToCapture.getValue.get(t2p1).get) + val truncationPoints = truncateToCapture.getValues.asScala.flatMap(_.toSeq).toMap + assertEquals(156, truncationPoints(t1p0)) + assertEquals(172, truncationPoints(t2p1)) } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/cbef33f3/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 902d1c3..abf0540 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1110,13 +1110,13 @@ object TestUtils extends Logging { values } - def produceMessage(servers: Seq[KafkaServer], topic: String, message: String) { + def produceMessage(servers: Seq[KafkaServer], topic: String, partition: Integer, message: String) { val producer = createNewProducer( TestUtils.getBrokerListStrFromServers(servers), retries = 5, requestTimeoutMs = 2000 ) - producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get + producer.send(new ProducerRecord(topic, partition, topic.getBytes, message.getBytes)).get producer.close() }
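----------------------------------------------------------------------

For readers skimming the diff, the heart of the change is the new ResultWithPartitions wrapper in AbstractFetcherThread: every step that builds a request (buildFetchRequest, buildLeaderEpochRequest, maybeTruncate) now returns the partitions it could not handle alongside its result, so a replica that sits in an offline log directory is delayed or sidelined instead of letting a KafkaStorageException kill the ReplicaFetcherThread. The following is a minimal, self-contained Scala sketch of that pattern. The names below (ResultWithPartitionsSketch, logStartOffsetOrThrow, the simplified TopicPartition and PartitionFetchState) are illustrative stand-ins for this note only, not the actual Kafka classes.

object ResultWithPartitionsSketch {

  // Simplified stand-ins for the real Kafka classes -- for illustration only.
  final case class TopicPartition(topic: String, partition: Int)
  final case class PartitionFetchState(fetchOffset: Long, isReadyForFetch: Boolean = true)
  final class KafkaStorageException(msg: String) extends RuntimeException(msg)

  // The pattern the patch introduces: each request-building step returns its result
  // together with the partitions it could not handle, instead of throwing.
  final case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition])

  // Stand-in for reading the replica's log start offset: throws KafkaStorageException
  // when the replica's log directory is offline.
  def logStartOffsetOrThrow(tp: TopicPartition, offlineDirs: Set[TopicPartition]): Long =
    if (offlineDirs.contains(tp)) throw new KafkaStorageException(s"$tp is in an offline log dir")
    else 0L

  // Simplified analogue of ReplicaFetcherThread.buildFetchRequest: a partition backed by
  // an offline log directory is collected into partitionsWithError rather than failing
  // the whole fetch (which previously killed the fetcher thread).
  def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)],
                        offlineDirs: Set[TopicPartition]): ResultWithPartitions[Map[TopicPartition, Long]] = {
    val requestMap = Map.newBuilder[TopicPartition, Long]
    val partitionsWithError = Set.newBuilder[TopicPartition]
    partitionMap.foreach { case (tp, state) =>
      if (state.isReadyForFetch) {
        try {
          val logStartOffset = logStartOffsetOrThrow(tp, offlineDirs)
          requestMap += (tp -> math.max(logStartOffset, state.fetchOffset))
        } catch {
          case _: KafkaStorageException => partitionsWithError += tp
        }
      }
    }
    ResultWithPartitions(requestMap.result(), partitionsWithError.result())
  }

  def main(args: Array[String]): Unit = {
    val tp0 = TopicPartition("topic", 0)
    val tp1 = TopicPartition("topic", 1)
    val states = Seq(tp0 -> PartitionFetchState(100L), tp1 -> PartitionFetchState(200L))
    // tp1 sits in an "offline" directory; it is sidelined instead of aborting the fetch.
    val ResultWithPartitions(request, withError) = buildFetchRequest(states, offlineDirs = Set(tp1))
    println(s"fetching: $request")    // fetching: Map(TopicPartition(topic,0) -> 100)
    println(s"sidelined: $withError") // sidelined: Set(TopicPartition(topic,1))
  }
}

A second, smaller point of the patch is that Log.truncateTo now reports whether it actually truncated anything, which lets LogManager.truncateTo skip rewriting the recovery-point checkpoint when no log changed. A sketch of that shape, again with simplified stand-ins rather than the real Log and LogManager:

object TruncateCheckpointSketch {

  // Stand-in for kafka.log.Log: truncateTo returns true iff it actually shortened the
  // log, i.e. iff targetOffset < logEndOffset.
  final class Log(var logEndOffset: Long) {
    def truncateTo(targetOffset: Long): Boolean = {
      require(targetOffset >= 0, s"Cannot truncate to a negative offset ($targetOffset).")
      if (targetOffset >= logEndOffset) false
      else { logEndOffset = targetOffset; true }
    }
  }

  // Stand-in for LogManager.truncateTo: the recovery-point checkpoint is rewritten only
  // if at least one log was actually truncated.
  def truncateTo(logs: Map[String, Log], offsets: Map[String, Long])(checkpoint: () => Unit): Unit = {
    var truncated = false
    for ((name, target) <- offsets; log <- logs.get(name))
      if (log.truncateTo(target)) truncated = true
    if (truncated) checkpoint()
  }

  def main(args: Array[String]): Unit = {
    val logs = Map("t-0" -> new Log(100L), "t-1" -> new Log(50L))
    // Nothing is below the current end offsets, so the checkpoint callback never fires.
    truncateTo(logs, Map("t-0" -> 100L, "t-1" -> 60L))(() => println("checkpoint written"))
    // t-0 shrinks from 100 to 80, so the checkpoint is written exactly once.
    truncateTo(logs, Map("t-0" -> 80L))(() => println("checkpoint written"))
  }
}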