hachikuji commented on code in PR #12250:
URL: https://github.com/apache/kafka/pull/12250#discussion_r892713477


##########
core/src/main/scala/kafka/server/ReplicaFetcherManager.scala:
##########
@@ -21,13 +21,15 @@ import kafka.cluster.BrokerEndPoint
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.server.common.MetadataVersion
 
 class ReplicaFetcherManager(brokerConfig: KafkaConfig,
                             protected val replicaManager: ReplicaManager,
                             metrics: Metrics,
                             time: Time,
                             threadNamePrefix: Option[String] = None,
-                            quotaManager: ReplicationQuotaManager)
+                            quotaManager: ReplicationQuotaManager,
+                            val metadataVersionSupplier: () => MetadataVersion)

Review Comment:
   nit: why is this a `val`?



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##########
@@ -132,10 +131,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
       // Publish the new metadata image to the metadata cache.
       metadataCache.setImage(newImage)
 
-      val metadataVersionLogMsg = newImage.features().metadataVersion() match {
-        case MetadataVersion.UNINITIALIZED => "un-initialized metadata.version"

Review Comment:
   Does broker startup have any guarantees with respect to the metadata version 
in the log? For example, does the controller require the latest metadata 
feature record to be consumed before a broker can be unfenced? I'm mainly 
trying to understand if we might revert to an older version and what the 
consequences of that would be.



##########
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##########
@@ -135,7 +137,7 @@ class ReplicaFetcherThread(name: String,
 
   def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition: 
TopicPartition): Unit = {
     // oversized messages don't cause replication to fail from fetch request 
version 3 (KIP-74)
-    if (brokerConfig.fetchRequestVersion <= 2 && records.sizeInBytes > 0 && 
records.validBytes <= 0)
+    if (metadataVersionSupplier().fetchRequestVersion() <= 2 && 
records.sizeInBytes > 0 && records.validBytes <= 0)

Review Comment:
   nit: are the parenthesis required for `fetchRequestVersion`?



##########
core/src/main/scala/kafka/server/KafkaRaftServer.scala:
##########
@@ -185,7 +185,7 @@ object KafkaRaftServer {
     val bootstrapMetadata = if 
(config.originals.containsKey(KafkaConfig.InterBrokerProtocolVersionProp)) {
       BootstrapMetadata.load(Paths.get(config.metadataLogDir), 
config.interBrokerProtocolVersion)
     } else {
-      BootstrapMetadata.load(Paths.get(config.metadataLogDir), 
MetadataVersion.IBP_3_0_IV0)
+      BootstrapMetadata.load(Paths.get(config.metadataLogDir), 
MetadataVersion.MINIMUM_KRAFT_VERSION)

Review Comment:
   nit: the use of "preview" in some of the comments and code seems unnecessary 
and likely to cause confusion. Could we just say older versions? Instead of 
`fallbackPreviewVersion`, maybe just `fallbackVersion`?



##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -43,7 +43,6 @@
  * released version, they can use "0.10.0" when upgrading to the 0.10.0 
release.
  */
 public enum MetadataVersion {
-    UNINITIALIZED(-1, "0.0", ""),

Review Comment:
   nit: There is a comment below about omitting UNINITIALIZED that we need to 
update.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to