http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index d7420dd..e856ca1 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -18,16 +18,13 @@ package kafka.server import java.util - import kafka.admin.AdminUtils import kafka.api.{FetchRequest => _, _} import kafka.cluster.{BrokerEndPoint, Replica} -import kafka.common.KafkaStorageException import kafka.log.LogConfig import kafka.server.ReplicaFetcherThread._ import kafka.server.epoch.LeaderEpochCache import org.apache.kafka.common.requests.EpochEndOffset._ -import kafka.utils.Exit import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.metrics.Metrics @@ -35,7 +32,6 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, ListOffsetRequest, ListOffsetResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse, FetchRequest => JFetchRequest} import org.apache.kafka.common.utils.Time - import scala.collection.JavaConverters._ import scala.collection.{Map, mutable} @@ -83,41 +79,35 @@ class ReplicaFetcherThread(name: String, // process fetched data def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) { - try { - val replica = replicaMgr.getReplica(topicPartition).get - val records = partitionData.toRecords - - maybeWarnIfOversizedRecords(records, topicPartition) - - if (fetchOffset != replica.logEndOffset.messageOffset) - throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log 
end offset = %d.".format(topicPartition, fetchOffset, replica.logEndOffset.messageOffset)) - if (logger.isTraceEnabled) - trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d" - .format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark)) - - // Append the leader's messages to the log - replica.log.get.appendAsFollower(records) - - if (logger.isTraceEnabled) - trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s" - .format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition)) - val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark) - val leaderLogStartOffset = partitionData.logStartOffset - // for the follower replica, we do not need to keep - // its segment base offset the physical position, - // these values will be computed upon making the leader - replica.highWatermark = new LogOffsetMetadata(followerHighWatermark) - replica.maybeIncrementLogStartOffset(leaderLogStartOffset) - if (logger.isTraceEnabled) - trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark") - if (quota.isThrottled(topicPartition)) - quota.record(records.sizeInBytes) - replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes) - } catch { - case e: KafkaStorageException => - fatal(s"Disk error while replicating data for $topicPartition", e) - Exit.halt(1) - } + val replica = replicaMgr.getReplica(topicPartition).get + val records = partitionData.toRecords + + maybeWarnIfOversizedRecords(records, topicPartition) + + if (fetchOffset != replica.logEndOffset.messageOffset) + throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicPartition, fetchOffset, replica.logEndOffset.messageOffset)) + if 
(logger.isTraceEnabled) + trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d" + .format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark)) + + // Append the leader's messages to the log + replica.log.get.appendAsFollower(records) + + if (logger.isTraceEnabled) + trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s" + .format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition)) + val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark) + val leaderLogStartOffset = partitionData.logStartOffset + // for the follower replica, we do not need to keep + // its segment base offset the physical position, + // these values will be computed upon making the leader + replica.highWatermark = new LogOffsetMetadata(followerHighWatermark) + replica.maybeIncrementLogStartOffset(leaderLogStartOffset) + if (logger.isTraceEnabled) + trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark") + if (quota.isThrottled(topicPartition)) + quota.record(records.sizeInBytes) + replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes) } def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition: TopicPartition): Unit = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 853b7c4..40887be 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -16,14 +16,13 @@ */ package kafka.server -import java.io.{File, IOException} +import java.io.File import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import com.yammer.metrics.core.Gauge import kafka.api._ import kafka.cluster.{Partition, Replica} -import kafka.common.KafkaStorageException import kafka.controller.KafkaController import kafka.log.{Log, LogAppendInfo, LogManager} import kafka.metrics.KafkaMetricsGroup @@ -31,11 +30,12 @@ import kafka.server.QuotaFactory.UnboundedQuota import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils._ import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException, _} +import org.apache.kafka.common.errors.{KafkaStorageException, ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException, _} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION +import org.apache.kafka.common.protocol.Errors.KAFKA_STORAGE_ERROR import org.apache.kafka.common.record._ import 
org.apache.kafka.common.requests.EpochEndOffset._ import org.apache.kafka.common.requests.FetchRequest.PartitionData @@ -123,6 +123,7 @@ object ReplicaManager { val HighWatermarkFilename = "replication-offset-checkpoint" val IsrChangePropagationBlackOut = 5000L val IsrChangePropagationInterval = 60000L + val OfflinePartition = new Partition("", -1, null, null, isOffline = true) } class ReplicaManager(val config: KafkaConfig, @@ -135,6 +136,7 @@ class ReplicaManager(val config: KafkaConfig, quotaManager: ReplicationQuotaManager, val brokerTopicStats: BrokerTopicStats, val metadataCache: MetadataCache, + logDirFailureChannel: LogDirFailureChannel, val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce], val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch], val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords], @@ -150,9 +152,10 @@ class ReplicaManager(val config: KafkaConfig, quotaManager: ReplicationQuotaManager, brokerTopicStats: BrokerTopicStats, metadataCache: MetadataCache, + logDirFailureChannel: LogDirFailureChannel, threadNamePrefix: Option[String] = None) { this(config, metrics, time, zkUtils, scheduler, logManager, isShuttingDown, - quotaManager, brokerTopicStats, metadataCache, + quotaManager, brokerTopicStats, metadataCache, logDirFailureChannel, DelayedOperationPurgatory[DelayedProduce]( purgatoryName = "Produce", brokerId = config.brokerId, purgeInterval = config.producerPurgatoryPurgeIntervalRequests), @@ -173,13 +176,27 @@ class ReplicaManager(val config: KafkaConfig, private val replicaStateChangeLock = new Object val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManager) private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) - val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap + @volatile var 
highWatermarkCheckpoints = logManager.liveLogDirs.map(dir => + (dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap + private var hwThreadInitialized = false this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: " val stateChangeLogger = KafkaController.stateChangeLogger private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]() private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis()) private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis()) + private var logDirFailureHandler: LogDirFailureHandler = null + + private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) { + override def doWork() { + val newOfflineLogDir = logDirFailureChannel.takeNextLogFailureEvent() + if (haltBrokerOnDirFailure) { + fatal(s"Halting broker because dir $newOfflineLogDir is offline") + Exit.halt(1) + } + handleLogDirFailure(newOfflineLogDir) + } + } val leaderCount = newGauge( "LeaderCount", @@ -193,6 +210,12 @@ class ReplicaManager(val config: KafkaConfig, def value = allPartitions.size } ) + val offlineReplicaCount = newGauge( + "OfflineReplicaCount", + new Gauge[Int] { + def value = allPartitions.values.count(_ eq ReplicaManager.OfflinePartition) + } + ) val underReplicatedPartitions = newGauge( "UnderReplicatedPartitions", new Gauge[Int] { @@ -277,20 +300,30 @@ class ReplicaManager(val config: KafkaConfig, // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS) scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS) + val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_0_11_1_IV0 + logDirFailureHandler = new 
LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure) + logDirFailureHandler.start() } def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Errors = { stateChangeLogger.trace(s"Broker $localBrokerId handling stop replica (delete=$deletePartition) for partition $topicPartition") val error = Errors.NONE getPartition(topicPartition) match { - case Some(_) => + case Some(partition) => if (deletePartition) { + if (partition eq ReplicaManager.OfflinePartition) + throw new KafkaStorageException(s"Partition $topicPartition is on an offline disk") val removedPartition = allPartitions.remove(topicPartition) if (removedPartition != null) { - removedPartition.delete() // this will delete the local log - val topicHasPartitions = allPartitions.keys.exists(tp => topicPartition.topic == tp.topic) + val topicHasPartitions = allPartitions.values.exists(partition => topicPartition.topic == partition.topic) if (!topicHasPartitions) brokerTopicStats.removeMetrics(topicPartition.topic) + // this will delete the local log. This call may throw exception if the log is on offline directory + removedPartition.delete() + } else if (logManager.getLog(topicPartition).isDefined) { + // Delete log and corresponding folders in case replica manager doesn't hold them anymore. + // This could happen when topic is being deleted while broker is down and recovers. 
+ logManager.asyncDelete(topicPartition) } } case None => @@ -317,8 +350,14 @@ class ReplicaManager(val config: KafkaConfig, // First stop fetchers for all partitions, then stop the corresponding replicas replicaFetcherManager.removeFetcherForPartitions(partitions) for (topicPartition <- partitions){ - val error = stopReplica(topicPartition, stopReplicaRequest.deletePartitions) - responseMap.put(topicPartition, error) + try { + val error = stopReplica(topicPartition, stopReplicaRequest.deletePartitions) + responseMap.put(topicPartition, error) + } catch { + case e: KafkaStorageException => + stateChangeLogger.error(s"Broker $localBrokerId ignoring stop replica (delete=${stopReplicaRequest.deletePartitions}) for partition $topicPartition due to storage exception", e) + responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR) + } } (responseMap, Errors.NONE) } @@ -332,8 +371,15 @@ class ReplicaManager(val config: KafkaConfig, Option(allPartitions.get(topicPartition)) def getReplicaOrException(topicPartition: TopicPartition): Replica = { - getReplica(topicPartition).getOrElse { - throw new ReplicaNotAvailableException(s"Replica $localBrokerId is not available for partition $topicPartition") + getPartition(topicPartition) match { + case Some(partition) => + if (partition eq ReplicaManager.OfflinePartition) + throw new KafkaStorageException(s"Replica $localBrokerId is in an offline log directory for partition $topicPartition") + else + partition.getReplica(localBrokerId).getOrElse( + throw new ReplicaNotAvailableException(s"Replica $localBrokerId is not available for partition $topicPartition")) + case None => + throw new ReplicaNotAvailableException(s"Replica $localBrokerId is not available for partition $topicPartition") } } @@ -343,7 +389,9 @@ class ReplicaManager(val config: KafkaConfig, case None => throw new UnknownTopicOrPartitionException(s"Partition $topicPartition doesn't exist on $localBrokerId") case Some(partition) => - partition.leaderReplicaIfLocal 
match { + if (partition eq ReplicaManager.OfflinePartition) + throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId") + else partition.leaderReplicaIfLocal match { case Some(leaderReplica) => leaderReplica case None => throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId") @@ -352,10 +400,17 @@ class ReplicaManager(val config: KafkaConfig, } def getReplica(topicPartition: TopicPartition, replicaId: Int): Option[Replica] = - getPartition(topicPartition).flatMap(_.getReplica(replicaId)) + getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition).flatMap(_.getReplica(replicaId)) def getReplica(tp: TopicPartition): Option[Replica] = getReplica(tp, localBrokerId) + def getLogDir(topicPartition: TopicPartition): Option[String] = { + getReplica(topicPartition).flatMap(_.log) match { + case Some(log) => Some(log.dir.getParent) + case None => None + } + } + /** * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas; * the callback function will be triggered either when timeout or the required acks are satisfied; @@ -422,8 +477,14 @@ class ReplicaManager(val config: KafkaConfig, (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}")))) } else { try { - val partition = getPartition(topicPartition).getOrElse( - throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d".format(topicPartition, localBrokerId))) + val partition = getPartition(topicPartition) match { + case Some(p) => + if (p eq ReplicaManager.OfflinePartition) + throw new KafkaStorageException("Partition %s is in an offline log directory on broker %d".format(topicPartition, localBrokerId)) + p + case None => + throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d".format(topicPartition, 
localBrokerId)) + } val convertedOffset = if (requestedOffset == DeleteRecordsRequest.HIGH_WATERMARK) { partition.leaderReplicaIfLocal match { @@ -443,14 +504,11 @@ class ReplicaManager(val config: KafkaConfig, } catch { // NOTE: Failed produce requests metric is not incremented for known exceptions // it is supposed to indicate un-expected failures of a broker in handling a produce request - case e: KafkaStorageException => - fatal("Halting due to unrecoverable I/O error while handling DeleteRecordsRequest: ", e) - Runtime.getRuntime.halt(1) - (topicPartition, null) case e@ (_: UnknownTopicOrPartitionException | _: NotLeaderForPartitionException | _: OffsetOutOfRangeException | _: PolicyViolationException | + _: KafkaStorageException | _: NotEnoughReplicasException) => (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(e))) case t: Throwable => @@ -543,6 +601,8 @@ class ReplicaManager(val config: KafkaConfig, val partitionOpt = getPartition(topicPartition) val info = partitionOpt match { case Some(partition) => + if (partition eq ReplicaManager.OfflinePartition) + throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId") partition.appendRecordsToLeader(records, isFromClient, requiredAcks) case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" @@ -567,15 +627,12 @@ class ReplicaManager(val config: KafkaConfig, } catch { // NOTE: Failed produce requests metric is not incremented for known exceptions // it is supposed to indicate un-expected failures of a broker in handling a produce request - case e: KafkaStorageException => - fatal("Halting due to unrecoverable I/O error while handling produce request: ", e) - Exit.halt(1) - (topicPartition, null) case e@ (_: UnknownTopicOrPartitionException | _: NotLeaderForPartitionException | _: RecordTooLargeException | _: RecordBatchTooLargeException | _: CorruptRecordException | + _: KafkaStorageException | _: 
InvalidTimestampException) => (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) case t: Throwable => @@ -747,6 +804,7 @@ class ReplicaManager(val config: KafkaConfig, case e@ (_: UnknownTopicOrPartitionException | _: NotLeaderForPartitionException | _: ReplicaNotAvailableException | + _: KafkaStorageException | _: OffsetOutOfRangeException) => LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), highWatermark = -1L, @@ -793,7 +851,7 @@ class ReplicaManager(val config: KafkaConfig, * the quota is exceeded and the replica is not in sync. */ def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicPartition, replicaId: Int): Boolean = { - val isReplicaInSync = getPartition(topicPartition).flatMap { partition => + val isReplicaInSync = getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition).flatMap { partition => partition.getReplica(replicaId).map(partition.inSyncReplicas.contains) }.getOrElse(false) quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync @@ -819,7 +877,8 @@ class ReplicaManager(val config: KafkaConfig, } } - def becomeLeaderOrFollower(correlationId: Int,leaderAndISRRequest: LeaderAndIsrRequest, + def becomeLeaderOrFollower(correlationId: Int, + leaderAndISRRequest: LeaderAndIsrRequest, onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = { leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) => stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]" @@ -842,9 +901,14 @@ class ReplicaManager(val config: KafkaConfig, leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) => val partition = getOrCreatePartition(topicPartition) val partitionLeaderEpoch = partition.getLeaderEpoch - // If the leader epoch is valid record the epoch of the 
controller that made the leadership decision. - // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path - if (partitionLeaderEpoch < stateInfo.leaderEpoch) { + if (partition eq ReplicaManager.OfflinePartition) { + stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " + + "epoch %d for partition [%s,%d] as the local replica for the partition is in an offline log directory") + .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition)) + responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR) + } else if (partitionLeaderEpoch < stateInfo.leaderEpoch) { + // If the leader epoch is valid record the epoch of the controller that made the leadership decision. + // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path if(stateInfo.replicas.contains(localBrokerId)) partitionState.put(partition, stateInfo) else { @@ -878,6 +942,17 @@ class ReplicaManager(val config: KafkaConfig, else Set.empty[Partition] + leaderAndISRRequest.partitionStates.asScala.keys.foreach( topicPartition => + /* + * If there is offline log directory, a Partition object may have been created by getOrCreatePartition() + * before getOrCreateReplica() failed to create local replica due to KafkaStorageException. + * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object. + * we need to map this topic-partition to OfflinePartition instead. + */ + if (getReplica(topicPartition).isEmpty && (allPartitions.get(topicPartition) ne ReplicaManager.OfflinePartition)) + allPartitions.put(topicPartition, ReplicaManager.OfflinePartition) + ) + // we initialize highwatermark thread after the first leaderisrrequest. 
This ensures that all the partitions // have been completely populated before starting the checkpointing there by avoiding weird race conditions if (!hwThreadInitialized) { @@ -885,7 +960,6 @@ class ReplicaManager(val config: KafkaConfig, hwThreadInitialized = true } replicaFetcherManager.shutdownIdleFetcherThreads() - onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower) BecomeLeaderOrFollowerResult(responseMap, Errors.NONE) } @@ -926,18 +1000,27 @@ class ReplicaManager(val config: KafkaConfig, replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition)) // Update the partition information to be the leader partitionState.foreach{ case (partition, partitionStateInfo) => - if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) - partitionsToMakeLeaders += partition - else - stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " + - "controller %d epoch %d for partition %s since it is already the leader for the partition.") - .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)) - } - partitionsToMakeLeaders.foreach { partition => - stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " + - "%d epoch %d with correlation id %d for partition %s") - .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition)) + try { + if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) { + partitionsToMakeLeaders += partition + stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " + + "%d epoch %d with correlation id %d for partition %s") + .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition)) + } else + stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with 
correlation id %d from " + + "controller %d epoch %d for partition %s since it is already the leader for the partition.") + .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)) + } catch { + case e: KafkaStorageException => + stateChangeLogger.error(("Broker %d skipped the become-leader state change with correlation id %d from " + + "controller %d epoch %d for partition %s since the replica for the partition is offline due to disk error %s.") + .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition, e)) + val dirOpt = getLogDir(new TopicPartition(partition.topic, partition.partitionId)) + error(s"Error while making broker the leader for partition $partition in dir $dirOpt", e) + responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.KAFKA_STORAGE_ERROR) + } } + } catch { case e: Throwable => partitionState.keys.foreach { partition => @@ -996,27 +1079,37 @@ class ReplicaManager(val config: KafkaConfig, // TODO: Delete leaders from LeaderAndIsrRequest partitionState.foreach{ case (partition, partitionStateInfo) => - val newLeaderBrokerId = partitionStateInfo.leader - metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match { - // Only change partition state when the leader is available - case Some(_) => - if (partition.makeFollower(controllerId, partitionStateInfo, correlationId)) - partitionsToMakeFollower += partition - else - stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " + - "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader") + try { + val newLeaderBrokerId = partitionStateInfo.leader + metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match { + // Only change partition state when the leader is available + case Some(_) => + if (partition.makeFollower(controllerId, partitionStateInfo, correlationId)) + 
partitionsToMakeFollower += partition + else + stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " + + "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader") + .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch, + partition.topicPartition, newLeaderBrokerId)) + case None => + // The leader broker should always be present in the metadata cache. + // If not, we should record the error message and abort the transition process for this partition + stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" + + " %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.") .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch, - partition.topicPartition, newLeaderBrokerId)) - case None => - // The leader broker should always be present in the metadata cache. - // If not, we should record the error message and abort the transition process for this partition - stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" + - " %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.") - .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch, - partition.topicPartition, newLeaderBrokerId)) - // Create the local replica even if the leader is unavailable. This is required to ensure that we include - // the partition's high watermark in the checkpoint file (see KAFKA-1647) - partition.getOrCreateReplica() + partition.topicPartition, newLeaderBrokerId)) + // Create the local replica even if the leader is unavailable. 
This is required to ensure that we include + // the partition's high watermark in the checkpoint file (see KAFKA-1647) + partition.getOrCreateReplica(isNew = partitionStateInfo.isNew) + } + } catch { + case e: KafkaStorageException => + stateChangeLogger.error(("Broker %d skipped the become-follower state change with correlation id %d from " + + "controller %d epoch %d for partition [%s,%d] since the replica for the partition is offline due to disk error %s") + .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch, partition.topic, partition.partitionId, e)) + val dirOpt = getLogDir(new TopicPartition(partition.topic, partition.partitionId)) + error(s"Error while making broker the follower for partition $partition in dir $dirOpt", e) + responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.KAFKA_STORAGE_ERROR) } } @@ -1080,7 +1173,7 @@ class ReplicaManager(val config: KafkaConfig, private def maybeShrinkIsr(): Unit = { trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR") - allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs)) + allPartitions.values.filter(_ ne ReplicaManager.OfflinePartition).foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs)) } private def updateFollowerLogReadResults(replicaId: Int, readResults: Seq[(TopicPartition, LogReadResult)]) { @@ -1088,7 +1181,8 @@ class ReplicaManager(val config: KafkaConfig, readResults.foreach { case (topicPartition, readResult) => getPartition(topicPartition) match { case Some(partition) => - partition.updateReplicaLogReadResult(replicaId, readResult) + if (partition ne ReplicaManager.OfflinePartition) + partition.updateReplicaLogReadResult(replicaId, readResult) // for producer requests with ack > 1, we need to check // if they can be unblocked after some follower's log end offsets have moved @@ -1100,33 +1194,86 @@ class ReplicaManager(val config: 
KafkaConfig, } private def getLeaderPartitions: List[Partition] = - allPartitions.values.filter(_.leaderReplicaIfLocal.isDefined).toList + allPartitions.values.filter(partition => (partition ne ReplicaManager.OfflinePartition) && partition.leaderReplicaIfLocal.isDefined).toList def getLogEndOffset(topicPartition: TopicPartition): Option[Long] = { - getPartition(topicPartition).flatMap{ partition => - partition.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset) + getPartition(topicPartition) match { + case Some(partition) => + if (partition eq ReplicaManager.OfflinePartition) + None + else + partition.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset) + case None => None } } // Flushes the highwatermark value for all partitions to the highwatermark file def checkpointHighWatermarks() { - val replicas = allPartitions.values.flatMap(_.getReplica(localBrokerId)) - val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath) + val replicas = allPartitions.values.filter(_ ne ReplicaManager.OfflinePartition).flatMap(_.getReplica(localBrokerId)) + val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent) for ((dir, reps) <- replicasByDir) { val hwms = reps.map(r => r.partition.topicPartition -> r.highWatermark.messageOffset).toMap try { - highWatermarkCheckpoints(dir).write(hwms) + highWatermarkCheckpoints.get(dir).foreach(_.write(hwms)) } catch { - case e: IOException => - fatal("Error writing to highwatermark file: ", e) - Exit.halt(1) + case e: KafkaStorageException => + error(s"Error while writing to highwatermark file in directory $dir", e) } } } + def handleLogDirFailure(dir: String) { + if (!logManager.isLogDirOnline(dir)) + return + + info(s"Stopping serving replicas in dir $dir") + replicaStateChangeLock synchronized { + val newOfflinePartitions = allPartitions.values.filter { partition => + if (partition eq ReplicaManager.OfflinePartition) + false + else 
partition.getReplica(config.brokerId) match { + case Some(replica) => + replica.log.isDefined && replica.log.get.dir.getParent == dir + case None => false + } + }.map(_.topicPartition) + + info(s"Partitions ${newOfflinePartitions.mkString(",")} are offline due to failure on log directory $dir") + + newOfflinePartitions.foreach { topicPartition => + val partition = allPartitions.put(topicPartition, ReplicaManager.OfflinePartition) + partition.removePartitionMetrics() + } + + newOfflinePartitions.map(_.topic).toSet.foreach { topic: String => + val topicHasPartitions = allPartitions.values.exists(partition => topic == partition.topic) + if (!topicHasPartitions) + brokerTopicStats.removeMetrics(topic) + } + + replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions.toSet) + highWatermarkCheckpoints = highWatermarkCheckpoints.filterKeys(_ != dir) + info("Broker %d stopped fetcher for partitions %s because they are in the failed log dir %s" + .format(localBrokerId, newOfflinePartitions.mkString(", "), dir)) + } + logManager.handleLogDirFailure(dir) + LogDirUtils.propagateLogDirEvent(zkUtils, localBrokerId) + info(s"Stopped serving replicas in dir $dir") + } + + def removeMetrics() { + removeMetric("LeaderCount") + removeMetric("PartitionCount") + removeMetric("OfflineReplicaCount") + removeMetric("UnderReplicatedPartitions") + } + // High watermark do not need to be checkpointed only when under unit tests def shutdown(checkpointHW: Boolean = true) { info("Shutting down") + removeMetrics() + if (logDirFailureHandler != null) + logDirFailureHandler.shutdown() replicaFetcherManager.shutdown() delayedFetchPurgatory.shutdown() delayedProducePurgatory.shutdown() @@ -1144,7 +1291,10 @@ class ReplicaManager(val config: KafkaConfig, requestedEpochInfo.map { case (tp, leaderEpoch) => val epochEndOffset = getPartition(tp) match { case Some(partition) => - partition.lastOffsetForLeaderEpoch(leaderEpoch) + if (partition eq ReplicaManager.OfflinePartition) + new 
EpochEndOffset(KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH_OFFSET) + else + partition.lastOffsetForLeaderEpoch(leaderEpoch) case None => new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET) } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala index cc50620..7b67559 100644 --- a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala +++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala @@ -18,9 +18,13 @@ package kafka.server.checkpoints import java.io._ import java.nio.charset.StandardCharsets -import java.nio.file.{FileAlreadyExistsException, FileSystems, Files, Paths} -import kafka.utils.{Exit, Logging} +import java.nio.file.{FileAlreadyExistsException, Files, Paths} + +import kafka.server.LogDirFailureChannel +import kafka.utils.Logging +import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.utils.Utils + import scala.collection.{Seq, mutable} trait CheckpointFileFormatter[T]{ @@ -29,86 +33,94 @@ trait CheckpointFileFormatter[T]{ def fromLine(line: String): Option[T] } -class CheckpointFile[T](val file: File, version: Int, formatter: CheckpointFileFormatter[T]) extends Logging { +class CheckpointFile[T](val file: File, + version: Int, + formatter: CheckpointFileFormatter[T], + logDirFailureChannel: LogDirFailureChannel, + logDir: String) extends Logging { private val path = file.toPath.toAbsolutePath private val tempPath = Paths.get(path.toString + ".tmp") private val lock = new Object() - + try Files.createFile(file.toPath) // create the file if it doesn't exist catch { case _: FileAlreadyExistsException => } def write(entries: Seq[T]) { lock synchronized { - // write to temp file and then swap with 
the existing file - val fileOutputStream = new FileOutputStream(tempPath.toFile) - val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8)) try { - writer.write(version.toString) - writer.newLine() - - writer.write(entries.size.toString) - writer.newLine() + // write to temp file and then swap with the existing file + val fileOutputStream = new FileOutputStream(tempPath.toFile) + val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8)) + try { + writer.write(version.toString) + writer.newLine() - entries.foreach { entry => - writer.write(formatter.toLine(entry)) + writer.write(entries.size.toString) writer.newLine() + + entries.foreach { entry => + writer.write(formatter.toLine(entry)) + writer.newLine() + } + + writer.flush() + fileOutputStream.getFD().sync() + } finally { + writer.close() } - writer.flush() - fileOutputStream.getFD().sync() + Utils.atomicMoveWithFallback(tempPath, path) } catch { - case e: FileNotFoundException => - if (FileSystems.getDefault.isReadOnly) { - fatal(s"Halting writes to checkpoint file (${file.getAbsolutePath}) because the underlying file system is inaccessible: ", e) - Exit.halt(1) - } - throw e - } finally { - writer.close() + case e: IOException => + logDirFailureChannel.maybeAddLogFailureEvent(logDir) + throw new KafkaStorageException(s"Error while writing to checkpoint file ${file.getAbsolutePath}", e) } - - Utils.atomicMoveWithFallback(tempPath, path) } } def read(): Seq[T] = { def malformedLineException(line: String) = new IOException(s"Malformed line in checkpoint file (${file.getAbsolutePath}): $line'") - lock synchronized { - val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)) - var line: String = null try { - line = reader.readLine() - if (line == null) - return Seq.empty - line.toInt match { - case fileVersion if fileVersion == version => - line = reader.readLine() - if (line == null) - 
return Seq.empty - val expectedSize = line.toInt - val entries = mutable.Buffer[T]() - line = reader.readLine() - while (line != null) { - val entry = formatter.fromLine(line) - entry match { - case Some(e) => - entries += e - line = reader.readLine() - case _ => throw malformedLineException(line) + val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)) + var line: String = null + try { + line = reader.readLine() + if (line == null) + return Seq.empty + line.toInt match { + case fileVersion if fileVersion == version => + line = reader.readLine() + if (line == null) + return Seq.empty + val expectedSize = line.toInt + val entries = mutable.Buffer[T]() + line = reader.readLine() + while (line != null) { + val entry = formatter.fromLine(line) + entry match { + case Some(e) => + entries += e + line = reader.readLine() + case _ => throw malformedLineException(line) + } } - } - if (entries.size != expectedSize) - throw new IOException(s"Expected $expectedSize entries in checkpoint file (${file.getAbsolutePath}), but found only ${entries.size}") - entries - case _ => - throw new IOException(s"Unrecognized version of the checkpoint file (${file.getAbsolutePath}): " + version) + if (entries.size != expectedSize) + throw new IOException(s"Expected $expectedSize entries in checkpoint file (${file.getAbsolutePath}), but found only ${entries.size}") + entries + case _ => + throw new IOException(s"Unrecognized version of the checkpoint file (${file.getAbsolutePath}): " + version) + } + } catch { + case _: NumberFormatException => throw malformedLineException(line) + } finally { + reader.close() } } catch { - case _: NumberFormatException => throw malformedLineException(line) - } finally { - reader.close() + case e: IOException => + logDirFailureChannel.maybeAddLogFailureEvent(logDir) + throw new KafkaStorageException(s"Error while reading checkpoint file ${file.getAbsolutePath}", e) } } } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala index d32d30f..a8db688 100644 --- a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala +++ b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala @@ -19,6 +19,7 @@ package kafka.server.checkpoints import java.io._ import java.util.regex.Pattern +import kafka.server.LogDirFailureChannel import kafka.server.epoch.EpochEntry import scala.collection._ @@ -55,10 +56,10 @@ object LeaderEpochCheckpointFile { /** * This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica) */ -class LeaderEpochCheckpointFile(val file: File) extends LeaderEpochCheckpoint { +class LeaderEpochCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel = null) extends LeaderEpochCheckpoint { import LeaderEpochCheckpointFile._ - val checkpoint = new CheckpointFile[EpochEntry](file, CurrentVersion, Formatter) + val checkpoint = new CheckpointFile[EpochEntry](file, CurrentVersion, Formatter, logDirFailureChannel, file.getParentFile.getParent) def write(epochs: Seq[EpochEntry]): Unit = checkpoint.write(epochs) http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala index 5f5dc97..9cd0963 100644 --- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala +++ 
b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala @@ -19,6 +19,7 @@ package kafka.server.checkpoints import java.io._ import java.util.regex.Pattern +import kafka.server.LogDirFailureChannel import kafka.server.epoch.EpochEntry import org.apache.kafka.common.TopicPartition @@ -51,9 +52,9 @@ trait OffsetCheckpoint { /** * This class persists a map of (Partition => Offsets) to a file (for a certain replica) */ -class OffsetCheckpointFile(val f: File) { +class OffsetCheckpointFile(val f: File, logDirFailureChannel: LogDirFailureChannel = null) { val checkpoint = new CheckpointFile[(TopicPartition, Long)](f, OffsetCheckpointFile.CurrentVersion, - OffsetCheckpointFile.Formatter) + OffsetCheckpointFile.Formatter, logDirFailureChannel, f.getParent) def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets.toSeq) http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/utils/LogDirUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/LogDirUtils.scala b/core/src/main/scala/kafka/utils/LogDirUtils.scala new file mode 100644 index 0000000..0bbc47d --- /dev/null +++ b/core/src/main/scala/kafka/utils/LogDirUtils.scala @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +import kafka.controller.LogDirEventNotificationListener +import scala.collection.Map + +object LogDirUtils extends Logging { + + private val LogDirEventNotificationPrefix = "log_dir_event_" + val LogDirFailureEvent = 1 + + def propagateLogDirEvent(zkUtils: ZkUtils, brokerId: Int) { + val logDirEventNotificationPath: String = zkUtils.createSequentialPersistentPath( + ZkUtils.LogDirEventNotificationPath + "/" + LogDirEventNotificationPrefix, logDirFailureEventZkData(brokerId)) + debug("Added " + logDirEventNotificationPath + " for broker " + brokerId) + } + + private def logDirFailureEventZkData(brokerId: Int): String = { + Json.encode(Map("version" -> LogDirEventNotificationListener.version, "broker" -> brokerId, "event" -> LogDirFailureEvent)) + } + + def deleteLogDirEvents(zkUtils: ZkUtils) { + val sequenceNumbers = zkUtils.getChildrenParentMayNotExist(ZkUtils.LogDirEventNotificationPath).toSet + sequenceNumbers.map(x => zkUtils.deletePath(ZkUtils.LogDirEventNotificationPath + "/" + x)) + } + + def getBrokerIdFromLogDirEvent(zkUtils: ZkUtils, child: String): Option[Int] = { + val changeZnode = ZkUtils.LogDirEventNotificationPath + "/" + child + val (jsonOpt, stat) = zkUtils.readDataMaybeNull(changeZnode) + if (jsonOpt.isDefined) { + val json = Json.parseFull(jsonOpt.get) + + json match { + case Some(m) => + val brokerAndEventType = m.asInstanceOf[Map[String, Any]] + val brokerId = brokerAndEventType.get("broker").get.asInstanceOf[Int] + val eventType = brokerAndEventType.get("event").get.asInstanceOf[Int] + if (eventType != LogDirFailureEvent) + throw new IllegalArgumentException(s"The event type $eventType in znode $changeZnode is not recognized") + Some(brokerId) + case None => + error("Invalid LogDirEvent JSON: " + jsonOpt.get + " in ZK: " + changeZnode) + None + } + } else { + None + } + } + +} 
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 0035120..7d3529f 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -51,6 +51,7 @@ object ZkUtils { val ControllerPath = "/controller" val ControllerEpochPath = "/controller_epoch" val IsrChangeNotificationPath = "/isr_change_notification" + val LogDirEventNotificationPath = "/log_dir_event_notification" val KafkaAclPath = "/kafka-acl" val KafkaAclChangesPath = "/kafka-acl-changes" @@ -75,7 +76,8 @@ object ZkUtils { IsrChangeNotificationPath, KafkaAclPath, KafkaAclChangesPath, - ProducerIdBlockPath) + ProducerIdBlockPath, + LogDirEventNotificationPath) // Important: it is necessary to add any new top level Zookeeper path that contains // sensitive information that should not be world readable to the Seq @@ -235,7 +237,8 @@ class ZkUtils(val zkClient: ZkClient, DeleteTopicsPath, BrokerSequenceIdPath, IsrChangeNotificationPath, - ProducerIdBlockPath) + ProducerIdBlockPath, + LogDirEventNotificationPath) // Visible for testing val zkPath = new ZkPath(zkClient) http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 09ff9be..2b134fe 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -44,7 +44,6 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.{Node, 
TopicPartition, requests} import org.junit.Assert._ import org.junit.{After, Assert, Before, Test} - import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.Buffer @@ -272,7 +271,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } private def createUpdateMetadataRequest = { - val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava + val partitionState = Map(tp -> new UpdateMetadataRequest.PartitionState( + Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, Seq.empty[Integer].asJava)).asJava val securityProtocol = SecurityProtocol.PLAINTEXT val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId, Seq(new requests.UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol, @@ -303,8 +303,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def leaveGroupRequest = new LeaveGroupRequest.Builder(group, "").build() private def leaderAndIsrRequest = { - new requests.LeaderAndIsrRequest.Builder(brokerId, Int.MaxValue, - Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava, + new requests.LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, brokerId, Int.MaxValue, + Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, false)).asJava, Set(new Node(brokerId, "localhost", 0)).asJava).build() } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 921c2b4..5e3c7ab 100644 --- 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -37,6 +37,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { val producerCount: Int val consumerCount: Int val serverCount: Int + var logDirCount: Int = 1 lazy val producerConfig = new Properties lazy val consumerConfig = new Properties lazy val serverConfig = new Properties @@ -46,7 +47,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { override def generateConfigs = { val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), - trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties) + trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount) cfgs.foreach { config => config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}") config.remove(KafkaConfig.InterBrokerSecurityProtocolProp) @@ -84,7 +85,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { saslProperties = this.clientSaslProperties, props = Some(producerConfig)) } - + def createNewConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = { TestUtils.createNewConsumer(brokerList, securityProtocol = this.securityProtocol, http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala new file mode 100644 index 0000000..6942df0 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.api + +import java.util.Collections +import java.util.concurrent.{ExecutionException, TimeUnit} + +import kafka.controller.{OfflineReplica, PartitionAndReplica} +import kafka.server.KafkaConfig +import kafka.utils.{CoreUtils, TestUtils, ZkUtils} +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderForPartitionException} +import org.junit.{Before, Test} +import org.junit.Assert.assertTrue +import org.junit.Assert.assertEquals + +/** + * Test whether clients can produce and consume when there is a log directory failure + */ +class LogDirFailureTest extends IntegrationTestHarness { + val producerCount: Int = 1 + val consumerCount: Int = 1 + val serverCount: Int = 2 + private val topic = "topic" + + this.logDirCount = 2 + this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0") + this.producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100") + this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp, "100") + + @Before + override def setUp() { + super.setUp() +
TestUtils.createTopic(zkUtils, topic, 1, 2, servers = servers) + } + + @Test + def testProduceAfterLogDirFailure() { + + val consumer = consumers.head + subscribeAndWaitForAssignment(topic, consumer) + val producer = producers.head + val partition = new TopicPartition(topic, 0) + val record = new ProducerRecord(topic, 0, s"key".getBytes, s"value".getBytes) + + val leaderServerId = producer.partitionsFor(topic).get(0).leader().id() + val leaderServer = servers.find(_.config.brokerId == leaderServerId).get + + // The first send() should succeed + producer.send(record).get() + TestUtils.waitUntilTrue(() => { + consumer.poll(0).count() == 1 + }, "Expected the first message", 3000L) + + // Make log directory of the partition on the leader broker inaccessible by replacing it with a file + val replica = leaderServer.replicaManager.getReplica(partition) + val logDir = replica.get.log.get.dir.getParentFile + CoreUtils.swallow(Utils.delete(logDir)) + logDir.createNewFile() + assertTrue(logDir.isFile) + + // Wait for ReplicaHighWatermarkCheckpoint to happen so that the log directory of the topic will be offline + TestUtils.waitUntilTrue(() => !leaderServer.logManager.liveLogDirs.contains(logDir), "Expected log directory offline", 3000L) + assertTrue(leaderServer.replicaManager.getReplica(partition).isEmpty) + + // The second send() should fail due to either KafkaStorageException or NotLeaderForPartitionException + try { + producer.send(record).get(6000, TimeUnit.MILLISECONDS) + fail("send() should fail with either KafkaStorageException or NotLeaderForPartitionException") + } catch { + case e: ExecutionException => + e.getCause match { + case t: KafkaStorageException => + case t: NotLeaderForPartitionException => // This may happen if ProduceRequest version <= 3 + case t: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderForPartitionException instead of ${t.toString}") + } + case e: Throwable => fail(s"send() should fail with either 
KafkaStorageException or NotLeaderForPartitionException instead of ${e.toString}") + } + + // Wait for producer to update metadata for the partition + TestUtils.waitUntilTrue(() => { + // ProduceResponse may contain KafkaStorageException and trigger metadata update + producer.send(record) + producer.partitionsFor(topic).get(0).leader().id() != leaderServerId + }, "Expected new leader for the partition", 6000L) + + // Consumer should receive some messages + TestUtils.waitUntilTrue(() => { + consumer.poll(0).count() > 0 + }, "Expected some messages", 3000L) + + // There should be no remaining LogDirEventNotification znode + assertTrue(zkUtils.getChildrenParentMayNotExist(ZkUtils.LogDirEventNotificationPath).isEmpty) + + // The controller should have marked the replica on the original leader as offline + val controllerServer = servers.find(_.kafkaController.isActive).get + val offlineReplicas = controllerServer.kafkaController.replicaStateMachine.replicasInState(topic, OfflineReplica) + assertTrue(offlineReplicas.contains(PartitionAndReplica(topic, 0, leaderServerId))) + } + + private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) { + consumer.subscribe(Collections.singletonList(topic)) + TestUtils.waitUntilTrue(() => { + consumer.poll(0) + !consumer.assignment.isEmpty + }, "Expected non-empty assignment") + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 0e57e53..760cc39 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -389,7 +389,7 @@ class TransactionsTest extends KafkaServerTestHarness { val recordMetadata = 
result.get() error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}. Grab the logs!!") servers.foreach { server => - error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}") + error(s"log dirs: ${server.logManager.liveLogDirs.map(_.getAbsolutePath).head}") } fail("Should not be able to send messages from a fenced producer.") } catch { @@ -436,7 +436,7 @@ class TransactionsTest extends KafkaServerTestHarness { val recordMetadata = result.get() error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}. Grab the logs!!") servers.foreach { case (server) => - error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}") + error(s"log dirs: ${server.logManager.liveLogDirs.map(_.getAbsolutePath).head}") } fail("Should not be able to send messages from a fenced producer.") } catch { http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala index 147e84a..ebe7223 100644 --- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala +++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala @@ -111,7 +111,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness { override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = { new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, - quotaManagers.follower, new BrokerTopicStats, metadataCache) { + quotaManagers.follower, new BrokerTopicStats, metadataCache, logDirFailureChannel) { override protected def 
createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String], quotaManager: ReplicationQuotaManager) = http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index a8ce17e..24e2920 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -97,7 +97,7 @@ class GroupMetadataManagerTest { expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records) EasyMock.replay(replicaManager) - + groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ()) val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache")) @@ -685,9 +685,9 @@ class GroupMetadataManagerTest { assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.COORDINATOR_NOT_AVAILABLE) assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.COORDINATOR_NOT_AVAILABLE) assertStoreGroupErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR) - assertStoreGroupErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.UNKNOWN) - assertStoreGroupErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.UNKNOWN) - assertStoreGroupErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.UNKNOWN) + assertStoreGroupErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.UNKNOWN_SERVER_ERROR) + assertStoreGroupErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.UNKNOWN_SERVER_ERROR) + assertStoreGroupErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.UNKNOWN_SERVER_ERROR) assertStoreGroupErrorMapping(Errors.CORRUPT_MESSAGE, Errors.CORRUPT_MESSAGE) } @@ -1311,7 +1311,7 @@ class 
GroupMetadataManagerTest { EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) capturedArgument } - + private def expectAppendMessage(error: Errors) { val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala index df2f7df..6323d15 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala @@ -144,7 +144,7 @@ class TransactionMarkerRequestCompletionHandlerTest { @Test def shouldThrowIllegalStateExceptionWhenUnknownError(): Unit = { - verifyThrowIllegalStateExceptionOnError(Errors.UNKNOWN) + verifyThrowIllegalStateExceptionOnError(Errors.UNKNOWN_SERVER_ERROR) } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 6a35d41..e86e088 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -309,7 +309,7 @@ class TransactionStateManagerTest { transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String, TransactionMetadata]()) transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1) - expectedError = Errors.UNKNOWN + expectedError = Errors.UNKNOWN_SERVER_ERROR var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) prepareForTxnMessageAppend(Errors.MESSAGE_TOO_LARGE) http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala index bf36199..d6f0a56 100644 --- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala @@ -20,7 +20,7 @@ import java.io.File import java.nio.file.Files import java.util.Properties -import kafka.server.BrokerTopicStats +import kafka.server.{BrokerTopicStats, LogDirFailureChannel} import kafka.utils.{MockTime, Pool, TestUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.utils.Utils @@ -110,6 +110,7 @@ abstract class AbstractLogCleanerIntegrationTest { new LogCleaner(cleanerConfig, logDirs = Array(logDir), logs = logMap, + logDirFailureChannel = new LogDirFailureChannel(1), time = time) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 8a119c2..e569b29 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index b4c1790..f4eabc0 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -218,7 +218,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { private def createCleanerManager(log: Log): LogCleanerManager = { val logs = new Pool[TopicPartition, Log]() logs.put(new TopicPartition("log", 0), log) - val cleanerManager = new LogCleanerManager(Array(logDir), logs) + val cleanerManager = new LogCleanerManager(Array(logDir), logs, null) cleanerManager } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 3e58c4d..689a032 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You 
under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -881,7 +881,7 @@ class LogCleanerTest extends JUnitSuite { checkSegmentOrder(groups) } - /** + /** * Following the loading of a log segment where the index file is zero sized, * the index returned would be the base offset. Sometimes the log file would * contain data with offsets in excess of the baseOffset which would cause @@ -1324,7 +1324,7 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap { lastOffset = offset map.put(keyFor(key), offset) } - + override def get(key: ByteBuffer): Long = { val k = keyFor(key) if(map.containsKey(k)) @@ -1332,9 +1332,9 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap { else -1L } - + override def clear(): Unit = map.clear() - + override def size: Int = map.size override def latestOffset: Long = lastOffset http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 8b7819f..0826747 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -21,12 +21,10 @@ import java.io._ import java.util.Properties import kafka.common._ -import kafka.server.FetchDataInfo import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.OffsetOutOfRangeException -import org.apache.kafka.common.requests.IsolationLevel import org.apache.kafka.common.utils.Utils import org.junit.Assert._ import org.junit.{After, Before, Test} @@ 
-52,7 +50,7 @@ class LogManagerTest { logDir = TestUtils.tempDir() logManager = createLogManager() logManager.startup() - logDir = logManager.logDirs(0) + logDir = logManager.liveLogDirs(0) } @After @@ -60,7 +58,7 @@ class LogManagerTest { if(logManager != null) logManager.shutdown() Utils.delete(logDir) - logManager.logDirs.foreach(Utils.delete) + logManager.liveLogDirs.foreach(Utils.delete) } /** @@ -68,7 +66,7 @@ class LogManagerTest { */ @Test def testCreateLog() { - val log = logManager.createLog(new TopicPartition(name, 0), logConfig) + val log = logManager.getOrCreateLog(new TopicPartition(name, 0), logConfig) val logFile = new File(logDir, name + "-0") assertTrue(logFile.exists) log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0) @@ -90,7 +88,7 @@ class LogManagerTest { */ @Test def testCleanupExpiredSegments() { - val log = logManager.createLog(new TopicPartition(name, 0), logConfig) + val log = logManager.getOrCreateLog(new TopicPartition(name, 0), logConfig) var offset = 0L for(_ <- 0 until 200) { val set = TestUtils.singletonRecords("test".getBytes()) @@ -135,7 +133,7 @@ class LogManagerTest { logManager.startup() // create a log - val log = logManager.createLog(new TopicPartition(name, 0), config) + val log = logManager.getOrCreateLog(new TopicPartition(name, 0), config) var offset = 0L // add a bunch of messages that should be larger than the retentionSize @@ -175,7 +173,7 @@ class LogManagerTest { def testDoesntCleanLogsWithCompactDeletePolicy() { val logProps = new Properties() logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete) - val log = logManager.createLog(new TopicPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps)) + val log = logManager.getOrCreateLog(new TopicPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps)) var offset = 0L for (_ <- 0 until 200) { val set = TestUtils.singletonRecords("test".getBytes(), key="test".getBytes()) @@ -204,7 
+202,7 @@ class LogManagerTest { logManager = createLogManager() logManager.startup() - val log = logManager.createLog(new TopicPartition(name, 0), config) + val log = logManager.getOrCreateLog(new TopicPartition(name, 0), config) val lastFlush = log.lastFlushTime for (_ <- 0 until 200) { val set = TestUtils.singletonRecords("test".getBytes()) @@ -228,7 +226,7 @@ class LogManagerTest { // verify that logs are always assigned to the least loaded partition for(partition <- 0 until 20) { - logManager.createLog(new TopicPartition("test", partition), logConfig) + logManager.getOrCreateLog(new TopicPartition("test", partition), logConfig) assertEquals("We should have created the right number of logs", partition + 1, logManager.allLogs.size) val counts = logManager.allLogs.groupBy(_.dir.getParent).values.map(_.size) assertTrue("Load should balance evenly", counts.max <= counts.min + 1) @@ -286,7 +284,7 @@ class LogManagerTest { private def verifyCheckpointRecovery(topicPartitions: Seq[TopicPartition], logManager: LogManager) { - val logs = topicPartitions.map(this.logManager.createLog(_, logConfig)) + val logs = topicPartitions.map(this.logManager.getOrCreateLog(_, logConfig)) logs.foreach(log => { for (_ <- 0 until 50) log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0) @@ -294,7 +292,7 @@ class LogManagerTest { log.flush() }) - logManager.checkpointRecoveryPointOffsets() + logManager.checkpointLogRecoveryOffsets() val checkpoints = new OffsetCheckpointFile(new File(logDir, logManager.RecoveryPointCheckpointFile)).read() topicPartitions.zip(logs).foreach { http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 79fe220..30ccc8b 100644 --- 
a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -30,7 +30,7 @@ import scala.collection.JavaConverters._ import scala.collection._ class LogSegmentTest { - + val topicPartition = new TopicPartition("topic", 0) val segments = mutable.ArrayBuffer[LogSegment]() var logDir: File = _ @@ -52,7 +52,7 @@ class LogSegmentTest { segments += seg seg } - + /* create a ByteBufferMessageSet for the given messages starting from the given offset */ def records(offset: Long, records: String*): MemoryRecords = { MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, offset, CompressionType.NONE, TimestampType.CREATE_TIME, http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 008cd27..2213d09 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -254,7 +254,8 @@ class LogTest { maxProducerIdExpirationMs = 300000, producerIdExpirationCheckIntervalMs = 30000, topicPartition = Log.parseTopicPartitionName(logDir), - stateManager) + producerStateManager = stateManager, + logDirFailureChannel = null) EasyMock.verify(stateManager) @@ -322,7 +323,8 @@ class LogTest { maxProducerIdExpirationMs = 300000, producerIdExpirationCheckIntervalMs = 30000, topicPartition = Log.parseTopicPartitionName(logDir), - stateManager) + producerStateManager = stateManager, + logDirFailureChannel = null) EasyMock.verify(stateManager) } @@ -356,7 +358,8 @@ class LogTest { maxProducerIdExpirationMs = 300000, producerIdExpirationCheckIntervalMs = 30000, topicPartition = Log.parseTopicPartitionName(logDir), - stateManager) + producerStateManager = stateManager, + logDirFailureChannel = null) EasyMock.verify(stateManager) 
cleanShutdownFile.delete() @@ -391,7 +394,8 @@ class LogTest { maxProducerIdExpirationMs = 300000, producerIdExpirationCheckIntervalMs = 30000, topicPartition = Log.parseTopicPartitionName(logDir), - stateManager) + producerStateManager = stateManager, + logDirFailureChannel = null) EasyMock.verify(stateManager) cleanShutdownFile.delete() http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index b6b40c2..5e63500 100755 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.Utils import org.easymock.EasyMock import org.junit._ import org.junit.Assert._ -import kafka.common._ import kafka.cluster.Replica import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils} import java.util.concurrent.atomic.AtomicBoolean @@ -35,24 +34,28 @@ class HighwatermarkPersistenceTest { val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps) val topic = "foo" + val zkUtils = EasyMock.createMock(classOf[ZkUtils]) val logManagers = configs map { config => TestUtils.createLogManager( logDirs = config.logDirs.map(new File(_)).toArray, cleanerConfig = CleanerConfig()) } - + + val logDirFailureChannels = configs map { config => + new LogDirFailureChannel(config.logDirs.size) + } + @After def teardown() { - for(manager <- logManagers; dir <- manager.logDirs) + for(manager <- logManagers; dir <- manager.liveLogDirs) Utils.delete(dir) } @Test def testHighWatermarkPersistenceSinglePartition() { // mock zkclient - val zkUtils = EasyMock.createMock(classOf[ZkUtils]) 
EasyMock.replay(zkUtils) - + // create kafka scheduler val scheduler = new KafkaScheduler(2) scheduler.startup @@ -61,7 +64,7 @@ class HighwatermarkPersistenceTest { // create replica manager val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, - new BrokerTopicStats, new MetadataCache(configs.head.brokerId)) + new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head) replicaManager.startup() try { replicaManager.checkpointHighWatermarks() @@ -69,7 +72,7 @@ class HighwatermarkPersistenceTest { assertEquals(0L, fooPartition0Hw) val partition0 = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0)) // create leader and follower replicas - val log0 = logManagers.head.createLog(new TopicPartition(topic, 0), LogConfig()) + val log0 = logManagers.head.getOrCreateLog(new TopicPartition(topic, 0), LogConfig()) val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, time, 0, Some(log0)) partition0.addReplicaIfNotExists(leaderReplicaPartition0) val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, time) @@ -96,7 +99,6 @@ class HighwatermarkPersistenceTest { val topic1 = "foo1" val topic2 = "foo2" // mock zkclient - val zkUtils = EasyMock.createMock(classOf[ZkUtils]) EasyMock.replay(zkUtils) // create kafka scheduler val scheduler = new KafkaScheduler(2) @@ -106,7 +108,7 @@ class HighwatermarkPersistenceTest { // create replica manager val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, - new BrokerTopicStats, new MetadataCache(configs.head.brokerId)) + new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head) replicaManager.startup() try { 
replicaManager.checkpointHighWatermarks() @@ -114,7 +116,7 @@ class HighwatermarkPersistenceTest { assertEquals(0L, topic1Partition0Hw) val topic1Partition0 = replicaManager.getOrCreatePartition(new TopicPartition(topic1, 0)) // create leader log - val topic1Log0 = logManagers.head.createLog(new TopicPartition(topic1, 0), LogConfig()) + val topic1Log0 = logManagers.head.getOrCreateLog(new TopicPartition(topic1, 0), LogConfig()) // create a local replica for topic1 val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, time, 0, Some(topic1Log0)) topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0) @@ -130,7 +132,7 @@ class HighwatermarkPersistenceTest { // add another partition and set highwatermark val topic2Partition0 = replicaManager.getOrCreatePartition(new TopicPartition(topic2, 0)) // create leader log - val topic2Log0 = logManagers.head.createLog(new TopicPartition(topic2, 0), LogConfig()) + val topic2Log0 = logManagers.head.getOrCreateLog(new TopicPartition(topic2, 0), LogConfig()) // create a local replica for topic2 val leaderReplicaTopic2Partition0 = new Replica(configs.head.brokerId, topic2Partition0, time, 0, Some(topic2Log0)) topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0) @@ -163,5 +165,5 @@ class HighwatermarkPersistenceTest { replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath).read.getOrElse( new TopicPartition(topic, partition), 0L) } - + }
