chia7712 commented on a change in pull request #9967:
URL: https://github.com/apache/kafka/pull/9967#discussion_r564703919
##
File path: core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
##
@@ -21,38 +21,220 @@ import java.io._
import java.nio.file.{Files, NoSuchFileException}
import java.util.Properties
+import kafka.common.InconsistentBrokerMetadataException
+import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole, ProcessRole}
+import kafka.server.RawMetaProperties._
import kafka.utils._
+import org.apache.kafka.common.Uuid
import org.apache.kafka.common.utils.Utils
-case class BrokerMetadata(brokerId: Int,
- clusterId: Option[String]) {
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+object RawMetaProperties {
+ val ClusterIdKey = "cluster.id"
+ val BrokerIdKey = "broker.id"
+ val ControllerIdKey = "controller.id"
+ val VersionKey = "version"
+}
+
+case class RawMetaProperties(props: Properties = new Properties()) {
+
+ def clusterId: Option[String] = {
+Option(props.getProperty(ClusterIdKey))
+ }
+
+ def clusterId_=(id: String): Unit = {
+props.setProperty(ClusterIdKey, id)
+ }
+
+ def brokerId: Option[Int] = {
+intValue(BrokerIdKey)
+ }
+
+ def brokerId_=(id: Int): Unit = {
+props.setProperty(BrokerIdKey, id.toString)
+ }
+
+ def controllerId: Option[Int] = {
+intValue(ControllerIdKey)
+ }
+
+ def controllerId_=(id: Int): Unit = {
+props.setProperty(ControllerIdKey, id.toString)
+ }
+
+ def version: Int = {
+intValue(VersionKey).getOrElse(0)
+ }
+
+ def version_=(ver: Int): Unit = {
+props.setProperty(VersionKey, ver.toString)
+ }
+
+ def requireVersion(expectedVersion: Int): Unit = {
+if (version != expectedVersion) {
+ throw new RuntimeException(s"Expected version $expectedVersion, but got "+
+s"version $version")
+}
+ }
+
+ private def intValue(key: String): Option[Int] = {
+try {
+ Option(props.getProperty(key)).map(Integer.parseInt)
+} catch {
+ case e: Throwable => throw new RuntimeException(s"Failed to parse $key property " +
+s"as an int: ${e.getMessage}")
+}
+ }
+
+ override def toString: String = {
+"RawMetaProperties(" + props.keySet().asScala.toList.asInstanceOf[List[String]].sorted.map {
+ key => key + "=" + props.get(key)
+}.mkString(", ") + ")"
+ }
+}
+
+object MetaProperties {
+ def parse(
+properties: RawMetaProperties,
+processRoles: Set[ProcessRole]
+ ): MetaProperties = {
+properties.requireVersion(expectedVersion = 1)
+val clusterId = requireClusterId(properties)
+
+if (processRoles.contains(BrokerRole)) {
+ require(BrokerIdKey, properties.brokerId)
+}
+
+if (processRoles.contains(ControllerRole)) {
+ require(ControllerIdKey, properties.controllerId)
+}
+
+new MetaProperties(clusterId, properties.brokerId, properties.controllerId)
+ }
+
+ def require[T](key: String, value: Option[T]): T = {
+value.getOrElse(throw new RuntimeException(s"Failed to find required property $key."))
+ }
+
+ def requireClusterId(properties: RawMetaProperties): Uuid = {
+val value = require(ClusterIdKey, properties.clusterId)
+try {
+ Uuid.fromString(value)
+} catch {
+ case e: Throwable => throw new RuntimeException(s"Failed to parse $ClusterIdKey property " +
+s"as a UUID: ${e.getMessage}")
+}
+ }
+}
+
+case class ZkMetaProperties(
+ clusterId: String,
+ brokerId: Int
+) {
+ def toProperties: Properties = {
+val properties = new RawMetaProperties()
+properties.version = 0
+properties.clusterId = clusterId
+properties.brokerId = brokerId
+properties.props
+ }
+
+ override def toString: String = {
+s"LegacyMetaProperties(brokerId=$brokerId, clusterId=$clusterId)"
+ }
+}
+
+case class MetaProperties(
+ clusterId: Uuid,
+ brokerId: Option[Int] = None,
+ controllerId: Option[Int] = None
+) {
+ def toProperties: Properties = {
+val properties = new RawMetaProperties()
+properties.version = 1
+properties.clusterId = clusterId.toString
+brokerId.foreach(properties.brokerId = _)
+controllerId.foreach(properties.controllerId = _)
+properties.props
+ }
override def toString: String = {
-s"BrokerMetadata(brokerId=$brokerId, clusterId=${clusterId.map(_.toString).getOrElse("None")})"
+s"MetaProperties(clusterId=$clusterId" +
+ s", brokerId=${brokerId.getOrElse("none")}" +
+ s", controllerId=${controllerId.getOrElse("none")}" +
+ ")"
+ }
+}
+
+object BrokerMetadataCheckpoint extends Logging {
+ def getBrokerMetadataAndOfflineDirs(
+logDirs: collection.Seq[String],
+ignoreMissing: Boolean
+ ): (RawMetaProperties, collection.Seq[String]) = {
+require(logDirs.nonEmpty, "Must have at least one log dir to read meta.properties")
+
+val brokerMetadataMap = mutable.HashMap[String, Properties]()
+