This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6b6a6b9 KAFKA-8526; Fallback to other log dirs after getOrCreateLog failure (#6969)
6b6a6b9 is described below
commit 6b6a6b930fda853fd91dfbe85b4462e17654f804
Author: Igor Soarez <[email protected]>
AuthorDate: Tue Jul 23 16:57:38 2019 +0100
KAFKA-8526; Fallback to other log dirs after getOrCreateLog failure (#6969)
LogManager#getOrCreateLog() selects a log dir for the new replica from
_liveLogDirs. If a disk failure is discovered at this point, before
LogDirFailureHandler finds out, fall back to the other log dirs before
failing the operation.
Reviewers: Anna Povzner <[email protected]>, Jason Gustafson <[email protected]>
---
core/src/main/scala/kafka/log/LogManager.scala | 109 ++++++++++++---------
.../test/scala/unit/kafka/log/LogManagerTest.scala | 50 +++++++++-
2 files changed, 112 insertions(+), 47 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index 320f346..6c724c3 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException,
LogDirNotFoundExce
import scala.collection.JavaConverters._
import scala.collection._
import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success, Try}
/**
* The entry point to the kafka log management subsystem. The log manager is
responsible for log creation, retrieval, and cleaning.
@@ -676,7 +677,7 @@ class LogManager(logDirs: Seq[File],
if (!isNew && offlineLogDirs.nonEmpty)
throw new KafkaStorageException(s"Can not create log for
$topicPartition because log directories ${offlineLogDirs.mkString(",")} are
offline")
- val logDir = {
+ val logDirs: List[File] = {
val preferredLogDir = preferredLogDirs.get(topicPartition)
if (isFuture) {
@@ -687,55 +688,70 @@ class LogManager(logDirs: Seq[File],
}
if (preferredLogDir != null)
- preferredLogDir
+ List(new File(preferredLogDir))
else
- nextLogDir().getAbsolutePath
+ nextLogDirs()
}
- if (!isLogDirOnline(logDir))
- throw new KafkaStorageException(s"Can not create log for
$topicPartition because log directory $logDir is offline")
-
- try {
- val dir = {
- if (isFuture)
- new File(logDir, Log.logFutureDirName(topicPartition))
- else
- new File(logDir, Log.logDirName(topicPartition))
- }
- Files.createDirectories(dir.toPath)
-
- val log = Log(
- dir = dir,
- config = config,
- logStartOffset = 0L,
- recoveryPoint = 0L,
- maxProducerIdExpirationMs = maxPidExpirationMs,
- producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
- scheduler = scheduler,
- time = time,
- brokerTopicStats = brokerTopicStats,
- logDirFailureChannel = logDirFailureChannel)
+ val logDirName = {
if (isFuture)
- futureLogs.put(topicPartition, log)
+ Log.logFutureDirName(topicPartition)
else
- currentLogs.put(topicPartition, log)
+ Log.logDirName(topicPartition)
+ }
- info(s"Created log for partition $topicPartition in $logDir with
properties " +
- s"{${config.originals.asScala.mkString(", ")}}.")
- // Remove the preferred log dir since it has already been satisfied
- preferredLogDirs.remove(topicPartition)
+ val logDir = logDirs
+ .toStream // to prevent actually mapping the whole list, lazy map
+ .map(createLogDirectory(_, logDirName))
+ .find(_.isSuccess)
+ .getOrElse(Failure(new KafkaStorageException("No log directories
available. Tried " + logDirs.map(_.getAbsolutePath).mkString(", "))))
+ .get // If Failure, will throw
+
+ val log = Log(
+ dir = logDir,
+ config = config,
+ logStartOffset = 0L,
+ recoveryPoint = 0L,
+ maxProducerIdExpirationMs = maxPidExpirationMs,
+ producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ scheduler = scheduler,
+ time = time,
+ brokerTopicStats = brokerTopicStats,
+ logDirFailureChannel = logDirFailureChannel)
- log
- } catch {
- case e: IOException =>
- val msg = s"Error while creating log for $topicPartition in dir
$logDir"
- logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
- throw new KafkaStorageException(msg, e)
- }
+ if (isFuture)
+ futureLogs.put(topicPartition, log)
+ else
+ currentLogs.put(topicPartition, log)
+
+ info(s"Created log for partition $topicPartition in $logDir with
properties " + s"{${config.originals.asScala.mkString(", ")}}.")
+ // Remove the preferred log dir since it has already been satisfied
+ preferredLogDirs.remove(topicPartition)
+
+ log
}
}
}
+ private[log] def createLogDirectory(logDir: File, logDirName: String):
Try[File] = {
+ val logDirPath = logDir.getAbsolutePath
+ if (isLogDirOnline(logDirPath)) {
+ val dir = new File(logDirPath, logDirName)
+ try {
+ Files.createDirectories(dir.toPath)
+ Success(dir)
+ } catch {
+ case e: IOException =>
+ val msg = s"Error while creating log for $logDirName in dir
$logDirPath"
+ logDirFailureChannel.maybeAddOfflineLogDir(logDirPath, msg, e)
+ warn(msg, e)
+ Failure(new KafkaStorageException(msg, e))
+ }
+ } else {
+ Failure(new KafkaStorageException(s"Can not create log $logDirName
because log directory $logDirPath is offline"))
+ }
+ }
+
/**
* Delete logs marked for deletion. Delete all logs for which
`currentDefaultConfig.fileDeleteDelayMs`
* has elapsed after the delete was scheduled. Logs for which this interval
has not yet elapsed will be
@@ -869,13 +885,13 @@ class LogManager(logDirs: Seq[File],
}
/**
- * Choose the next directory in which to create a log. Currently this is done
- * by calculating the number of partitions in each directory and then
choosing the
- * data directory with the fewest partitions.
+ * Provides the full ordered list of suggested directories for the next
partition.
+ * Currently this is done by calculating the number of partitions in each
directory and then sorting the
+ * data directories by fewest partitions.
*/
- private def nextLogDir(): File = {
+ private def nextLogDirs(): List[File] = {
if(_liveLogDirs.size == 1) {
- _liveLogDirs.peek()
+ List(_liveLogDirs.peek())
} else {
// count the number of logs in each parent directory (including 0 for
empty directories
val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
@@ -883,8 +899,9 @@ class LogManager(logDirs: Seq[File],
val dirCounts = (zeros ++ logCounts).toBuffer
// choose the directory with the least logs in it
- val leastLoaded = dirCounts.sortBy(_._2).head
- new File(leastLoaded._1)
+ dirCounts.sortBy(_._2).map {
+ case (path: String, _: Int) => new File(path)
+ }.toList
}
}
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 1e6d2dc..bfbd423 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -23,11 +23,18 @@ import java.util.{Collections, Properties}
import kafka.server.FetchDataInfo
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils._
-import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.OffsetOutOfRangeException
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.junit.Assert._
import org.junit.{After, Before, Test}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{doAnswer, spy}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+
+import scala.collection.mutable
+import scala.util.{Failure, Try}
class LogManagerTest {
@@ -95,6 +102,47 @@ class LogManagerTest {
log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()),
leaderEpoch = 0)
}
+ @Test
+ def testCreateLogWithLogDirFallback() {
+ // Configure a number of directories one level deeper in logDir,
+ // so they all get cleaned up in tearDown().
+ val dirs = (0 to 4)
+ .map(_.toString)
+ .map(logDir.toPath.resolve(_).toFile)
+
+ // Create a new LogManager with the configured directories and an
overridden createLogDirectory.
+ logManager.shutdown()
+ logManager = spy(createLogManager(dirs))
+ val brokenDirs = mutable.Set[File]()
+ doAnswer(new Answer[Try[File]] {
+ override def answer(invocation: InvocationOnMock): Try[File] = {
+ // The first half of directories tried will fail, the rest goes
through.
+ val logDir = invocation.getArgument[File](0)
+ if (brokenDirs.contains(logDir) || brokenDirs.size < dirs.length / 2) {
+ brokenDirs.add(logDir)
+ Failure(new Throwable("broken dir"))
+ } else {
+ invocation.callRealMethod().asInstanceOf[Try[File]]
+ }
+ }
+ }).when(logManager).createLogDirectory(any(), any())
+ logManager.startup()
+
+ // Request creating a new log.
+ // LogManager should try using all configured log directories until one
succeeds.
+ logManager.getOrCreateLog(new TopicPartition(name, 0), logConfig, isNew =
true)
+
+ // Verify that half the directories were considered broken,
+ assertEquals(dirs.length / 2, brokenDirs.size)
+
+ // and that exactly one log file was created,
+ val containsLogFile: File => Boolean = dir => new File(dir, name +
"-0").exists()
+ assertEquals("More than one log file created", 1,
dirs.count(containsLogFile))
+
+ // and that it wasn't created in one of the broken directories.
+ assertFalse(brokenDirs.exists(containsLogFile))
+ }
+
/**
* Test that get on a non-existent returns None and no log is created.
*/