http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/log/LogCleaner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 4898d11..27da43b 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -17,7 +17,7 @@ package kafka.log -import java.io.File +import java.io.{File, IOException} import java.nio._ import java.util.Date import java.util.concurrent.{CountDownLatch, TimeUnit} @@ -25,6 +25,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import com.yammer.metrics.core.Gauge import kafka.common._ import kafka.metrics.KafkaMetricsGroup +import kafka.server.LogDirFailureChannel import kafka.utils._ import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Time @@ -38,29 +39,29 @@ import scala.collection.JavaConverters._ /** * The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy. * A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'. - * + * * Each log can be thought of being split into two sections of segments: a "clean" section which has previously been cleaned followed by a * "dirty" section that has not yet been cleaned. The dirty section is further divided into the "cleanable" section followed by an "uncleanable" section. * The uncleanable section is excluded from cleaning. The active log segment is always uncleanable. 
If there is a * compaction lag time set, segments whose largest message timestamp is within the compaction lag time of the cleaning operation are also uncleanable. * * The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "compact" retention policy - * and cleans that. The dirtiness of the log is guessed by taking the ratio of bytes in the dirty section of the log to the total bytes in the log. - * + * and cleans that. The dirtiness of the log is guessed by taking the ratio of bytes in the dirty section of the log to the total bytes in the log. + * * To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. See kafka.log.OffsetMap for details of - * the implementation of the mapping. - * - * Once the key=>offset map is built, the log is cleaned by recopying each log segment but omitting any key that appears in the offset map with a + * the implementation of the mapping. + * + * Once the key=>offset map is built, the log is cleaned by recopying each log segment but omitting any key that appears in the offset map with a * higher offset than what is found in the segment (i.e. messages with a key that appears in the dirty section of the log). - * + * * To avoid segments shrinking to very small sizes with repeated cleanings we implement a rule by which if we will merge successive segments when * doing a cleaning if their log and index size are less than the maximum log and index size prior to the clean beginning. - * + * * Cleaned segments are swapped into the log as they become available. - * + * * One nuance that the cleaner must handle is log truncation. If a log is truncated while it is being cleaned the cleaning of that log is aborted. - * - * Messages with null payload are treated as deletes for the purpose of log compaction. This means that they receive special treatment by the cleaner. 
+ * + * Messages with null payload are treated as deletes for the purpose of log compaction. This means that they receive special treatment by the cleaner. * The cleaner will only retain delete records for a period of time to avoid accumulating space indefinitely. This period of time is configurable on a per-topic * basis and is measured from the time the segment enters the clean portion of the log (at which point any prior message with that key has been removed). * Delete markers in the clean section of the log that are older than this time will not be retained when log segments are being recopied as part of cleaning. @@ -88,29 +89,30 @@ import scala.collection.JavaConverters._ class LogCleaner(val config: CleanerConfig, val logDirs: Array[File], val logs: Pool[TopicPartition, Log], + val logDirFailureChannel: LogDirFailureChannel, time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup { - + /* for managing the state of partitions being cleaned. package-private to allow access in tests */ - private[log] val cleanerManager = new LogCleanerManager(logDirs, logs) + private[log] val cleanerManager = new LogCleanerManager(logDirs, logs, logDirFailureChannel) /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */ - private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, - checkIntervalMs = 300, - throttleDown = true, + private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, + checkIntervalMs = 300, + throttleDown = true, "cleaner-io", "bytes", time = time) - + /* the threads */ private val cleaners = (0 until config.numThreads).map(new CleanerThread(_)) - + /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */ - newGauge("max-buffer-utilization-percent", + newGauge("max-buffer-utilization-percent", new Gauge[Int] { def value: Int = cleaners.map(_.lastStats).map(100 * _.bufferUtilization).max.toInt }) /* a metric to track 
the recopy rate of each thread's last cleaning */ - newGauge("cleaner-recopy-percent", + newGauge("cleaner-recopy-percent", new Gauge[Int] { def value: Int = { val stats = cleaners.map(_.lastStats) @@ -123,7 +125,7 @@ class LogCleaner(val config: CleanerConfig, new Gauge[Int] { def value: Int = cleaners.map(_.lastStats).map(_.elapsedSecs).max.toInt }) - + /** * Start the background cleaning */ @@ -131,7 +133,7 @@ class LogCleaner(val config: CleanerConfig, info("Starting the log cleaner") cleaners.foreach(_.start()) } - + /** * Stop the background cleaning */ @@ -139,7 +141,7 @@ class LogCleaner(val config: CleanerConfig, info("Shutting down the log cleaner.") cleaners.foreach(_.shutdown()) } - + /** * Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of * the partition is aborted. @@ -155,6 +157,10 @@ class LogCleaner(val config: CleanerConfig, cleanerManager.updateCheckpoints(dataDir, update=None) } + def handleLogDirFailure(dir: String) { + cleanerManager.handleLogDirFailure(dir) + } + /** * Truncate cleaner offset checkpoint for the given partition if its checkpointed offset is larger than the given offset */ @@ -197,21 +203,21 @@ class LogCleaner(val config: CleanerConfig, } isCleaned } - + /** * The cleaner threads do the actual log cleaning. Each thread processes does its cleaning repeatedly by * choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments. 
*/ private class CleanerThread(threadId: Int) extends ShutdownableThread(name = "kafka-log-cleaner-thread-" + threadId, isInterruptible = false) { - + override val loggerName = classOf[LogCleaner].getName - + if(config.dedupeBufferSize / config.numThreads > Int.MaxValue) warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...") val cleaner = new Cleaner(id = threadId, - offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt, + offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt, hashAlgorithm = config.hashAlgorithm), ioBufferSize = config.ioBufferSize / config.numThreads / 2, maxIoBufferSize = config.maxMessageSize, @@ -219,7 +225,7 @@ class LogCleaner(val config: CleanerConfig, throttler = throttler, time = time, checkDone = checkDone) - + @volatile var lastStats: CleanerStats = new CleanerStats() private val backOffWaitLatch = new CountDownLatch(1) @@ -241,7 +247,7 @@ class LogCleaner(val config: CleanerConfig, backOffWaitLatch.countDown() awaitShutdown() } - + /** * Clean a log if there is a dirty log available, otherwise sleep for a bit */ @@ -258,6 +264,9 @@ class LogCleaner(val config: CleanerConfig, endOffset = nextDirtyOffset } catch { case _: LogCleaningAbortedException => // task can be aborted, let it go. + case e: IOException => + error(s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent} due to IOException", e) + logDirFailureChannel.maybeAddLogFailureEvent(cleanable.log.dir.getParent) } finally { cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset) } @@ -275,36 +284,36 @@ class LogCleaner(val config: CleanerConfig, if (!cleaned) backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS) } - + /** * Log out statistics on a single run of the cleaner. 
*/ def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) { this.lastStats = stats def mb(bytes: Double) = bytes / (1024*1024) - val message = - "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) + - "\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead), - stats.elapsedSecs, - mb(stats.bytesRead/stats.elapsedSecs)) + - "\tIndexed %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.mapBytesRead), - stats.elapsedIndexSecs, - mb(stats.mapBytesRead)/stats.elapsedIndexSecs, + val message = + "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) + + "\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead), + stats.elapsedSecs, + mb(stats.bytesRead/stats.elapsedSecs)) + + "\tIndexed %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.mapBytesRead), + stats.elapsedIndexSecs, + mb(stats.mapBytesRead)/stats.elapsedIndexSecs, 100 * stats.elapsedIndexSecs/stats.elapsedSecs) + "\tBuffer utilization: %.1f%%%n".format(100 * stats.bufferUtilization) + - "\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead), - stats.elapsedSecs - stats.elapsedIndexSecs, - mb(stats.bytesRead)/(stats.elapsedSecs - stats.elapsedIndexSecs), 100 * (stats.elapsedSecs - stats.elapsedIndexSecs).toDouble/stats.elapsedSecs) + + "\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead), + stats.elapsedSecs - stats.elapsedIndexSecs, + mb(stats.bytesRead)/(stats.elapsedSecs - stats.elapsedIndexSecs), 100 * (stats.elapsedSecs - stats.elapsedIndexSecs).toDouble/stats.elapsedSecs) + "\tStart size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesRead), stats.messagesRead) + - "\tEnd size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesWritten), stats.messagesWritten) + - 
"\t%.1f%% size reduction (%.1f%% fewer messages)%n".format(100.0 * (1.0 - stats.bytesWritten.toDouble/stats.bytesRead), + "\tEnd size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesWritten), stats.messagesWritten) + + "\t%.1f%% size reduction (%.1f%% fewer messages)%n".format(100.0 * (1.0 - stats.bytesWritten.toDouble/stats.bytesRead), 100.0 * (1.0 - stats.messagesWritten.toDouble/stats.messagesRead)) info(message) if (stats.invalidMessagesRead > 0) { warn("\tFound %d invalid messages during compaction.".format(stats.invalidMessagesRead)) } } - + } } @@ -327,14 +336,14 @@ private[log] class Cleaner(val id: Int, throttler: Throttler, time: Time, checkDone: (TopicPartition) => Unit) extends Logging { - + override val loggerName = classOf[LogCleaner].getName this.logIdent = "Cleaner " + id + ": " /* buffer used for read i/o */ private var readBuffer = ByteBuffer.allocate(ioBufferSize) - + /* buffer used for write i/o */ private var writeBuffer = ByteBuffer.allocate(ioBufferSize) @@ -352,7 +361,7 @@ private[log] class Cleaner(val id: Int, private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = { // figure out the timestamp below which it is safe to remove delete tombstones // this position is defined to be a configurable time beneath the last modified time of the last clean segment - val deleteHorizonMs = + val deleteHorizonMs = cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match { case None => 0L case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/log/LogCleanerManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index ed0cb69..af8707c 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -24,11 +24,13 @@ import java.util.concurrent.locks.ReentrantLock import com.yammer.metrics.core.Gauge import kafka.common.LogCleaningAbortedException import kafka.metrics.KafkaMetricsGroup +import kafka.server.LogDirFailureChannel import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils.CoreUtils._ import kafka.utils.{Logging, Pool} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.errors.KafkaStorageException import scala.collection.{immutable, mutable} @@ -45,7 +47,9 @@ private[log] case object LogCleaningPaused extends LogCleaningState * While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is * requested to be resumed. 
*/ -private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicPartition, Log]) extends Logging with KafkaMetricsGroup { +private[log] class LogCleanerManager(val logDirs: Array[File], + val logs: Pool[TopicPartition, Log], + val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup { import LogCleanerManager._ @@ -53,19 +57,19 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To // package-private for testing private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint" - + /* the offset checkpoints holding the last cleaned point for each log */ - private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, offsetCheckpointFile)))).toMap + @volatile private var checkpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, offsetCheckpointFile), logDirFailureChannel))).toMap /* the set of logs currently being cleaned */ private val inProgress = mutable.HashMap[TopicPartition, LogCleaningState]() /* a global lock used to control all access to the in-progress set and the offset checkpoints */ private val lock = new ReentrantLock - + /* for coordinating the pausing and the cleaning of a partition */ private val pausedCleaningCond = lock.newCondition() - + /* a gauge for tracking the cleanable ratio of the dirtiest log */ @volatile private var dirtiestLogCleanableRatio = 0.0 newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt }) @@ -77,8 +81,20 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To /** * @return the position processed for all logs. 
*/ - def allCleanerCheckpoints: Map[TopicPartition, Long] = - checkpoints.values.flatMap(_.read()).toMap + def allCleanerCheckpoints: Map[TopicPartition, Long] = { + inLock(lock) { + checkpoints.values.flatMap(checkpoint => { + try { + checkpoint.read() + } catch { + case e: KafkaStorageException => + error(s"Failed to access checkpoint file ${checkpoint.f.getName} in dir ${checkpoint.f.getParentFile.getAbsolutePath}", e) + Map.empty[TopicPartition, Long] + } + }).toMap + } + } + /** * Choose the log to clean next and add it to the in-progress set. We recompute this @@ -217,8 +233,22 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)]) { inLock(lock) { val checkpoint = checkpoints(dataDir) - val existing = checkpoint.read().filterKeys(logs.keys) ++ update - checkpoint.write(existing) + if (checkpoint != null) { + try { + val existing = checkpoint.read().filterKeys(logs.keys) ++ update + checkpoint.write(existing) + } catch { + case e: KafkaStorageException => + error(s"Failed to access checkpoint file ${checkpoint.f.getName} in dir ${checkpoint.f.getParentFile.getAbsolutePath}", e) + } + } + } + } + + def handleLogDirFailure(dir: String) { + info(s"Stopping cleaning logs in dir $dir") + inLock(lock) { + checkpoints = checkpoints.filterKeys(_.getAbsolutePath != dir) } } @@ -226,10 +256,11 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To inLock(lock) { if (logs.get(topicPartition).config.compact) { val checkpoint = checkpoints(dataDir) - val existing = checkpoint.read() - - if (existing.getOrElse(topicPartition, 0L) > offset) - checkpoint.write(existing + (topicPartition -> offset)) + if (checkpoint != null) { + val existing = checkpoint.read() + if (existing.getOrElse(topicPartition, 0L) > offset) + checkpoint.write(existing + (topicPartition -> offset)) + } } } } @@ -241,7 +272,7 @@ private[log] class LogCleanerManager(val 
logDirs: Array[File], val logs: Pool[To inLock(lock) { inProgress(topicPartition) match { case LogCleaningInProgress => - updateCheckpoints(dataDir,Option(topicPartition, endOffset)) + updateCheckpoints(dataDir, Option(topicPartition, endOffset)) inProgress.remove(topicPartition) case LogCleaningAborted => inProgress.put(topicPartition, LogCleaningPaused) http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 2df5241..f459cc1 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -21,29 +21,33 @@ import java.io._ import java.nio.file.Files import java.util.concurrent._ +import com.yammer.metrics.core.Gauge import kafka.admin.AdminUtils -import kafka.common.{KafkaException, KafkaStorageException} +import kafka.common.KafkaException +import kafka.metrics.KafkaMetricsGroup import kafka.server.checkpoints.OffsetCheckpointFile import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _} import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.utils.Time - +import org.apache.kafka.common.errors.KafkaStorageException import scala.collection.JavaConverters._ import scala.collection._ +import scala.collection.mutable.ArrayBuffer /** * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning. * All read and write operations are delegated to the individual log instances. - * + * * The log manager maintains logs in one or more directories. New logs are created in the data directory * with the fewest logs. No attempt is made to move partitions after the fact or balance based on * size or I/O rate. 
- * + * * A background thread handles log retention by periodically truncating excess log segments. */ @threadsafe -class LogManager(val logDirs: Array[File], +class LogManager(logDirs: Array[File], + initialOfflineDirs: Array[File], val topicConfigs: Map[String, LogConfig], // note that this doesn't get updated after creation val defaultConfig: LogConfig, val cleanerConfig: CleanerConfig, @@ -56,7 +60,8 @@ class LogManager(val logDirs: Array[File], scheduler: Scheduler, val brokerState: BrokerState, brokerTopicStats: BrokerTopicStats, - time: Time) extends Logging { + logDirFailureChannel: LogDirFailureChannel, + time: Time) extends Logging with KafkaMetricsGroup { val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint" val LockFile = ".lock" @@ -66,140 +71,243 @@ class LogManager(val logDirs: Array[File], private val logs = new Pool[TopicPartition, Log]() private val logsToBeDeleted = new LinkedBlockingQueue[Log]() - createAndValidateLogDirs(logDirs) - private val dirLocks = lockLogDirs(logDirs) - private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile)))).toMap - private val logStartOffsetCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile)))).toMap + private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs) + + def liveLogDirs: Array[File] = { + if (_liveLogDirs.size() == logDirs.size) + logDirs + else + _liveLogDirs.asScala.toArray + } + + private val dirLocks = lockLogDirs(liveLogDirs) + @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir => + (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap + @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir => + (dir, new OffsetCheckpointFile(new File(dir, 
LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap + + private def offlineLogDirs = logDirs.filterNot(_liveLogDirs.contains) + loadLogs() + // public, so we can access this from kafka.admin.DeleteTopicTest val cleaner: LogCleaner = if(cleanerConfig.enableCleaner) - new LogCleaner(cleanerConfig, logDirs, logs, time = time) + new LogCleaner(cleanerConfig, liveLogDirs, logs, logDirFailureChannel, time = time) else null - + + val offlineLogDirectoryCount = newGauge( + "OfflineLogDirectoryCount", + new Gauge[Int] { + def value = offlineLogDirs.length + } + ) + + for (dir <- logDirs) { + newGauge( + "LogDirectoryOffline", + new Gauge[Int] { + def value = if (_liveLogDirs.contains(dir)) 0 else 1 + }, + Map("logDirectory" -> dir.getAbsolutePath) + ) + } + /** - * Create and check validity of the given directories, specifically: + * Create and check validity of the given directories that are not in the given offline directories, specifically: * <ol> * <li> Ensure that there are no duplicates in the directory list * <li> Create each directory if it doesn't exist - * <li> Check that each path is a readable directory + * <li> Check that each path is a readable directory * </ol> */ - private def createAndValidateLogDirs(dirs: Seq[File]) { + private def createAndValidateLogDirs(dirs: Seq[File], initialOfflineDirs: Seq[File]): ConcurrentLinkedQueue[File] = { if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size) - throw new KafkaException("Duplicate log directory found: " + logDirs.mkString(", ")) - for(dir <- dirs) { - if(!dir.exists) { - info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.") - val created = dir.mkdirs() - if(!created) - throw new KafkaException("Failed to create data directory " + dir.getAbsolutePath) + throw new KafkaException("Duplicate log directory found: " + dirs.mkString(", ")) + + val liveLogDirs = new ConcurrentLinkedQueue[File]() + + for (dir <- dirs if !initialOfflineDirs.contains(dir)) { + try { + if (!dir.exists) { + 
info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.") + val created = dir.mkdirs() + if (!created) + throw new IOException("Failed to create data directory " + dir.getAbsolutePath) + } + if (!dir.isDirectory || !dir.canRead) + throw new IOException(dir.getAbsolutePath + " is not a readable log directory.") + liveLogDirs.add(dir) + } catch { + case e: IOException => + error(s"Failed to create or validate data directory ${dir.getAbsolutePath}", e) + } + } + if (liveLogDirs.isEmpty) { + fatal(s"Shutdown broker because none of the specified log dirs from " + dirs.mkString(", ") + " can be created or validated") + Exit.halt(1) + } + + liveLogDirs + } + + def handleLogDirFailure(dir: String) { + info(s"Stopping serving logs in dir $dir") + logCreationOrDeletionLock synchronized { + _liveLogDirs.remove(new File(dir)) + if (_liveLogDirs.isEmpty) { + fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} have failed") + Exit.halt(1) + } - if(!dir.isDirectory || !dir.canRead) - throw new KafkaException(dir.getAbsolutePath + " is not a readable log directory.") + + recoveryPointCheckpoints = recoveryPointCheckpoints.filterKeys(file => file.getAbsolutePath != dir) + logStartOffsetCheckpoints = logStartOffsetCheckpoints.filterKeys(file => file.getAbsolutePath != dir) + if (cleaner != null) + cleaner.handleLogDirFailure(dir) + + val offlineTopicPartitions = logs.filter { case (tp, log) => log.dir.getParent == dir}.map { case (tp, log) => tp } + + offlineTopicPartitions.foreach(topicPartition => { + val removedLog = logs.remove(topicPartition) + if (removedLog != null) { + removedLog.closeHandlers() + removedLog.removeLogMetrics() + } + }) + info(s"Partitions ${offlineTopicPartitions.mkString(",")} are offline due to failure on log directory $dir") + dirLocks.filter(_.file.getParent == dir).foreach(lock => CoreUtils.swallow(lock.destroy())) + } + } - + /** * Lock all the given directories */ private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = 
{ - dirs.map { dir => - val lock = new FileLock(new File(dir, LockFile)) - if(!lock.tryLock()) - throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath + - ". A Kafka instance in another process or thread is using this directory.") - lock + dirs.flatMap { dir => + try { + val lock = new FileLock(new File(dir, LockFile)) + if (!lock.tryLock()) + throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParent + + ". A Kafka instance in another process or thread is using this directory.") + Some(lock) + } catch { + case e: IOException => + error(s"Disk error while locking directory $dir", e) + logDirFailureChannel.maybeAddLogFailureEvent(dir.getAbsolutePath) + None + } + } + } + + private def loadLogs(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = { + debug("Loading log '" + logDir.getName + "'") + val topicPartition = Log.parseTopicPartitionName(logDir) + val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) + val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L) + val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L) + + val current = Log( + dir = logDir, + config = config, + logStartOffset = logStartOffset, + recoveryPoint = logRecoveryPoint, + maxProducerIdExpirationMs = maxPidExpirationMs, + scheduler = scheduler, + time = time, + brokerTopicStats = brokerTopicStats) + + if (logDir.getName.endsWith(Log.DeleteDirSuffix)) { + this.logsToBeDeleted.add(current) + } else { + val previous = this.logs.put(topicPartition, current) + if (previous != null) { + throw new IllegalArgumentException( + "Duplicate log directories found: %s, %s!".format( + current.dir.getAbsolutePath, previous.dir.getAbsolutePath)) + } } } - + /** * Recover and load all logs in the given data directories */ private def loadLogs(): Unit = { info("Loading logs.") val startMs = time.milliseconds - val threadPools 
= mutable.ArrayBuffer.empty[ExecutorService] + val threadPools = ArrayBuffer.empty[ExecutorService] + val offlineDirs = ArrayBuffer.empty[String] val jobs = mutable.Map.empty[File, Seq[Future[_]]] - for (dir <- this.logDirs) { - val pool = Executors.newFixedThreadPool(ioThreads) - threadPools.append(pool) - - val cleanShutdownFile = new File(dir, Log.CleanShutdownFile) - - if (cleanShutdownFile.exists) { - debug( - "Found clean shutdown file. " + - "Skipping recovery for all logs in data directory: " + - dir.getAbsolutePath) - } else { - // log recovery itself is being performed by `Log` class during initialization - brokerState.newState(RecoveringFromUncleanShutdown) - } - - var recoveryPoints = Map[TopicPartition, Long]() - try { - recoveryPoints = this.recoveryPointCheckpoints(dir).read - } catch { - case e: Exception => - warn("Error occurred while reading recovery-point-offset-checkpoint file of directory " + dir, e) - warn("Resetting the recovery checkpoint to 0") - } - - var logStartOffsets = Map[TopicPartition, Long]() + for (dir <- liveLogDirs) { try { - logStartOffsets = this.logStartOffsetCheckpoints(dir).read - } catch { - case e: Exception => - warn("Error occurred while reading log-start-offset-checkpoint file of directory " + dir, e) - } + val pool = Executors.newFixedThreadPool(ioThreads) + threadPools.append(pool) + + val cleanShutdownFile = new File(dir, Log.CleanShutdownFile) + + if (cleanShutdownFile.exists) { + debug( + "Found clean shutdown file. 
" + + "Skipping recovery for all logs in data directory: " + + dir.getAbsolutePath) + } else { + // log recovery itself is being performed by `Log` class during initialization + brokerState.newState(RecoveringFromUncleanShutdown) + } - val jobsForDir = for { - dirContent <- Option(dir.listFiles).toList - logDir <- dirContent if logDir.isDirectory - } yield { - CoreUtils.runnable { - debug("Loading log '" + logDir.getName + "'") + var recoveryPoints = Map[TopicPartition, Long]() + try { + recoveryPoints = this.recoveryPointCheckpoints(dir).read + } catch { + case e: Exception => + warn("Error occurred while reading recovery-point-offset-checkpoint file of directory " + dir, e) + warn("Resetting the recovery checkpoint to 0") + } - val topicPartition = Log.parseTopicPartitionName(logDir) - val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) - val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L) - val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L) + var logStartOffsets = Map[TopicPartition, Long]() + try { + logStartOffsets = this.logStartOffsetCheckpoints(dir).read + } catch { + case e: Exception => + warn("Error occurred while reading log-start-offset-checkpoint file of directory " + dir, e) + } - val current = Log( - dir = logDir, - config = config, - logStartOffset = logStartOffset, - recoveryPoint = logRecoveryPoint, - maxProducerIdExpirationMs = maxPidExpirationMs, - scheduler = scheduler, - time = time, - brokerTopicStats = brokerTopicStats) - if (logDir.getName.endsWith(Log.DeleteDirSuffix)) { - this.logsToBeDeleted.add(current) - } else { - val previous = this.logs.put(topicPartition, current) - if (previous != null) { - throw new IllegalArgumentException( - "Duplicate log directories found: %s, %s!".format( - current.dir.getAbsolutePath, previous.dir.getAbsolutePath)) + val jobsForDir = for { + dirContent <- Option(dir.listFiles).toList + logDir <- dirContent if logDir.isDirectory + } yield { + 
CoreUtils.runnable { + try { + loadLogs(logDir, recoveryPoints, logStartOffsets) + } catch { + case e: IOException => + offlineDirs.append(dir.getAbsolutePath) + error("Error while loading log dir " + dir.getAbsolutePath, e) + } + } + } + jobs(cleanShutdownFile) = jobsForDir.map(pool.submit) + } catch { + case e: IOException => + offlineDirs.append(dir.getAbsolutePath) + error("Error while loading log dir " + dir.getAbsolutePath, e) + } - - jobs(cleanShutdownFile) = jobsForDir.map(pool.submit) } - try { for ((cleanShutdownFile, dirJobs) <- jobs) { dirJobs.foreach(_.get) - cleanShutdownFile.delete() + try { + Files.deleteIfExists(cleanShutdownFile.toPath) + } catch { + case e: IOException => + offlineDirs.append(cleanShutdownFile.getParent) + error(s"Error while deleting the clean shutdown file $cleanShutdownFile", e) + } + } + offlineDirs.foreach(logDirFailureChannel.maybeAddLogFailureEvent) } catch { case e: ExecutionException => { error("There was an error in one of the threads during logs loading: " + e.getCause) @@ -231,7 +339,7 @@ class LogManager(val logDirs: Array[File], period = flushCheckMs, TimeUnit.MILLISECONDS) scheduler.schedule("kafka-recovery-point-checkpoint", - checkpointRecoveryPointOffsets _, + checkpointLogRecoveryOffsets _, delay = InitialTaskDelayMs, period = flushRecoveryOffsetCheckpointMs, TimeUnit.MILLISECONDS) @@ -256,7 +364,12 @@ class LogManager(val logDirs: Array[File], def shutdown() { info("Shutting down.") - val threadPools = mutable.ArrayBuffer.empty[ExecutorService] + removeMetric("OfflineLogDirectoryCount") + for (dir <- logDirs) { + removeMetric("LogDirectoryOffline", Map("logDirectory" -> dir.getAbsolutePath)) + } + + val threadPools = ArrayBuffer.empty[ExecutorService] val jobs = mutable.Map.empty[File, Seq[Future[_]]] // stop the cleaner first @@ -265,7 +378,7 @@ class LogManager(val logDirs: Array[File], } // close logs in each dir - for (dir <- this.logDirs) { + for (dir <- liveLogDirs) { debug("Flushing and closing logs at " + dir) val pool = 
Executors.newFixedThreadPool(ioThreads) @@ -337,11 +450,12 @@ class LogManager(val logDirs: Array[File], } } } - checkpointRecoveryPointOffsets() + checkpointLogRecoveryOffsets() } /** * Delete all data in a partition and start the log at the new offset + * * @param newOffset The new offset to start the log with */ def truncateFullyAndStartAt(topicPartition: TopicPartition, newOffset: Long) { @@ -357,15 +471,15 @@ class LogManager(val logDirs: Array[File], cleaner.resumeCleaning(topicPartition) } } - checkpointRecoveryPointOffsets() + checkpointLogRecoveryOffsets() } /** - * Write out the current recovery point for all logs to a text file in the log directory + * Write out the current recovery point for all logs to a text file in the log directory * to avoid recovering the whole log on startup. */ - def checkpointRecoveryPointOffsets() { - this.logDirs.foreach(checkpointLogRecoveryOffsetsInDir) + def checkpointLogRecoveryOffsets() { + liveLogDirs.foreach(checkpointLogRecoveryOffsetsInDir) } /** @@ -373,7 +487,7 @@ class LogManager(val logDirs: Array[File], * to avoid exposing data that have been deleted by DeleteRecordsRequest */ def checkpointLogStartOffsets() { - this.logDirs.foreach(checkpointLogStartOffsetsInDir) + liveLogDirs.foreach(checkpointLogStartOffsetsInDir) } /** @@ -382,7 +496,13 @@ class LogManager(val logDirs: Array[File], private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = { val recoveryPoints = this.logsByDir.get(dir.toString) if (recoveryPoints.isDefined) { - this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint)) + try { + this.recoveryPointCheckpoints.get(dir).foreach(_.write(recoveryPoints.get.mapValues(_.recoveryPoint))) + } catch { + case e: IOException => + error(s"Disk error while writing to recovery point file in directory $dir", e) + logDirFailureChannel.maybeAddLogFailureEvent(dir.getAbsolutePath) + } } } @@ -390,10 +510,17 @@ class LogManager(val logDirs: Array[File], * Checkpoint log start 
offset for all logs in provided directory. */ private def checkpointLogStartOffsetsInDir(dir: File): Unit = { - val logs = this.logsByDir.get(dir.toString) + val logs = this.logsByDir.get(dir.getAbsolutePath) if (logs.isDefined) { - this.logStartOffsetCheckpoints(dir).write( - logs.get.filter{case (tp, log) => log.logStartOffset > log.logSegments.head.baseOffset}.mapValues(_.logStartOffset)) + try { + this.logStartOffsetCheckpoints.get(dir).foreach(_.write( + logs.get.filter { case (tp, log) => log.logStartOffset > log.logSegments.head.baseOffset }.mapValues(_.logStartOffset) + )) + } catch { + case e: IOException => + error(s"Disk error while writing to logStartOffset file in directory $dir", e) + logDirFailureChannel.maybeAddLogFailureEvent(dir.getAbsolutePath) + } } } @@ -403,33 +530,47 @@ class LogManager(val logDirs: Array[File], def getLog(topicPartition: TopicPartition): Option[Log] = Option(logs.get(topicPartition)) /** - * Create a log for the given topic and the given partition * If the log already exists, just return a copy of the existing log + * Otherwise if isNew=true or if there is no offline log directory, create a log for the given topic and the given partition + * Otherwise throw KafkaStorageException + * + * @param isNew Whether the replica should have existed on the broker or not + * @throws KafkaStorageException if isNew=false, log is not found in the cache and there is offline log directory on the broker */ - def createLog(topicPartition: TopicPartition, config: LogConfig): Log = { + def getOrCreateLog(topicPartition: TopicPartition, config: LogConfig, isNew: Boolean = false): Log = { logCreationOrDeletionLock synchronized { - // create the log if it has not already been created in another thread getLog(topicPartition).getOrElse { + // create the log if it has not already been created in another thread + if (!isNew && offlineLogDirs.nonEmpty) + throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories 
${offlineLogDirs.mkString(",")} are offline") + val dataDir = nextLogDir() - val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition) - Files.createDirectories(dir.toPath) - - val log = Log( - dir = dir, - config = config, - logStartOffset = 0L, - recoveryPoint = 0L, - maxProducerIdExpirationMs = maxPidExpirationMs, - scheduler = scheduler, - time = time, - brokerTopicStats = brokerTopicStats) - logs.put(topicPartition, log) - info("Created log for partition [%s,%d] in %s with properties {%s}." - .format(topicPartition.topic, - topicPartition.partition, - dataDir.getAbsolutePath, - config.originals.asScala.mkString(", "))) - log + try { + val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition) + Files.createDirectories(dir.toPath) + + val log = Log( + dir = dir, + config = config, + logStartOffset = 0L, + recoveryPoint = 0L, + maxProducerIdExpirationMs = maxPidExpirationMs, + scheduler = scheduler, + time = time, + brokerTopicStats = brokerTopicStats) + logs.put(topicPartition, log) + + info("Created log for partition [%s,%d] in %s with properties {%s}." 
+ .format(topicPartition.topic, + topicPartition.partition, + dataDir.getAbsolutePath, + config.originals.asScala.mkString(", "))) + log + } catch { + case e: IOException => + logDirFailureChannel.maybeAddLogFailureEvent(dataDir.getAbsolutePath) + throw new KafkaStorageException(s"Error while creating log for $topicPartition in dir ${dataDir.getAbsolutePath}", e) + } } } } @@ -439,30 +580,27 @@ class LogManager(val logDirs: Array[File], */ private def deleteLogs(): Unit = { try { - var failed = 0 - while (!logsToBeDeleted.isEmpty && failed < logsToBeDeleted.size()) { + while (!logsToBeDeleted.isEmpty) { val removedLog = logsToBeDeleted.take() if (removedLog != null) { try { removedLog.delete() info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.") } catch { - case e: Throwable => - error(s"Exception in deleting $removedLog. Moving it to the end of the queue.", e) - failed = failed + 1 - logsToBeDeleted.put(removedLog) + case e: KafkaStorageException => + error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e) } } } } catch { - case e: Throwable => + case e: Throwable => error(s"Exception in kafka-delete-logs thread.", e) } } /** - * Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and - * add it in the queue for deletion. + * Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and + * add it in the queue for deletion. * @param topicPartition TopicPartition that needs to be deleted */ def asyncDelete(topicPartition: TopicPartition) = { @@ -470,30 +608,38 @@ class LogManager(val logDirs: Array[File], logs.remove(topicPartition) } if (removedLog != null) { - //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it. 
- if (cleaner != null) { - cleaner.abortCleaning(topicPartition) - cleaner.updateCheckpoints(removedLog.dir.getParentFile) - } - val dirName = Log.logDeleteDirName(removedLog.name) - removedLog.close() - val renamedDir = new File(removedLog.dir.getParent, dirName) - val renameSuccessful = removedLog.dir.renameTo(renamedDir) - if (renameSuccessful) { - checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile) - removedLog.dir = renamedDir - // change the file pointers for log and index file - for (logSegment <- removedLog.logSegments) { - logSegment.log.setFile(new File(renamedDir, logSegment.log.file.getName)) - logSegment.index.file = new File(renamedDir, logSegment.index.file.getName) + try { + //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it. + if (cleaner != null) { + cleaner.abortCleaning(topicPartition) + cleaner.updateCheckpoints(removedLog.dir.getParentFile) } + val dirName = Log.logDeleteDirName(removedLog.name) + removedLog.close() + val renamedDir = new File(removedLog.dir.getParent, dirName) + val renameSuccessful = removedLog.dir.renameTo(renamedDir) + if (renameSuccessful) { + checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile) + removedLog.dir = renamedDir + // change the file pointers for log and index file + for (logSegment <- removedLog.logSegments) { + logSegment.log.setFile(new File(renamedDir, logSegment.log.file.getName)) + logSegment.index.file = new File(renamedDir, logSegment.index.file.getName) + } - logsToBeDeleted.add(removedLog) - removedLog.removeLogMetrics() - info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion") - } else { - throw new KafkaStorageException("Failed to rename log directory from " + removedLog.dir.getAbsolutePath + " to " + renamedDir.getAbsolutePath) + logsToBeDeleted.add(removedLog) + removedLog.removeLogMetrics() + info(s"Log for partition ${removedLog.topicPartition} 
is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion") + } else { + throw new IOException("Failed to rename log directory from " + removedLog.dir.getAbsolutePath + " to " + renamedDir.getAbsolutePath) + } + } catch { + case e: IOException => + logDirFailureChannel.maybeAddLogFailureEvent(removedLog.dir.getParent) + throw new KafkaStorageException(s"Error while deleting $topicPartition in dir ${removedLog.dir.getParent}.", e) } + } else if (offlineLogDirs.nonEmpty) { + throw new KafkaStorageException("Failed to delete log for " + topicPartition + " because it may be in one of the offline directories " + offlineLogDirs.mkString(",")) } } @@ -503,14 +649,14 @@ class LogManager(val logDirs: Array[File], * data directory with the fewest partitions. */ private def nextLogDir(): File = { - if(logDirs.size == 1) { - logDirs(0) + if(_liveLogDirs.size == 1) { + _liveLogDirs.peek() } else { // count the number of logs in each parent directory (including 0 for empty directories val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size) - val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap + val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap val dirCounts = (zeros ++ logCounts).toBuffer - + // choose the directory with the least logs in it val leastLoaded = dirCounts.sortBy(_._2).head new File(leastLoaded._1) @@ -552,6 +698,13 @@ class LogManager(val logDirs: Array[File], } } + def isLogDirOnline(logDir: String): Boolean = { + if (!logDirs.exists(_.getAbsolutePath == logDir)) + throw new RuntimeException(s"Log dir $logDir is not found in the config.") + + _liveLogDirs.contains(new File(logDir)) + } + /** * Flush any log which has exceeded its flush interval and has unwritten messages. 
*/ @@ -575,11 +728,13 @@ class LogManager(val logDirs: Array[File], object LogManager { def apply(config: KafkaConfig, + initialOfflineDirs: Seq[String], zkUtils: ZkUtils, brokerState: BrokerState, kafkaScheduler: KafkaScheduler, time: Time, - brokerTopicStats: BrokerTopicStats): LogManager = { + brokerTopicStats: BrokerTopicStats, + logDirFailureChannel: LogDirFailureChannel): LogManager = { val defaultProps = KafkaServer.copyKafkaConfigToLog(config) val defaultLogConfig = LogConfig(defaultProps) @@ -598,6 +753,7 @@ object LogManager { enableCleaner = config.logCleanerEnable) new LogManager(logDirs = config.logDirs.map(new File(_)).toArray, + initialOfflineDirs = initialOfflineDirs.map(new File(_)).toArray, topicConfigs = topicConfigs, defaultConfig = defaultLogConfig, cleanerConfig = cleanerConfig, @@ -609,7 +765,8 @@ object LogManager { maxPidExpirationMs = config.transactionIdExpirationMs, scheduler = kafkaScheduler, brokerState = brokerState, - time = time, - brokerTopicStats = brokerTopicStats) + brokerTopicStats = brokerTopicStats, + logDirFailureChannel = logDirFailureChannel, + time = time) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/log/LogSegment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 3e4c47d..0449a4a 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -20,8 +20,6 @@ import java.io.{File, IOException} import java.nio.file.Files import java.nio.file.attribute.FileTime import java.util.concurrent.TimeUnit - -import kafka.common._ import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.server.epoch.LeaderEpochCache import kafka.server.{FetchDataInfo, LogOffsetMetadata} @@ -383,28 +381,13 @@ class LogSegment(val log: FileRecords, /** * Change the suffix for the index and log file for this 
log segment + * IOException from this method should be handled by the caller */ def changeFileSuffixes(oldSuffix: String, newSuffix: String) { - - def kafkaStorageException(fileType: String, e: IOException) = - new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e) - - try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix))) - catch { - case e: IOException => throw kafkaStorageException("log", e) - } - try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix))) - catch { - case e: IOException => throw kafkaStorageException("index", e) - } - try timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix))) - catch { - case e: IOException => throw kafkaStorageException("timeindex", e) - } - try txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix))) - catch { - case e: IOException => throw kafkaStorageException("txnindex", e) - } + log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix))) + index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix))) + timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix))) + txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix))) } /** @@ -481,9 +464,17 @@ class LogSegment(val log: FileRecords, } /** + * Close file handlers used by the log segment but don't write to disk. This is used when the disk may have failed + */ + def closeHandlers() { + CoreUtils.swallow(index.closeHandler()) + CoreUtils.swallow(timeIndex.closeHandler()) + CoreUtils.swallow(log.closeHandlers()) + CoreUtils.swallow(txnIndex.close()) + } + + /** * Delete this log segment from the filesystem. - * - * @throws KafkaStorageException if the delete fails. 
*/ def delete() { val deletedLog = log.delete() @@ -491,13 +482,13 @@ class LogSegment(val log: FileRecords, val deletedTimeIndex = timeIndex.delete() val deletedTxnIndex = txnIndex.delete() if (!deletedLog && log.file.exists) - throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.") + throw new IOException("Delete of log " + log.file.getName + " failed.") if (!deletedIndex && index.file.exists) - throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.") + throw new IOException("Delete of index " + index.file.getName + " failed.") if (!deletedTimeIndex && timeIndex.file.exists) - throw new KafkaStorageException("Delete of time index " + timeIndex.file.getName + " failed.") + throw new IOException("Delete of time index " + timeIndex.file.getName + " failed.") if (!deletedTxnIndex && txnIndex.file.exists) - throw new KafkaStorageException("Delete of transaction index " + txnIndex.file.getName + " failed.") + throw new IOException("Delete of transaction index " + txnIndex.file.getName + " failed.") } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/log/ProducerStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 7cc8e8e..ce56a6c 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -638,9 +638,13 @@ class ProducerStateManager(val topicPartition: TopicPartition, } private def listSnapshotFiles: List[File] = { - if (logDir.exists && logDir.isDirectory) - logDir.listFiles.filter(f => f.isFile && isSnapshotFile(f.getName)).toList - else + if (logDir.exists && logDir.isDirectory) { + val files = logDir.listFiles + if (files != null) + files.filter(f => f.isFile && isSnapshotFile(f.getName)).toList + else + List.empty[File] + } else 
List.empty[File] } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index b17d255..9b01043 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -18,22 +18,19 @@ package kafka.server import java.util.concurrent.locks.ReentrantLock - import kafka.cluster.BrokerEndPoint -import kafka.consumer.PartitionTopicInfo import kafka.utils.{DelayedItem, Pool, ShutdownableThread} +import org.apache.kafka.common.errors.KafkaStorageException import kafka.common.{ClientIdAndBroker, KafkaException} import kafka.metrics.KafkaMetricsGroup import kafka.utils.CoreUtils.inLock import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.protocol.Errors import AbstractFetcherThread._ - import scala.collection.{Map, Set, mutable} import scala.collection.JavaConverters._ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong - import com.yammer.metrics.core.Gauge import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.{FatalExitError, PartitionStates} @@ -198,7 +195,10 @@ abstract class AbstractFetcherThread(name: String, // 2. 
If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and // should get fixed in the subsequent fetches logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.fetchOffset + " error " + ime.getMessage) - updatePartitionsWithError(topicPartition); + updatePartitionsWithError(topicPartition) + case e: KafkaStorageException => + logger.error(s"Error while processing data for partition $topicPartition", e) + updatePartitionsWithError(topicPartition) case e: Throwable => throw new KafkaException("error processing data for partition [%s,%d] offset %d" .format(topic, partitionId, currentPartitionFetchState.fetchOffset), e) http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala index 8630026..8ac9864 100755 --- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala +++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala @@ -30,7 +30,6 @@ case class BrokerMetadata(brokerId: Int) */ class BrokerMetadataCheckpoint(val file: File) extends Logging { private val lock = new Object() - Files.deleteIfExists(new File(file + ".tmp").toPath()) // try to delete any existing temp files for cleanliness def write(brokerMetadata: BrokerMetadata) = { lock synchronized { @@ -57,6 +56,8 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging { } def read(): Option[BrokerMetadata] = { + Files.deleteIfExists(new File(file + ".tmp").toPath()) // try to delete any existing temp files for cleanliness + lock synchronized { try { val brokerMetaProps = new VerifiableProperties(Utils.loadProps(file.getAbsolutePath())) 
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala index e5b301c..a6a8202 100644 --- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala +++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala @@ -20,9 +20,7 @@ package kafka.server import java.util.concurrent.TimeUnit -import com.yammer.metrics.core.Meter import kafka.metrics.KafkaMetricsGroup -import kafka.utils.Pool import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.DeleteRecordsResponse @@ -76,12 +74,16 @@ class DelayedDeleteRecords(delayMs: Long, if (status.acksPending) { val (lowWatermarkReached, error, lw) = replicaManager.getPartition(topicPartition) match { case Some(partition) => - partition.leaderReplicaIfLocal match { - case Some(_) => - val leaderLW = partition.lowWatermarkIfLeader - (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW) - case None => - (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK) + if (partition eq ReplicaManager.OfflinePartition) { + (false, Errors.KAFKA_STORAGE_ERROR, DeleteRecordsResponse.INVALID_LOW_WATERMARK) + } else { + partition.leaderReplicaIfLocal match { + case Some(_) => + val leaderLW = partition.lowWatermarkIfLeader + (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW) + case None => + (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK) + } } case None => (false, Errors.UNKNOWN_TOPIC_OR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK) http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/DelayedFetch.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 8a9ce02..e478053 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit import kafka.metrics.KafkaMetricsGroup import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{NotLeaderForPartitionException, UnknownTopicOrPartitionException} +import org.apache.kafka.common.errors.{NotLeaderForPartitionException, UnknownTopicOrPartitionException, KafkaStorageException} import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.IsolationLevel @@ -71,6 +71,7 @@ class DelayedFetch(delayMs: Long, * Case B: This broker does not know of some partitions it tries to fetch * Case C: The fetch offset locates not on the last segment of the log * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes + * Case E: The partition is in an offline log directory on this broker * * Upon completion, should return whatever data is available for each valid partition */ @@ -117,6 +118,9 @@ class DelayedFetch(delayMs: Long, } } } catch { + case _: KafkaStorageException => // Case E + debug("Partition %s is in an offline log directory, satisfy %s immediately".format(topicPartition, fetchMetadata)) + return forceComplete() case _: UnknownTopicOrPartitionException => // Case B debug("Broker no longer know of %s, satisfy %s immediately".format(topicPartition, fetchMetadata)) return forceComplete() http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/DelayedProduce.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala 
b/core/src/main/scala/kafka/server/DelayedProduce.scala index 0ff8d34..0d452cc 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -95,7 +95,10 @@ class DelayedProduce(delayMs: Long, if (status.acksPending) { val (hasEnough, error) = replicaManager.getPartition(topicPartition) match { case Some(partition) => - partition.checkEnoughReplicasReachOffset(status.requiredOffset) + if (partition eq ReplicaManager.OfflinePartition) + (false, Errors.KAFKA_STORAGE_ERROR) + else + partition.checkEnoughReplicasReachOffset(status.requiredOffset) case None => // Case A (false, Errors.UNKNOWN_TOPIC_OR_PARTITION) http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 9e9299f..1fb8901 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger import kafka.admin.{AdminUtils, RackAwareMode} import kafka.api.{ApiVersion, ControlledShutdownRequest, ControlledShutdownResponse, KAFKA_0_11_0_IV0} import kafka.cluster.Partition -import kafka.common.{KafkaStorageException, OffsetAndMetadata, OffsetMetadata, TopicAndPartition} +import kafka.common.{OffsetAndMetadata, OffsetMetadata, TopicAndPartition} import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} import kafka.controller.KafkaController import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult} @@ -36,7 +36,7 @@ import kafka.log.{Log, LogManager, TimestampOffset} import kafka.network.{RequestChannel, RequestOrResponseSend} import kafka.security.SecurityUtils import kafka.security.auth._ -import kafka.utils.{CoreUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils} +import 
kafka.utils.{CoreUtils, Logging, ZKGroupTopicDirs, ZkUtils} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal} @@ -54,7 +54,7 @@ import org.apache.kafka.common.requests.SaslHandshakeResponse import org.apache.kafka.common.resource.{Resource => AdminResource} import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding} -import scala.collection._ +import scala.collection.{mutable, _} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} @@ -144,40 +144,33 @@ class KafkaApis(val requestChannel: RequestChannel, val correlationId = request.header.correlationId val leaderAndIsrRequest = request.body[LeaderAndIsrRequest] - try { - def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) { - // for each new leader or follower, call coordinator to handle consumer group migration. - // this callback is invoked under the replica state change lock to ensure proper order of - // leadership changes - updatedLeaders.foreach { partition => - if (partition.topic == GROUP_METADATA_TOPIC_NAME) - groupCoordinator.handleGroupImmigration(partition.partitionId) - else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME) - txnCoordinator.handleTxnImmigration(partition.partitionId, partition.getLeaderEpoch) - } - - updatedFollowers.foreach { partition => - if (partition.topic == GROUP_METADATA_TOPIC_NAME) - groupCoordinator.handleGroupEmigration(partition.partitionId) - else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME) - txnCoordinator.handleTxnEmigration(partition.partitionId, partition.getLeaderEpoch) - } + def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) { + // for each new leader or follower, call coordinator to handle consumer group migration. 
+ // this callback is invoked under the replica state change lock to ensure proper order of + // leadership changes + updatedLeaders.foreach { partition => + if (partition.topic == GROUP_METADATA_TOPIC_NAME) + groupCoordinator.handleGroupImmigration(partition.partitionId) + else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME) + txnCoordinator.handleTxnImmigration(partition.partitionId, partition.getLeaderEpoch) } - if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { - val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange) - val leaderAndIsrResponse = new LeaderAndIsrResponse(result.error, result.responseMap.asJava) - sendResponseExemptThrottle(RequestChannel.Response(request, leaderAndIsrResponse)) - } else { - val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap - sendResponseMaybeThrottle(request, _ => - new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)) + updatedFollowers.foreach { partition => + if (partition.topic == GROUP_METADATA_TOPIC_NAME) + groupCoordinator.handleGroupEmigration(partition.partitionId) + else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME) + txnCoordinator.handleTxnEmigration(partition.partitionId, partition.getLeaderEpoch) } - } catch { - case e: FatalExitError => throw e - case e: KafkaStorageException => - fatal("Disk error during leadership change.", e) - Exit.halt(1) + } + + if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { + val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange) + val leaderAndIsrResponse = new LeaderAndIsrResponse(result.error, result.responseMap.asJava) + sendResponseExemptThrottle(RequestChannel.Response(request, leaderAndIsrResponse)) + } else { + val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap + 
sendResponseMaybeThrottle(request, _ => + new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)) } } @@ -681,7 +674,9 @@ class KafkaApis(val requestChannel: RequestChannel, } catch { // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages // are typically transient and there is no value in logging the entire stack trace for the same - case e @ ( _ : UnknownTopicOrPartitionException | _ : NotLeaderForPartitionException) => + case e @ (_ : UnknownTopicOrPartitionException | + _ : NotLeaderForPartitionException | + _ : KafkaStorageException) => debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( correlationId, clientId, topicPartition, e.getMessage)) (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) @@ -750,6 +745,7 @@ class KafkaApis(val requestChannel: RequestChannel, // would have received a clear exception and there is no value in logging the entire stack trace for the same case e @ (_ : UnknownTopicOrPartitionException | _ : NotLeaderForPartitionException | + _ : KafkaStorageException | _ : UnsupportedForMessageFormatException) => debug(s"Offset request with correlation id $correlationId from client $clientId on " + s"partition $topicPartition failed due to ${e.getMessage}") @@ -1527,7 +1523,7 @@ class KafkaApis(val requestChannel: RequestChannel, case e: Exception => error(s"Received an exception while trying to update the offsets cache on transaction marker append", e) val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]() - successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN)) + successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN_SERVER_ERROR)) updateErrors(producerId, updatedErrors) } } @@ -1865,7 +1861,7 @@ class KafkaApis(val requestChannel: RequestChannel, Some(new AclDeletionResult(ApiError.fromThrowable(throwable), 
aclBinding)) } }.asJava - + filterResponseMap.put(i, new AclFilterResponse(deletionResults)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index cc34e14..fc9e4b8 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -47,7 +47,7 @@ import org.apache.kafka.common.utils.{AppInfoParser, Time} import org.apache.kafka.common.{ClusterResource, Node} import scala.collection.JavaConverters._ -import scala.collection.{Map, mutable} +import scala.collection.{Seq, Map, mutable} object KafkaServer { // Copy the subset of properties that are relevant to Logs @@ -110,6 +110,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP var socketServer: SocketServer = null var requestHandlerPool: KafkaRequestHandlerPool = null + var logDirFailureChannel: LogDirFailureChannel = null var logManager: LogManager = null var replicaManager: ReplicaManager = null @@ -195,7 +196,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP info(s"Cluster ID = $clusterId") /* generate brokerId */ - config.brokerId = getBrokerId + val (brokerId, initialOfflineDirs) = getBrokerIdAndOfflineDirs + config.brokerId = brokerId this.logIdent = "[Kafka Server " + config.brokerId + "], " /* create and configure metrics */ @@ -211,8 +213,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP quotaManagers = QuotaFactory.instantiate(config, metrics, time) notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala) + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) + /* start log manager */ - logManager = LogManager(config, zkUtils, brokerState, kafkaScheduler, time, 
brokerTopicStats) + logManager = LogManager(config, initialOfflineDirs, zkUtils, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel) logManager.startup() metadataCache = new MetadataCache(config.brokerId) @@ -307,7 +311,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower, - brokerTopicStats, metadataCache) + brokerTopicStats, metadataCache, logDirFailureChannel) private def initZk(): ZkUtils = { info(s"Connecting to zookeeper on ${config.zkConnect}") @@ -582,7 +586,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) { CoreUtils.swallow(controlledShutdown()) brokerState.newState(BrokerShuttingDown) - + if (socketServer != null) CoreUtils.swallow(socketServer.shutdown()) if (requestHandlerPool != null) @@ -651,16 +655,25 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP * <li> config has broker.id and there is no meta.properties file, creates new meta.properties and stores broker.id * <ol> * - * @return A brokerId. + * The log directories whose meta.properties can not be accessed due to IOException will be returned to the caller + * + * @return A 2-tuple containing the brokerId and a sequence of offline log directories. 
*/ - private def getBrokerId: Int = { + private def getBrokerIdAndOfflineDirs: (Int, Seq[String]) = { var brokerId = config.brokerId val brokerIdSet = mutable.HashSet[Int]() + val offlineDirs = mutable.ArrayBuffer.empty[String] for (logDir <- config.logDirs) { - val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read() - brokerMetadataOpt.foreach { brokerMetadata => - brokerIdSet.add(brokerMetadata.brokerId) + try { + val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read() + brokerMetadataOpt.foreach { brokerMetadata => + brokerIdSet.add(brokerMetadata.brokerId) + } + } catch { + case e : IOException => + offlineDirs += logDir + error(s"Fail to read ${brokerMetaPropsFile} under log directory ${logDir}", e) } } @@ -678,16 +691,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP else if(brokerIdSet.size == 1) // pick broker.id from meta.properties brokerId = brokerIdSet.last - brokerId + + (brokerId, offlineDirs) } private def checkpointBrokerId(brokerId: Int) { var logDirsWithoutMetaProps: List[String] = List() - for (logDir <- config.logDirs) { - val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read() + for (logDir <- logManager.liveLogDirs) { + val brokerMetadataOpt = brokerMetadataCheckpoints(logDir.getAbsolutePath).read() if(brokerMetadataOpt.isEmpty) - logDirsWithoutMetaProps ++= List(logDir) + logDirsWithoutMetaProps ++= List(logDir.getAbsolutePath) } for(logDir <- logDirsWithoutMetaProps) { http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala new file mode 100644 index 0000000..23d9986 --- /dev/null +++ b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package kafka.server + +import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap} + +/* + * LogDirFailureChannel allows an external thread to block waiting for new offline log dir. + * + * LogDirFailureChannel should be a singleton object which can be accessed by any class that does disk-IO operations. + * If IOException is encountered while accessing a log directory, the corresponding class can insert the log directory name + * to the LogDirFailureChannel using maybeAddLogFailureEvent(). Then a thread which is blocked waiting for new offline log directories + * can take the name of the new offline log directory out of the LogDirFailureChannel and handle the log failure properly. 
+ * + */ +class LogDirFailureChannel(logDirNum: Int) { + + private val offlineLogDirs = new ConcurrentHashMap[String, String] + private val logDirFailureEvent = new ArrayBlockingQueue[String](logDirNum) + + /* + * If the given logDir is not already offline, add it to the + * set of offline log dirs and enqueue it to the logDirFailureEvent queue + */ + def maybeAddLogFailureEvent(logDir: String): Unit = { + if (offlineLogDirs.putIfAbsent(logDir, logDir) == null) { + logDirFailureEvent.add(logDir) + } + } + + /* + * Get the next offline log dir from logDirFailureEvent queue. + * The method will wait if necessary until a new offline log directory becomes available + */ + def takeNextLogFailureEvent(): String = { + logDirFailureEvent.take() + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/MetadataCache.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 466645b..2c28df7 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.{MetadataResponse, PartitionState, UpdateMetadataRequest} +import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} /** * A cache for the state (e.g., current leader) of each partition. 
This cache is updated through @@ -39,7 +39,7 @@ import org.apache.kafka.common.requests.{MetadataResponse, PartitionState, Updat */ class MetadataCache(brokerId: Int) extends Logging { private val stateChangeLogger = KafkaController.stateChangeLogger - private val cache = mutable.Map[String, mutable.Map[Int, PartitionStateInfo]]() + private val cache = mutable.Map[String, mutable.Map[Int, MetadataPartitionState]]() private var controllerId: Option[Int] = None private val aliveBrokers = mutable.Map[Int, Broker]() private val aliveNodes = mutable.Map[Int, collection.Map[ListenerName, Node]]() @@ -73,12 +73,13 @@ class MetadataCache(brokerId: Int) extends Logging { val replicas = partitionState.allReplicas val replicaInfo = getEndpoints(replicas, listenerName, errorUnavailableEndpoints) + val offlineReplicaInfo = getEndpoints(partitionState.offlineReplicas, listenerName, errorUnavailableEndpoints) maybeLeader match { case None => debug(s"Error while fetching metadata for $topicPartition: leader not available") new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, partitionId, Node.noNode(), - replicaInfo.asJava, java.util.Collections.emptyList()) + replicaInfo.asJava, java.util.Collections.emptyList(), offlineReplicaInfo.asJava) case Some(leader) => val isr = leaderAndIsr.isr @@ -89,15 +90,15 @@ class MetadataCache(brokerId: Int) extends Logging { s"following brokers ${replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")}") new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader, - replicaInfo.asJava, isrInfo.asJava) + replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava) } else if (isrInfo.size < isr.size) { debug(s"Error while fetching metadata for $topicPartition: in sync replica information not available for " + s"following brokers ${isr.filterNot(isrInfo.map(_.id).contains).mkString(",")}") new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader, - 
replicaInfo.asJava, isrInfo.asJava) + replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava) } else { new MetadataResponse.PartitionMetadata(Errors.NONE, partitionId, leader, replicaInfo.asJava, - isrInfo.asJava) + isrInfo.asJava, offlineReplicaInfo.asJava) } } } @@ -147,14 +148,14 @@ class MetadataCache(brokerId: Int) extends Logging { private def addOrUpdatePartitionInfo(topic: String, partitionId: Int, - stateInfo: PartitionStateInfo) { + stateInfo: MetadataPartitionState) { inWriteLock(partitionMetadataLock) { val infos = cache.getOrElseUpdate(topic, mutable.Map()) infos(partitionId) = stateInfo } } - def getPartitionInfo(topic: String, partitionId: Int): Option[PartitionStateInfo] = { + def getPartitionInfo(topic: String, partitionId: Int): Option[MetadataPartitionState] = { inReadLock(partitionMetadataLock) { cache.get(topic).flatMap(_.get(partitionId)) } @@ -223,10 +224,10 @@ class MetadataCache(brokerId: Int) extends Logging { } } - private def partitionStateToPartitionStateInfo(partitionState: PartitionState): PartitionStateInfo = { + private def partitionStateToPartitionStateInfo(partitionState: UpdateMetadataRequest.PartitionState): MetadataPartitionState = { val leaderAndIsr = LeaderAndIsr(partitionState.leader, partitionState.leaderEpoch, partitionState.isr.asScala.map(_.toInt).toList, partitionState.zkVersion) val leaderInfo = LeaderIsrAndControllerEpoch(leaderAndIsr, partitionState.controllerEpoch) - PartitionStateInfo(leaderInfo, partitionState.replicas.asScala.map(_.toInt)) + MetadataPartitionState(leaderInfo, partitionState.replicas.asScala.map(_.toInt), partitionState.offlineReplicas.asScala.map(_.toInt)) } def contains(topic: String): Boolean = {
