This is an automated email from the ASF dual-hosted git repository. jsancio pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 2fc1875d0b1 KAFKA-14557; Lock metadata log dir (#13058) 2fc1875d0b1 is described below commit 2fc1875d0b1e5cd58727bbe80ee6d06602671993 Author: José Armando García Sancio <jsan...@users.noreply.github.com> AuthorDate: Tue Jan 10 10:18:40 2023 -0800 KAFKA-14557; Lock metadata log dir (#13058) This change makes sure that Kafka grabs a log dir lock in the following additional cases: 1. When a Kafka node runs in controller-only mode. The current implementation doesn't grab a file lock because the LogManager is never instantiated. 2. When the metadata log dir is different from the log dir(s). The current implementation of LogManager doesn't load or grab a lock on the metadata dir. Reviewers: Ron Dagostino <rdagost...@confluent.io> , dengziming <dengziming1...@gmail.com> --- core/src/main/scala/kafka/log/LogManager.scala | 4 +- core/src/main/scala/kafka/raft/RaftManager.scala | 58 +++++-- .../main/scala/kafka/server/KafkaRaftServer.scala | 8 +- .../scala/unit/kafka/raft/RaftManagerTest.scala | 172 ++++++++++++++++----- 4 files changed, 189 insertions(+), 53 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index a82cf5d1bfd..1a3f28eaba4 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -79,7 +79,6 @@ class LogManager(logDirs: Seq[File], import LogManager._ - val LockFile = ".lock" val InitialTaskDelayMs = 30 * 1000 private val logCreationOrDeletionLock = new Object @@ -241,7 +240,7 @@ class LogManager(logDirs: Seq[File], private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = { dirs.flatMap { dir => try { - val lock = new FileLock(new File(dir, LockFile)) + val lock = new FileLock(new File(dir, LockFileName)) if (!lock.tryLock()) throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParent + ". 
A Kafka instance in another process or thread is using this directory.") @@ -1341,6 +1340,7 @@ class LogManager(logDirs: Seq[File], } object LogManager { + val LockFileName = ".lock" /** * Wait all jobs to complete diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index 5b8fe1e8276..bbb31806c3b 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -18,15 +18,25 @@ package kafka.raft import java.io.File import java.nio.file.Files +import java.nio.file.Paths import java.util import java.util.OptionalInt import java.util.concurrent.CompletableFuture +import kafka.log.LogManager import kafka.log.UnifiedLog import kafka.raft.KafkaRaftManager.RaftIoThread +import kafka.server.KafkaRaftServer.ControllerRole import kafka.server.{KafkaConfig, MetaProperties} +import kafka.utils.CoreUtils +import kafka.utils.FileLock +import kafka.utils.KafkaScheduler +import kafka.utils.Logging +import kafka.utils.ShutdownableThread import kafka.utils.timer.SystemTimer -import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread} import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient} +import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.Uuid import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector} import org.apache.kafka.common.protocol.ApiMessage @@ -34,7 +44,6 @@ import org.apache.kafka.common.requests.RequestHeader import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{LogContext, Time} -import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec} import 
org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, LeaderAndEpoch, RaftClient, RaftConfig, RaftRequest, ReplicatedLog} import org.apache.kafka.server.common.serialization.RecordSerde @@ -78,6 +87,19 @@ object KafkaRaftManager { Files.createDirectories(dir.toPath) dir } + + private def lockDataDir(dataDir: File): FileLock = { + val lock = new FileLock(new File(dataDir, LogManager.LockFileName)) + + if (!lock.tryLock()) { + throw new KafkaException( + s"Failed to acquire lock on file .lock in ${lock.file.getParent}. A Kafka instance in another process or " + + "thread is using this directory." + ) + } + + lock + } } trait RaftManager[T] { @@ -120,6 +142,23 @@ class KafkaRaftManager[T]( scheduler.startup() private val dataDir = createDataDir() + + private val dataDirLock = { + // Acquire the log dir lock if the metadata log dir is different from the log dirs + val differentMetadataLogDir = !config + .logDirs + .map(Paths.get(_).toAbsolutePath) + .contains(Paths.get(config.metadataLogDir).toAbsolutePath) + // Or this node is only a controller + val isOnlyController = config.processRoles == Set(ControllerRole) + + if (differentMetadataLogDir || isOnlyController) { + Some(KafkaRaftManager.lockDataDir(new File(config.metadataLogDir))) + } else { + None + } + } + override val replicatedLog: ReplicatedLog = buildMetadataLog() private val netChannel = buildNetworkChannel() private val expirationTimer = new SystemTimer("raft-expiration-executor") @@ -147,13 +186,14 @@ class KafkaRaftManager[T]( } def shutdown(): Unit = { - expirationService.shutdown() - expirationTimer.shutdown() - raftIoThread.shutdown() - client.close() - scheduler.shutdown() - netChannel.close() - replicatedLog.close() + CoreUtils.swallow(expirationService.shutdown(), this) + CoreUtils.swallow(expirationTimer.shutdown(), this) + CoreUtils.swallow(raftIoThread.shutdown(), this) + CoreUtils.swallow(client.close(), this) + CoreUtils.swallow(scheduler.shutdown(), this) + 
CoreUtils.swallow(netChannel.close(), this) + CoreUtils.swallow(replicatedLog.close(), this) + CoreUtils.swallow(dataDirLock.foreach(_.destroy()), this) } override def register( diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index 9cb95e39166..f5f2c121975 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -124,8 +124,12 @@ object KafkaRaftServer { val MetadataTopicId = Uuid.METADATA_TOPIC_ID sealed trait ProcessRole - case object BrokerRole extends ProcessRole - case object ControllerRole extends ProcessRole + case object BrokerRole extends ProcessRole { + override def toString(): String = "broker" + } + case object ControllerRole extends ProcessRole { + override def toString(): String = "controller" + } /** * Initialize the configured log directories, including both [[KafkaConfig.MetadataLogDirProp]] diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala index 9d7a93db94c..9907c802d2b 100644 --- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala +++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala @@ -16,10 +16,20 @@ */ package kafka.raft -import java.util.concurrent.CompletableFuture +import java.nio.channels.FileChannel +import java.nio.channels.OverlappingFileLockException +import java.nio.file.Path +import java.nio.file.StandardOpenOption import java.util.Properties +import java.util.concurrent.CompletableFuture +import kafka.log.LogManager import kafka.raft.KafkaRaftManager.RaftIoThread -import kafka.server.{KafkaConfig, MetaProperties} +import kafka.server.KafkaConfig +import kafka.server.KafkaRaftServer.BrokerRole +import kafka.server.KafkaRaftServer.ControllerRole +import kafka.server.KafkaRaftServer.ProcessRole +import kafka.server.MetaProperties +import kafka.utils.TestUtils import 
kafka.tools.TestRaftServer.ByteArraySerde import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.Uuid @@ -27,41 +37,50 @@ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.utils.Time import org.apache.kafka.raft.KafkaRaftClient import org.apache.kafka.raft.RaftConfig -import org.apache.kafka.test.TestUtils import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import org.mockito.Mockito._ -import java.io.File - class RaftManagerTest { - - private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = { - def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String, logDir: File): KafkaConfig = { - val props = new Properties - props.setProperty(KafkaConfig.MetadataLogDirProp, logDir.getPath) - props.setProperty(KafkaConfig.ProcessRolesProp, processRoles) - props.setProperty(KafkaConfig.NodeIdProp, nodeId) - props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL") - if (processRoles.contains("broker")) { - props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT") - if (processRoles.contains("controller")) { // co-located - props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093") - props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093") - } else { // broker-only - val voterId = (nodeId.toInt + 1) - props.setProperty(KafkaConfig.QuorumVotersProp, s"${voterId}@localhost:9093") - } - } else if (processRoles.contains("controller")) { // controller-only - props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9093") + private def createConfig( + processRoles: Set[ProcessRole], + nodeId: Int, + logDir: Option[Path], + metadataDir: Option[Path] + ): KafkaConfig = { + val props = new Properties + logDir.foreach { value => + 
props.setProperty(KafkaConfig.LogDirProp, value.toString) + } + metadataDir.foreach { value => + props.setProperty(KafkaConfig.MetadataLogDirProp, value.toString) + } + props.setProperty(KafkaConfig.ProcessRolesProp, processRoles.mkString(",")) + props.setProperty(KafkaConfig.NodeIdProp, nodeId.toString) + props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL") + if (processRoles.contains(BrokerRole)) { + props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT") + if (processRoles.contains(ControllerRole)) { // co-located + props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093") props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093") + } else { // broker-only + val voterId = nodeId + 1 + props.setProperty(KafkaConfig.QuorumVotersProp, s"${voterId}@localhost:9093") } - - new KafkaConfig(props) + } else if (processRoles.contains(ControllerRole)) { // controller-only + props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9093") + props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093") } - val logDir = TestUtils.tempDirectory() - val config = configWithProcessRolesAndNodeId(processRoles, nodeId, logDir) + new KafkaConfig(props) + } + + private def createRaftManager( + topicPartition: TopicPartition, + config: KafkaConfig + ): KafkaRaftManager[Array[Byte]] = { val topicId = new Uuid(0L, 2L) val metaProperties = MetaProperties( clusterId = Uuid.randomUuid.toString, @@ -81,25 +100,99 @@ class RaftManagerTest { ) } - @Test - def testNodeIdPresentIfBrokerRoleOnly(): Unit = { - val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "broker", "1") - assertEquals(1, raftManager.client.nodeId.getAsInt) + @ParameterizedTest + @ValueSource(strings = Array("broker", "controller", "broker,controller")) + def testNodeIdPresent(processRoles: String): Unit = { + var processRolesSet = Set.empty[ProcessRole] + if 
(processRoles.contains("broker")) { + processRolesSet = processRolesSet ++ Set(BrokerRole) + } + if (processRoles.contains("controller")) { + processRolesSet = processRolesSet ++ Set(ControllerRole) + } + + val logDir = TestUtils.tempDir() + val nodeId = 1 + val raftManager = createRaftManager( + new TopicPartition("__raft_id_test", 0), + createConfig( + processRolesSet, + nodeId, + Some(logDir.toPath), + None + ) + ) + assertEquals(nodeId, raftManager.client.nodeId.getAsInt) raftManager.shutdown() } - @Test - def testNodeIdPresentIfControllerRoleOnly(): Unit = { - val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller", "1") - assertEquals(1, raftManager.client.nodeId.getAsInt) + @ParameterizedTest + @ValueSource(strings = Array("metadata-only", "log-only", "both")) + def testLogDirLockWhenControllerOnly(dirType: String): Unit = { + val logDir = if (dirType.equals("metadata-only")) { + None + } else { + Some(TestUtils.tempDir().toPath) + } + + val metadataDir = if (dirType.equals("log-only")) { + None + } else { + Some(TestUtils.tempDir().toPath) + } + + val nodeId = 1 + val raftManager = createRaftManager( + new TopicPartition("__raft_id_test", 0), + createConfig( + Set(ControllerRole), + nodeId, + logDir, + metadataDir + ) + ) + + val lockPath = metadataDir.getOrElse(logDir.get).resolve(LogManager.LockFileName) + assertTrue(fileLocked(lockPath)) + raftManager.shutdown() + + assertFalse(fileLocked(lockPath)) } @Test - def testNodeIdPresentIfColocated(): Unit = { - val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller,broker", "1") - assertEquals(1, raftManager.client.nodeId.getAsInt) + def testLogDirLockWhenBrokerOnlyWithSeparateMetadataDir(): Unit = { + val logDir = Some(TestUtils.tempDir().toPath) + val metadataDir = Some(TestUtils.tempDir().toPath) + + val nodeId = 1 + val raftManager = createRaftManager( + new TopicPartition("__raft_id_test", 0), + 
createConfig( + Set(BrokerRole), + nodeId, + logDir, + metadataDir + ) + ) + + val lockPath = metadataDir.getOrElse(logDir.get).resolve(LogManager.LockFileName) + assertTrue(fileLocked(lockPath)) + raftManager.shutdown() + + assertFalse(fileLocked(lockPath)) + } + + private def fileLocked(path: Path): Boolean = { + TestUtils.resource(FileChannel.open(path, StandardOpenOption.WRITE)) { channel => + try { + Option(channel.tryLock()).foreach(_.close()) + false + } catch { + case _: OverlappingFileLockException => true + } + } } @Test @@ -140,5 +233,4 @@ class RaftManagerTest { assertTrue(ioThread.isThreadFailed) assertFalse(ioThread.isRunning) } - }