This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2fc1875d0b1 KAFKA-14557; Lock metadata log dir (#13058)
2fc1875d0b1 is described below

commit 2fc1875d0b1e5cd58727bbe80ee6d06602671993
Author: José Armando García Sancio <jsan...@users.noreply.github.com>
AuthorDate: Tue Jan 10 10:18:40 2023 -0800

    KAFKA-14557; Lock metadata log dir (#13058)
    
    This change makes sure that Kafka grabs a log dir lock in the following 
additional cases:
    
    1. When a Kafka node runs in controller-only mode. The current implementation 
doesn't grab a file lock because the LogManager is never instantiated.
    2. When the metadata log dir is different from the log dir(s). The current 
implementation of LogManager doesn't load or grab a lock on the metadata dir.
    
    Reviewers: Ron Dagostino <rdagost...@confluent.io> , dengziming 
<dengziming1...@gmail.com>
---
 core/src/main/scala/kafka/log/LogManager.scala     |   4 +-
 core/src/main/scala/kafka/raft/RaftManager.scala   |  58 +++++--
 .../main/scala/kafka/server/KafkaRaftServer.scala  |   8 +-
 .../scala/unit/kafka/raft/RaftManagerTest.scala    | 172 ++++++++++++++++-----
 4 files changed, 189 insertions(+), 53 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index a82cf5d1bfd..1a3f28eaba4 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -79,7 +79,6 @@ class LogManager(logDirs: Seq[File],
 
   import LogManager._
 
-  val LockFile = ".lock"
   val InitialTaskDelayMs = 30 * 1000
 
   private val logCreationOrDeletionLock = new Object
@@ -241,7 +240,7 @@ class LogManager(logDirs: Seq[File],
   private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
     dirs.flatMap { dir =>
       try {
-        val lock = new FileLock(new File(dir, LockFile))
+        val lock = new FileLock(new File(dir, LockFileName))
         if (!lock.tryLock())
           throw new KafkaException("Failed to acquire lock on file .lock in " 
+ lock.file.getParent +
             ". A Kafka instance in another process or thread is using this 
directory.")
@@ -1341,6 +1340,7 @@ class LogManager(logDirs: Seq[File],
 }
 
 object LogManager {
+  val LockFileName = ".lock"
 
   /**
    * Wait all jobs to complete
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala 
b/core/src/main/scala/kafka/raft/RaftManager.scala
index 5b8fe1e8276..bbb31806c3b 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -18,15 +18,25 @@ package kafka.raft
 
 import java.io.File
 import java.nio.file.Files
+import java.nio.file.Paths
 import java.util
 import java.util.OptionalInt
 import java.util.concurrent.CompletableFuture
+import kafka.log.LogManager
 import kafka.log.UnifiedLog
 import kafka.raft.KafkaRaftManager.RaftIoThread
+import kafka.server.KafkaRaftServer.ControllerRole
 import kafka.server.{KafkaConfig, MetaProperties}
+import kafka.utils.CoreUtils
+import kafka.utils.FileLock
+import kafka.utils.KafkaScheduler
+import kafka.utils.Logging
+import kafka.utils.ShutdownableThread
 import kafka.utils.timer.SystemTimer
-import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
 import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, 
NetworkClient}
+import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, 
NetworkReceive, Selectable, Selector}
 import org.apache.kafka.common.protocol.ApiMessage
@@ -34,7 +44,6 @@ import org.apache.kafka.common.requests.RequestHeader
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, 
NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
 import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, 
LeaderAndEpoch, RaftClient, RaftConfig, RaftRequest, ReplicatedLog}
 import org.apache.kafka.server.common.serialization.RecordSerde
@@ -78,6 +87,19 @@ object KafkaRaftManager {
     Files.createDirectories(dir.toPath)
     dir
   }
+
+  private def lockDataDir(dataDir: File): FileLock = {
+    val lock = new FileLock(new File(dataDir, LogManager.LockFileName))
+
+    if (!lock.tryLock()) {
+      throw new KafkaException(
+        s"Failed to acquire lock on file .lock in ${lock.file.getParent}. A 
Kafka instance in another process or " +
+        "thread is using this directory."
+      )
+    }
+
+    lock
+  }
 }
 
 trait RaftManager[T] {
@@ -120,6 +142,23 @@ class KafkaRaftManager[T](
   scheduler.startup()
 
   private val dataDir = createDataDir()
+
+  private val dataDirLock = {
+    // Acquire the log dir lock if the metadata log dir is different from the 
log dirs
+    val differentMetadataLogDir = !config
+      .logDirs
+      .map(Paths.get(_).toAbsolutePath)
+      .contains(Paths.get(config.metadataLogDir).toAbsolutePath)
+    // Or this node is only a controller
+    val isOnlyController = config.processRoles == Set(ControllerRole)
+
+    if (differentMetadataLogDir || isOnlyController) {
+      Some(KafkaRaftManager.lockDataDir(new File(config.metadataLogDir)))
+    } else {
+      None
+    }
+  }
+
   override val replicatedLog: ReplicatedLog = buildMetadataLog()
   private val netChannel = buildNetworkChannel()
   private val expirationTimer = new SystemTimer("raft-expiration-executor")
@@ -147,13 +186,14 @@ class KafkaRaftManager[T](
   }
 
   def shutdown(): Unit = {
-    expirationService.shutdown()
-    expirationTimer.shutdown()
-    raftIoThread.shutdown()
-    client.close()
-    scheduler.shutdown()
-    netChannel.close()
-    replicatedLog.close()
+    CoreUtils.swallow(expirationService.shutdown(), this)
+    CoreUtils.swallow(expirationTimer.shutdown(), this)
+    CoreUtils.swallow(raftIoThread.shutdown(), this)
+    CoreUtils.swallow(client.close(), this)
+    CoreUtils.swallow(scheduler.shutdown(), this)
+    CoreUtils.swallow(netChannel.close(), this)
+    CoreUtils.swallow(replicatedLog.close(), this)
+    CoreUtils.swallow(dataDirLock.foreach(_.destroy()), this)
   }
 
   override def register(
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala 
b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 9cb95e39166..f5f2c121975 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -124,8 +124,12 @@ object KafkaRaftServer {
   val MetadataTopicId = Uuid.METADATA_TOPIC_ID
 
   sealed trait ProcessRole
-  case object BrokerRole extends ProcessRole
-  case object ControllerRole extends ProcessRole
+  case object BrokerRole extends ProcessRole {
+    override def toString(): String = "broker"
+  }
+  case object ControllerRole extends ProcessRole {
+    override def toString(): String = "controller"
+  }
 
   /**
    * Initialize the configured log directories, including both 
[[KafkaConfig.MetadataLogDirProp]]
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala 
b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 9d7a93db94c..9907c802d2b 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -16,10 +16,20 @@
  */
 package kafka.raft
 
-import java.util.concurrent.CompletableFuture
+import java.nio.channels.FileChannel
+import java.nio.channels.OverlappingFileLockException
+import java.nio.file.Path
+import java.nio.file.StandardOpenOption
 import java.util.Properties
+import java.util.concurrent.CompletableFuture
+import kafka.log.LogManager
 import kafka.raft.KafkaRaftManager.RaftIoThread
-import kafka.server.{KafkaConfig, MetaProperties}
+import kafka.server.KafkaConfig
+import kafka.server.KafkaRaftServer.BrokerRole
+import kafka.server.KafkaRaftServer.ControllerRole
+import kafka.server.KafkaRaftServer.ProcessRole
+import kafka.server.MetaProperties
+import kafka.utils.TestUtils
 import kafka.tools.TestRaftServer.ByteArraySerde
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.Uuid
@@ -27,41 +37,50 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.raft.KafkaRaftClient
 import org.apache.kafka.raft.RaftConfig
-import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import org.mockito.Mockito._
 
-import java.io.File
-
 class RaftManagerTest {
-
-  private def instantiateRaftManagerWithConfigs(topicPartition: 
TopicPartition, processRoles: String, nodeId: String) = {
-    def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String, 
logDir: File): KafkaConfig = {
-      val props = new Properties
-      props.setProperty(KafkaConfig.MetadataLogDirProp, logDir.getPath)
-      props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
-      props.setProperty(KafkaConfig.NodeIdProp, nodeId)
-      props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
-      if (processRoles.contains("broker")) {
-        props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
-        if (processRoles.contains("controller")) { // co-located
-          props.setProperty(KafkaConfig.ListenersProp, 
"PLAINTEXT://localhost:9092,SSL://localhost:9093")
-          props.setProperty(KafkaConfig.QuorumVotersProp, 
s"${nodeId}@localhost:9093")
-        } else { // broker-only
-          val voterId = (nodeId.toInt + 1)
-          props.setProperty(KafkaConfig.QuorumVotersProp, 
s"${voterId}@localhost:9093")
-        }
-      } else if (processRoles.contains("controller")) { // controller-only
-        props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9093")
+  private def createConfig(
+    processRoles: Set[ProcessRole],
+    nodeId: Int,
+    logDir: Option[Path],
+    metadataDir: Option[Path]
+  ): KafkaConfig = {
+    val props = new Properties
+    logDir.foreach { value =>
+      props.setProperty(KafkaConfig.LogDirProp, value.toString)
+    }
+    metadataDir.foreach { value =>
+      props.setProperty(KafkaConfig.MetadataLogDirProp, value.toString)
+    }
+    props.setProperty(KafkaConfig.ProcessRolesProp, processRoles.mkString(","))
+    props.setProperty(KafkaConfig.NodeIdProp, nodeId.toString)
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    if (processRoles.contains(BrokerRole)) {
+      props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
+      if (processRoles.contains(ControllerRole)) { // co-located
+        props.setProperty(KafkaConfig.ListenersProp, 
"PLAINTEXT://localhost:9092,SSL://localhost:9093")
         props.setProperty(KafkaConfig.QuorumVotersProp, 
s"${nodeId}@localhost:9093")
+      } else { // broker-only
+        val voterId = nodeId + 1
+        props.setProperty(KafkaConfig.QuorumVotersProp, 
s"${voterId}@localhost:9093")
       }
-
-      new KafkaConfig(props)
+    } else if (processRoles.contains(ControllerRole)) { // controller-only
+      props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9093")
+      props.setProperty(KafkaConfig.QuorumVotersProp, 
s"${nodeId}@localhost:9093")
     }
 
-    val logDir = TestUtils.tempDirectory()
-    val config = configWithProcessRolesAndNodeId(processRoles, nodeId, logDir)
+    new KafkaConfig(props)
+  }
+
+  private def createRaftManager(
+    topicPartition: TopicPartition,
+    config: KafkaConfig
+  ): KafkaRaftManager[Array[Byte]] = {
     val topicId = new Uuid(0L, 2L)
     val metaProperties = MetaProperties(
       clusterId = Uuid.randomUuid.toString,
@@ -81,25 +100,99 @@ class RaftManagerTest {
     )
   }
 
-  @Test
-  def testNodeIdPresentIfBrokerRoleOnly(): Unit = {
-    val raftManager = instantiateRaftManagerWithConfigs(new 
TopicPartition("__raft_id_test", 0), "broker", "1")
-    assertEquals(1, raftManager.client.nodeId.getAsInt)
+  @ParameterizedTest
+  @ValueSource(strings = Array("broker", "controller", "broker,controller"))
+  def testNodeIdPresent(processRoles: String): Unit = {
+    var processRolesSet = Set.empty[ProcessRole]
+    if (processRoles.contains("broker")) {
+      processRolesSet = processRolesSet ++ Set(BrokerRole)
+    }
+    if (processRoles.contains("controller")) {
+      processRolesSet = processRolesSet ++ Set(ControllerRole)
+    }
+
+    val logDir = TestUtils.tempDir()
+    val nodeId = 1
+    val raftManager = createRaftManager(
+      new TopicPartition("__raft_id_test", 0),
+      createConfig(
+        processRolesSet,
+        nodeId,
+        Some(logDir.toPath),
+        None
+      )
+    )
+    assertEquals(nodeId, raftManager.client.nodeId.getAsInt)
     raftManager.shutdown()
   }
 
-  @Test
-  def testNodeIdPresentIfControllerRoleOnly(): Unit = {
-    val raftManager = instantiateRaftManagerWithConfigs(new 
TopicPartition("__raft_id_test", 0), "controller", "1")
-    assertEquals(1, raftManager.client.nodeId.getAsInt)
+  @ParameterizedTest
+  @ValueSource(strings = Array("metadata-only", "log-only", "both"))
+  def testLogDirLockWhenControllerOnly(dirType: String): Unit = {
+    val logDir = if (dirType.equals("metadata-only")) {
+      None
+    } else {
+      Some(TestUtils.tempDir().toPath)
+    }
+
+    val metadataDir = if (dirType.equals("log-only")) {
+      None
+    } else {
+      Some(TestUtils.tempDir().toPath)
+    }
+
+    val nodeId = 1
+    val raftManager = createRaftManager(
+      new TopicPartition("__raft_id_test", 0),
+      createConfig(
+        Set(ControllerRole),
+        nodeId,
+        logDir,
+        metadataDir
+      )
+    )
+
+    val lockPath = 
metadataDir.getOrElse(logDir.get).resolve(LogManager.LockFileName)
+    assertTrue(fileLocked(lockPath))
+
     raftManager.shutdown()
+
+    assertFalse(fileLocked(lockPath))
   }
 
   @Test
-  def testNodeIdPresentIfColocated(): Unit = {
-    val raftManager = instantiateRaftManagerWithConfigs(new 
TopicPartition("__raft_id_test", 0), "controller,broker", "1")
-    assertEquals(1, raftManager.client.nodeId.getAsInt)
+  def testLogDirLockWhenBrokerOnlyWithSeparateMetadataDir(): Unit = {
+    val logDir = Some(TestUtils.tempDir().toPath)
+    val metadataDir = Some(TestUtils.tempDir().toPath)
+
+    val nodeId = 1
+    val raftManager = createRaftManager(
+      new TopicPartition("__raft_id_test", 0),
+      createConfig(
+        Set(BrokerRole),
+        nodeId,
+        logDir,
+        metadataDir
+      )
+    )
+
+    val lockPath = 
metadataDir.getOrElse(logDir.get).resolve(LogManager.LockFileName)
+    assertTrue(fileLocked(lockPath))
+
     raftManager.shutdown()
+
+    assertFalse(fileLocked(lockPath))
+  }
+
+  private def fileLocked(path: Path): Boolean = {
+    TestUtils.resource(FileChannel.open(path, StandardOpenOption.WRITE)) { 
channel =>
+      try {
+        Option(channel.tryLock()).foreach(_.close())
+        false
+      } catch {
+        case _: OverlappingFileLockException => true
+      }
+    }
   }
 
   @Test
@@ -140,5 +233,4 @@ class RaftManagerTest {
     assertTrue(ioThread.isThreadFailed)
     assertFalse(ioThread.isRunning)
   }
-
 }

Reply via email to