mumrah commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r670506521



##########
File path: core/src/main/scala/kafka/log/LogConfig.scala
##########
@@ -479,4 +501,31 @@ object LogConfig {
     logProps.put(MessageDownConversionEnableProp, 
kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
     logProps
   }
+
+  def shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion: 
ApiVersion): Boolean =
+    interBrokerProtocolVersion >= KAFKA_3_0_IV1
+
+  class MessageFormatVersion(messageFormatVersionString: String, 
interBrokerProtocolVersionString: String) {
+    val messageFormatVersion = ApiVersion(messageFormatVersionString)
+    private val interBrokerProtocolVersion = 
ApiVersion(interBrokerProtocolVersionString)
+
+    def shouldIgnore: Boolean = 
shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion)
+
+    def shouldWarn: Boolean =
+      interBrokerProtocolVersion >= KAFKA_3_0_IV1 && 
messageFormatVersion.recordVersion.precedes(RecordVersion.V2)
+
+    @nowarn("cat=deprecation")
+    def topicWarningMessage(topicName: String): String = {
+      s"Topic configuration ${LogConfig.MessageFormatVersionProp} with value 
`$messageFormatVersionString` is ignored " +
+        s"for `$topicName` because the inter-broker protocol version 
`$interBrokerProtocolVersionString` is " +
+        "greater or equal than 3.0"
+    }
+
+    @nowarn("cat=deprecation")
+    def brokerWarningMessage: String = {
+      s"Broker configuration ${KafkaConfig.LogMessageFormatVersionProp} with 
value $messageFormatVersionString is ignored " +
+        s"because the inter-broker protocol version 
`$interBrokerProtocolVersionString` is greater or equal than 3.0"

Review comment:
       nit: alignment

##########
File path: core/src/main/scala/kafka/log/LogConfig.scala
##########
@@ -479,4 +501,31 @@ object LogConfig {
     logProps.put(MessageDownConversionEnableProp, 
kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
     logProps
   }
+
+  def shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion: 
ApiVersion): Boolean =
+    interBrokerProtocolVersion >= KAFKA_3_0_IV1
+
+  class MessageFormatVersion(messageFormatVersionString: String, 
interBrokerProtocolVersionString: String) {
+    val messageFormatVersion = ApiVersion(messageFormatVersionString)
+    private val interBrokerProtocolVersion = 
ApiVersion(interBrokerProtocolVersionString)
+
+    def shouldIgnore: Boolean = 
shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion)
+
+    def shouldWarn: Boolean =
+      interBrokerProtocolVersion >= KAFKA_3_0_IV1 && 
messageFormatVersion.recordVersion.precedes(RecordVersion.V2)
+
+    @nowarn("cat=deprecation")
+    def topicWarningMessage(topicName: String): String = {
+      s"Topic configuration ${LogConfig.MessageFormatVersionProp} with value 
`$messageFormatVersionString` is ignored " +
+        s"for `$topicName` because the inter-broker protocol version 
`$interBrokerProtocolVersionString` is " +
+        "greater or equal than 3.0"

Review comment:
       nit: alignment

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -253,9 +259,10 @@ class LogManager(logDirs: Seq[File],
                            hadCleanShutdown: Boolean,
                            recoveryPoints: Map[TopicPartition, Long],
                            logStartOffsets: Map[TopicPartition, Long],
+                           defaultConfig: LogConfig,
                            topicConfigOverrides: Map[String, LogConfig]): Log 
= {
     val topicPartition = Log.parseTopicPartitionName(logDir)
-    val config = topicConfigOverrides.getOrElse(topicPartition.topic, 
currentDefaultConfig)
+    val config = topicConfigOverrides.getOrElse(topicPartition.topic, 
defaultConfig)

Review comment:
       The addition of this argument here (and elsewhere in LogManager) is to 
let us go through the validation/warning logic during startup. Is that right?
   
   Are the other usages of `currentDefaultConfig` safe as-is? What happens if 
someone reconfigures the log config? Is that handled via 
`LogManager#fetchLogConfig`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to