[GitHub] [kafka] chia7712 commented on a change in pull request #9967: KAFKA-12236; New meta.properties logic for KIP-500

2021-01-30 Thread GitBox


chia7712 commented on a change in pull request #9967:
URL: https://github.com/apache/kafka/pull/9967#discussion_r567254453



##
File path: core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
##
@@ -21,38 +21,203 @@ import java.io._
 import java.nio.file.{Files, NoSuchFileException}
 import java.util.Properties
 
+import kafka.common.{InconsistentBrokerMetadataException, KafkaException}
+import kafka.server.RawMetaProperties._
 import kafka.utils._
+import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.utils.Utils
 
-case class BrokerMetadata(brokerId: Int,
-  clusterId: Option[String]) {
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+object RawMetaProperties {
+  val ClusterIdKey = "cluster.id"
+  val BrokerIdKey = "broker.id"
+  val NodeIdKey = "node.id"

Review comment:
   Both "broker.id" and "node.id" are defined by ```KafkaConfig``` also. 
Should we unify them?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9967: KAFKA-12236; New meta.properties logic for KIP-500

2021-01-27 Thread GitBox


chia7712 commented on a change in pull request #9967:
URL: https://github.com/apache/kafka/pull/9967#discussion_r565786393



##
File path: core/src/main/scala/kafka/server/Server.scala
##
@@ -46,6 +46,22 @@ object Server {
 new Metrics(metricConfig, reporters, time, true, metricsContext)
   }
 
+  def initializeMetrics(
+config: KafkaConfig,
+time: Time,
+metaProps: MetaProperties

Review comment:
   It seems to me the properties in ```MetaProperties``` is duplicate to 
```KafkaConfig``` in this case. Is there any reason that we need to pass 
```MetaProperties```?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9967: KAFKA-12236; New meta.properties logic for KIP-500

2021-01-26 Thread GitBox


chia7712 commented on a change in pull request #9967:
URL: https://github.com/apache/kafka/pull/9967#discussion_r564703919



##
File path: core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
##
@@ -21,38 +21,220 @@ import java.io._
 import java.nio.file.{Files, NoSuchFileException}
 import java.util.Properties
 
+import kafka.common.InconsistentBrokerMetadataException
+import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole, ProcessRole}
+import kafka.server.RawMetaProperties._
 import kafka.utils._
+import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.utils.Utils
 
-case class BrokerMetadata(brokerId: Int,
-  clusterId: Option[String]) {
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+object RawMetaProperties {
+  val ClusterIdKey = "cluster.id"
+  val BrokerIdKey = "broker.id"
+  val ControllerIdKey = "controller.id"
+  val VersionKey = "version"
+}
+
+case class RawMetaProperties(props: Properties = new Properties()) {
+
+  def clusterId: Option[String] = {
+Option(props.getProperty(ClusterIdKey))
+  }
+
+  def clusterId_=(id: String): Unit = {
+props.setProperty(ClusterIdKey, id)
+  }
+
+  def brokerId: Option[Int] = {
+intValue(BrokerIdKey)
+  }
+
+  def brokerId_=(id: Int): Unit = {
+props.setProperty(BrokerIdKey, id.toString)
+  }
+
+  def controllerId: Option[Int] = {
+intValue(ControllerIdKey)
+  }
+
+  def controllerId_=(id: Int): Unit = {
+props.setProperty(ControllerIdKey, id.toString)
+  }
+
+  def version: Int = {
+intValue(VersionKey).getOrElse(0)
+  }
+
+  def version_=(ver: Int): Unit = {
+props.setProperty(VersionKey, ver.toString)
+  }
+
+  def requireVersion(expectedVersion: Int): Unit = {
+if (version != expectedVersion) {
+  throw new RuntimeException(s"Expected version $expectedVersion, but got 
"+
+s"version $version")
+}
+  }
+
+  private def intValue(key: String): Option[Int] = {
+try {
+  Option(props.getProperty(key)).map(Integer.parseInt)
+} catch {
+  case e: Throwable => throw new RuntimeException(s"Failed to parse $key 
property " +
+s"as an int: ${e.getMessage}")
+}
+  }
+
+  override def toString: String = {
+"RawMetaProperties(" + 
props.keySet().asScala.toList.asInstanceOf[List[String]].sorted.map {
+  key => key + "=" + props.get(key)
+}.mkString(", ") + ")"
+  }
+}
+
+object MetaProperties {
+  def parse(
+properties: RawMetaProperties,
+processRoles: Set[ProcessRole]
+  ): MetaProperties = {
+properties.requireVersion(expectedVersion = 1)
+val clusterId = requireClusterId(properties)
+
+if (processRoles.contains(BrokerRole)) {
+  require(BrokerIdKey, properties.brokerId)
+}
+
+if (processRoles.contains(ControllerRole)) {
+  require(ControllerIdKey, properties.controllerId)
+}
+
+new MetaProperties(clusterId, properties.brokerId, properties.controllerId)
+  }
+
+  def require[T](key: String, value: Option[T]): T = {
+value.getOrElse(throw new RuntimeException(s"Failed to find required 
property $key."))
+  }
+
+  def requireClusterId(properties: RawMetaProperties): Uuid = {
+val value = require(ClusterIdKey, properties.clusterId)
+try {
+  Uuid.fromString(value)
+} catch {
+  case e: Throwable => throw new RuntimeException(s"Failed to parse 
$ClusterIdKey property " +
+s"as a UUID: ${e.getMessage}")
+}
+  }
+}
+
+case class ZkMetaProperties(
+  clusterId: String,
+  brokerId: Int
+) {
+  def toProperties: Properties = {
+val properties = new RawMetaProperties()
+properties.version = 0
+properties.clusterId = clusterId
+properties.brokerId = brokerId
+properties.props
+  }
+
+  override def toString: String = {
+s"LegacyMetaProperties(brokerId=$brokerId, clusterId=$clusterId)"
+  }
+}
+
+case class MetaProperties(
+  clusterId: Uuid,
+  brokerId: Option[Int] = None,
+  controllerId: Option[Int] = None
+) {
+  def toProperties: Properties = {
+val properties = new RawMetaProperties()
+properties.version = 1
+properties.clusterId = clusterId.toString
+brokerId.foreach(properties.brokerId = _)
+controllerId.foreach(properties.controllerId = _)
+properties.props
+  }
 
   override def toString: String  = {
-s"BrokerMetadata(brokerId=$brokerId, 
clusterId=${clusterId.map(_.toString).getOrElse("None")})"
+s"MetaProperties(clusterId=$clusterId" +
+  s", brokerId=${brokerId.getOrElse("none")}" +
+  s", controllerId=${controllerId.getOrElse("none")}" +
+  ")"
+  }
+}
+
+object BrokerMetadataCheckpoint extends Logging {
+  def getBrokerMetadataAndOfflineDirs(
+logDirs: collection.Seq[String],
+ignoreMissing: Boolean
+  ): (RawMetaProperties, collection.Seq[String]) = {
+require(logDirs.nonEmpty, "Must have at least one log dir to read 
meta.properties")
+
+val brokerMetadataMap = mutable.HashMap[String, Properties]()
+