[ https://issues.apache.org/jira/browse/KAFKA-7030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511971#comment-16511971 ]

ASF GitHub Bot commented on KAFKA-7030:
---------------------------------------

hachikuji closed pull request #5192: KAFKA-7030: Add configuration to disable 
message down-conversion (KIP-283)
URL: https://github.com/apache/kafka/pull/5192
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index d6b70032626..fb2208c0328 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -165,4 +165,11 @@
         "the timestamp when a broker receives a message and the timestamp 
specified in the message. If " +
         "message.timestamp.type=CreateTime, a message will be rejected if the 
difference in timestamp " +
         "exceeds this threshold. This configuration is ignored if 
message.timestamp.type=LogAppendTime.";
+
+    public static final String MESSAGE_DOWNCONVERSION_ENABLE_CONFIG = "message.downconversion.enable";
+    public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "This configuration controls whether " +
+        "down-conversion of message formats is enabled to satisfy consume requests. When set to <code>false</code>, " +
+        "the broker will not perform down-conversion for consumers expecting an older message format. The broker responds " +
+        "with an <code>UNSUPPORTED_VERSION</code> error for consume requests from such older clients. This configuration " +
+        "does not apply to any message format conversion that might be required for replication to followers.";
 }
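
Editor's note (not part of the PR diff): the new topic-level property can be set per topic once the broker supports it. A minimal sketch of applying the override with the AdminClient follows; the topic name "my-topic" and the bootstrap address are assumptions for illustration.

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, Config, ConfigEntry}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}

object DisableDownConversionForTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
    val admin = AdminClient.create(props)
    try {
      // Turn off down-conversion for one (assumed) topic; other topics keep the broker default.
      val resource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic")
      val entry = new ConfigEntry(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, "false")
      admin.alterConfigs(Collections.singletonMap(resource, new Config(Collections.singletonList(entry)))).all().get()
    } finally {
      admin.close()
    }
  }
}
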
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala 
b/core/src/main/scala/kafka/log/LogConfig.scala
index 158209a1fc0..59269fe18d3 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -63,6 +63,7 @@ object Defaults {
   val LeaderReplicationThrottledReplicas = Collections.emptyList[String]()
   val FollowerReplicationThrottledReplicas = Collections.emptyList[String]()
   val MaxIdMapSnapshots = kafka.server.Defaults.MaxIdMapSnapshots
+  val MessageDownConversionEnable = 
kafka.server.Defaults.MessageDownConversionEnable
 }
 
 case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: 
Set[String] = Set.empty)
@@ -96,6 +97,7 @@ case class LogConfig(props: java.util.Map[_, _], 
overriddenConfigs: Set[String]
   val messageTimestampDifferenceMaxMs = 
getLong(LogConfig.MessageTimestampDifferenceMaxMsProp).longValue
   val LeaderReplicationThrottledReplicas = 
getList(LogConfig.LeaderReplicationThrottledReplicasProp)
   val FollowerReplicationThrottledReplicas = 
getList(LogConfig.FollowerReplicationThrottledReplicasProp)
+  val messageDownConversionEnable = 
getBoolean(LogConfig.MessageDownConversionEnableProp)
 
   def randomSegmentJitter: Long =
     if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % 
math.min(segmentJitterMs, segmentMs)
@@ -131,6 +133,7 @@ object LogConfig {
   val MessageFormatVersionProp = TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG
   val MessageTimestampTypeProp = TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG
   val MessageTimestampDifferenceMaxMsProp = 
TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG
+  val MessageDownConversionEnableProp = 
TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG
 
   // Leave these out of TopicConfig for now as they are replication quota 
configs
   val LeaderReplicationThrottledReplicasProp = 
"leader.replication.throttled.replicas"
@@ -158,6 +161,7 @@ object LogConfig {
   val MessageFormatVersionDoc = TopicConfig.MESSAGE_FORMAT_VERSION_DOC
   val MessageTimestampTypeDoc = TopicConfig.MESSAGE_TIMESTAMP_TYPE_DOC
   val MessageTimestampDifferenceMaxMsDoc = 
TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC
+  val MessageDownConversionEnableDoc = 
TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC
 
   val LeaderReplicationThrottledReplicasDoc = "A list of replicas for which 
log replication should be throttled on " +
     "the leader side. The list should describe a set of replicas in the form " 
+
@@ -262,6 +266,8 @@ object LogConfig {
         LeaderReplicationThrottledReplicasDoc, 
LeaderReplicationThrottledReplicasProp)
       .define(FollowerReplicationThrottledReplicasProp, LIST, 
Defaults.FollowerReplicationThrottledReplicas, ThrottledReplicaListValidator, 
MEDIUM,
         FollowerReplicationThrottledReplicasDoc, 
FollowerReplicationThrottledReplicasProp)
+      .define(MessageDownConversionEnableProp, BOOLEAN, 
Defaults.MessageDownConversionEnable, LOW,
+        MessageDownConversionEnableDoc, 
KafkaConfig.LogMessageDownConversionEnableProp)
   }
 
   def apply(): LogConfig = LogConfig(new Properties())
@@ -325,7 +331,8 @@ object LogConfig {
     PreAllocateEnableProp -> KafkaConfig.LogPreAllocateProp,
     MessageFormatVersionProp -> KafkaConfig.LogMessageFormatVersionProp,
     MessageTimestampTypeProp -> KafkaConfig.LogMessageTimestampTypeProp,
-    MessageTimestampDifferenceMaxMsProp -> 
KafkaConfig.LogMessageTimestampDifferenceMaxMsProp
+    MessageTimestampDifferenceMaxMsProp -> 
KafkaConfig.LogMessageTimestampDifferenceMaxMsProp,
+    MessageDownConversionEnableProp -> 
KafkaConfig.LogMessageDownConversionEnableProp
   )
 
 }
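
Editor's note (not part of the PR diff): the TopicConfigSynonyms entry above maps the topic-level property to the broker-wide default log.message.downconversion.enable. A hedged sketch of the resulting precedence, using the broker's internal LogConfig.fromProps helper; the defaults/overrides values are assumptions for illustration.

import java.util.Properties

import kafka.log.LogConfig

object DownConversionPrecedenceSketch {
  def main(args: Array[String]): Unit = {
    // Broker-wide default, as KafkaServer.copyKafkaConfigToLog would derive it from
    // log.message.downconversion.enable (assumed "true" here).
    val brokerDefaults = new Properties()
    brokerDefaults.put(LogConfig.MessageDownConversionEnableProp, "true")

    // Per-topic override, as it would arrive from the topic's dynamic configuration.
    val topicOverrides = new Properties()
    topicOverrides.put(LogConfig.MessageDownConversionEnableProp, "false")

    val logConfig = LogConfig.fromProps(brokerDefaults, topicOverrides)
    // The topic-level value wins over the broker default.
    assert(!logConfig.messageDownConversionEnable)
  }
}
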
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index ae7845b5166..ae80029e79c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, 
TRANSACTION_STATE_TOPIC_NAME, isInternal}
-import org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -507,44 +506,41 @@ class KafkaApis(val requestChannel: RequestChannel,
           fetchRequest.toForget(),
           fetchRequest.isFromFollower())
 
+    def errorResponse[T >: MemoryRecords <: BaseRecords](error: Errors): FetchResponse.PartitionData[T] = {
+      new FetchResponse.PartitionData[T](error, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+        FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+    }
+
     val erroneous = mutable.ArrayBuffer[(TopicPartition, 
FetchResponse.PartitionData[Records])]()
     val interesting = mutable.ArrayBuffer[(TopicPartition, 
FetchRequest.PartitionData)]()
     if (fetchRequest.isFromFollower()) {
       // The follower must have ClusterAction on ClusterResource in order to 
fetch partition data.
       if (authorize(request.session, ClusterAction, Resource.ClusterResource)) 
{
-        fetchContext.foreachPartition((topicPartition, data) => {
-          if (!metadataCache.contains(topicPartition)) {
-            erroneous += topicPartition -> new 
FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
-              FetchResponse.INVALID_HIGHWATERMARK, 
FetchResponse.INVALID_LAST_STABLE_OFFSET,
-              FetchResponse.INVALID_LOG_START_OFFSET, null, 
MemoryRecords.EMPTY)
-          } else {
+        fetchContext.foreachPartition { (topicPartition, data) =>
+          if (!metadataCache.contains(topicPartition))
+            erroneous += topicPartition -> 
errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+          else
             interesting += (topicPartition -> data)
-          }
-        })
+        }
       } else {
-        fetchContext.foreachPartition((part, _) => {
-          erroneous += part -> new 
FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-            FetchResponse.INVALID_HIGHWATERMARK, 
FetchResponse.INVALID_LAST_STABLE_OFFSET,
-            FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
-        })
+        fetchContext.foreachPartition { (part, _) =>
+          erroneous += part -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
+        }
       }
     } else {
       // Regular Kafka consumers need READ permission on each partition they 
are fetching.
-      fetchContext.foreachPartition((topicPartition, data) => {
+      fetchContext.foreachPartition { (topicPartition, data) =>
         if (!authorize(request.session, Read, Resource(Topic, 
topicPartition.topic, LITERAL)))
-          erroneous += topicPartition -> new 
FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-            FetchResponse.INVALID_HIGHWATERMARK, 
FetchResponse.INVALID_LAST_STABLE_OFFSET,
-            FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+          erroneous += topicPartition -> 
errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
         else if (!metadataCache.contains(topicPartition))
-          erroneous += topicPartition -> new 
FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
-            FetchResponse.INVALID_HIGHWATERMARK, 
FetchResponse.INVALID_LAST_STABLE_OFFSET,
-            FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+          erroneous += topicPartition -> 
errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
         else
           interesting += (topicPartition -> data)
-      })
+      }
     }
 
-    def convertRecords(tp: TopicPartition, unconvertedRecords: Records): BaseRecords = {
+    def maybeConvertFetchedData(tp: TopicPartition,
+                                partitionData: FetchResponse.PartitionData[Records]): FetchResponse.PartitionData[BaseRecords] = {
       // Down-conversion of the fetched records is needed when the stored 
magic version is
       // greater than that supported by the client (as indicated by the fetch 
request version). If the
       // configured magic version for the topic is less than or equal to that 
supported by the version of the
@@ -552,8 +548,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       // know it must be supported. However, if the magic version is changed 
from a higher version back to a
       // lower version, this check will no longer be valid and we will fail to 
down-convert the messages
       // which were written in the new format prior to the version downgrade.
-      replicaManager.getMagic(tp).flatMap { magic =>
-        val downConvertMagic = {
+      val unconvertedRecords = partitionData.records
+      val logConfig = replicaManager.getLogConfig(tp)
+      val downConvertMagic =
+        logConfig.map(_.messageFormatVersion.recordVersion.value).flatMap { 
magic =>
           if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && 
!unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
             Some(RecordBatch.MAGIC_VALUE_V0)
           else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && 
!unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
@@ -562,28 +560,36 @@ class KafkaApis(val requestChannel: RequestChannel,
             None
         }
 
-        downConvertMagic.map { magic =>
-          trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
-
-          // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much
-          // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
-          // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
-          // client.
-          new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time)
-        }
-      }.getOrElse(unconvertedRecords)
+      // For fetch requests from clients, check if down-conversion is disabled for the particular partition
+      if (downConvertMagic.isDefined && !fetchRequest.isFromFollower && !logConfig.forall(_.messageDownConversionEnable)) {
+        trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.")
+        errorResponse(Errors.UNSUPPORTED_VERSION)
+      } else {
+        val convertedRecords =
+          downConvertMagic.map { magic =>
+            trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
+            // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much
+            // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
+            // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
+            // client.
+            new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time)
+          }.getOrElse(unconvertedRecords)
+        new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
+          FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions,
+          convertedRecords)
+      }
     }
 
     // the callback for process a fetch response, invoked before throttling
     def processResponseCallback(responsePartitionData: Seq[(TopicPartition, 
FetchPartitionData)]): Unit = {
       val partitions = new util.LinkedHashMap[TopicPartition, 
FetchResponse.PartitionData[Records]]
-      responsePartitionData.foreach{ case (tp, data) =>
+      responsePartitionData.foreach { case (tp, data) =>
         val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
         val lastStableOffset = 
data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
         partitions.put(tp, new FetchResponse.PartitionData(data.error, 
data.highWatermark, lastStableOffset,
           data.logStartOffset, abortedTransactions, data.records))
       }
-      erroneous.foreach{case (tp, data) => partitions.put(tp, data)}
+      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
 
       // When this callback is triggered, the remote API call has completed.
       // Record time before any byte-rate throttling.
@@ -598,14 +604,10 @@ class KafkaApis(val requestChannel: RequestChannel,
           if (unconvertedPartitionData.error != Errors.NONE)
             debug(s"Fetch request with correlation id 
${request.header.correlationId} from client $clientId " +
               s"on partition $tp failed due to 
${unconvertedPartitionData.error.exceptionName}")
-          val convertedRecords = convertRecords(tp, 
unconvertedPartitionData.records)
-          val convertedPartitionData = new 
FetchResponse.PartitionData[BaseRecords](unconvertedPartitionData.error,
-            unconvertedPartitionData.highWatermark, 
FetchResponse.INVALID_LAST_STABLE_OFFSET, 
unconvertedPartitionData.logStartOffset,
-            unconvertedPartitionData.abortedTransactions, convertedRecords)
-          convertedData.put(tp, convertedPartitionData)
+          convertedData.put(tp, maybeConvertFetchedData(tp, 
unconvertedPartitionData))
         }
 
-        // Prepare fetch resopnse from converted data
+        // Prepare fetch response from converted data
         val response = new FetchResponse(unconvertedFetchResponse.error(), 
convertedData, throttleTimeMs,
           unconvertedFetchResponse.sessionId())
         response.responseData.asScala.foreach { case (topicPartition, data) =>
@@ -1455,7 +1457,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             duplicateTopics.keySet.map((_, new 
ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap
           } else Map.empty
 
-        val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new 
ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)) 
+        val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new 
ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null))
         val completeResults = results ++ duplicatedTopicsResults ++ 
unauthorizedTopicsResults
         sendResponseCallback(completeResults)
       }
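
Editor's note (not part of the PR diff): the core of the KafkaApis change above is the decision of whether to down-convert, reject, or pass records through. The following simplified sketch is not the broker's actual helper; it restates that decision with the hasCompatibleMagic check on the fetched batches omitted for brevity, and all names are illustrative.

import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch

object DownConversionDecisionSketch {
  // Returns either the error to send back, or the target magic to down-convert to
  // (None meaning the stored records can be sent as-is).
  def decide(storedMagic: Byte,
             fetchVersion: Short,
             downConversionEnable: Boolean,
             fromFollower: Boolean): Either[Errors, Option[Byte]] = {
    val targetMagic: Option[Byte] =
      if (storedMagic > RecordBatch.MAGIC_VALUE_V0 && fetchVersion <= 1) Some(RecordBatch.MAGIC_VALUE_V0)
      else if (storedMagic > RecordBatch.MAGIC_VALUE_V1 && fetchVersion <= 3) Some(RecordBatch.MAGIC_VALUE_V1)
      else None

    targetMagic match {
      // Only consumer fetches are rejected; follower fetches are never refused on this basis.
      case Some(_) if !fromFollower && !downConversionEnable => Left(Errors.UNSUPPORTED_VERSION)
      case other => Right(other)
    }
  }

  def main(args: Array[String]): Unit = {
    // A v1 consumer fetch against a log on the v2 message format with the config disabled:
    println(decide(RecordBatch.MAGIC_VALUE_V2, fetchVersion = 1, downConversionEnable = false, fromFollower = false))
    // prints Left(UNSUPPORTED_VERSION)
  }
}
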
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 19bb8074958..5a021919dd6 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -116,6 +116,7 @@ object Defaults {
   val NumRecoveryThreadsPerDataDir = 1
   val AutoCreateTopicsEnable = true
   val MinInSyncReplicas = 1
+  val MessageDownConversionEnable = true
 
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMs = RequestTimeoutMs
@@ -330,6 +331,7 @@ object KafkaConfig {
   val MinInSyncReplicasProp = "min.insync.replicas"
   val CreateTopicPolicyClassNameProp = "create.topic.policy.class.name"
   val AlterConfigPolicyClassNameProp = "alter.config.policy.class.name"
+  val LogMessageDownConversionEnableProp = LogConfigPrefix + 
"message.downconversion.enable"
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms"
   val DefaultReplicationFactorProp = "default.replication.factor"
@@ -599,6 +601,7 @@ object KafkaConfig {
     "implement the 
<code>org.apache.kafka.server.policy.CreateTopicPolicy</code> interface."
   val AlterConfigPolicyClassNameDoc = "The alter configs policy class that 
should be used for validation. The class should " +
     "implement the 
<code>org.apache.kafka.server.policy.AlterConfigPolicy</code> interface."
+  val LogMessageDownConversionEnableDoc = 
TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC;
 
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMsDoc = "The socket timeout for 
controller-to-broker channels"
@@ -862,6 +865,7 @@ object KafkaConfig {
       .define(LogMessageTimestampDifferenceMaxMsProp, LONG, 
Defaults.LogMessageTimestampDifferenceMaxMs, MEDIUM, 
LogMessageTimestampDifferenceMaxMsDoc)
       .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, 
CreateTopicPolicyClassNameDoc)
       .define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, 
AlterConfigPolicyClassNameDoc)
+      .define(LogMessageDownConversionEnableProp, BOOLEAN, 
Defaults.MessageDownConversionEnable, LOW, LogMessageDownConversionEnableDoc)
 
       /** ********* Replication configuration ***********/
       .define(ControllerSocketTimeoutMsProp, INT, 
Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc)
@@ -1135,6 +1139,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   val logMessageFormatVersion = ApiVersion(logMessageFormatVersionString)
   def logMessageTimestampType = 
TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
   def logMessageTimestampDifferenceMaxMs: Long = 
getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
+  def logMessageDownConversionEnable: Boolean = 
getBoolean(KafkaConfig.LogMessageDownConversionEnableProp)
 
   /** ********* Replication configuration ***********/
   val controllerSocketTimeoutMs: Int = 
getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 57bca697437..f73ede619dd 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -79,6 +79,7 @@ object KafkaServer {
     logProps.put(LogConfig.MessageFormatVersionProp, 
kafkaConfig.logMessageFormatVersion.version)
     logProps.put(LogConfig.MessageTimestampTypeProp, 
kafkaConfig.logMessageTimestampType.name)
     logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, 
kafkaConfig.logMessageTimestampDifferenceMaxMs: java.lang.Long)
+    logProps.put(LogConfig.MessageDownConversionEnableProp, 
kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
     logProps
   }
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 24f3235570f..965595b2c2e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -25,7 +25,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.cluster.{BrokerEndPoint, Partition, Replica}
 import kafka.controller.{KafkaController, StateChangeLogger}
-import kafka.log.{Log, LogAppendInfo, LogManager}
+import kafka.log.{Log, LogAppendInfo, LogConfig, LogManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.server.checkpoints.OffsetCheckpointFile
@@ -995,8 +995,9 @@ class ReplicaManager(val config: KafkaConfig,
     quota.isThrottled(topicPartition) && quota.isQuotaExceeded && 
!isReplicaInSync
   }
 
-  def getMagic(topicPartition: TopicPartition): Option[Byte] =
-    
getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.recordVersion.value))
+  def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] = 
getReplica(topicPartition).flatMap(_.log.map(_.config))
+
+  def getMagic(topicPartition: TopicPartition): Option[Byte] = 
getLogConfig(topicPartition).map(_.messageFormatVersion.recordVersion.value)
 
   def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: 
UpdateMetadataRequest) : Seq[TopicPartition] =  {
     replicaStateChangeLock synchronized {
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 45b3fdc74bd..69ca31703ef 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -369,6 +369,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     props.put(KafkaConfig.LogPreAllocateProp, true.toString)
     props.put(KafkaConfig.LogMessageTimestampTypeProp, 
TimestampType.LOG_APPEND_TIME.toString)
     props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000")
+    props.put(KafkaConfig.LogMessageDownConversionEnableProp, "false")
     reconfigureServers(props, perBrokerConfig = false, 
(KafkaConfig.LogSegmentBytesProp, "4000"))
 
     // Verify that all broker defaults have been updated
diff --git 
a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
 
b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
new file mode 100644
index 00000000000..e5ef9858cf7
--- /dev/null
+++ 
b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.server
+
+import java.util
+import java.util.Properties
+
+import kafka.log.LogConfig
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.Assert._
+import org.junit.Test
+
+class FetchRequestDownConversionConfigTest extends BaseRequestTest {
+  private var producer: KafkaProducer[String, String] = null
+  override def numBrokers: Int = 1
+
+  override def setUp(): Unit = {
+    super.setUp()
+    initProducer()
+  }
+
+  override def tearDown(): Unit = {
+    if (producer != null)
+      producer.close()
+    super.tearDown()
+  }
+
+  override protected def propertyOverrides(properties: Properties): Unit = {
+    super.propertyOverrides(properties)
+    properties.put(KafkaConfig.LogMessageDownConversionEnableProp, "false")
+  }
+
+  private def initProducer(): Unit = {
+    producer = 
TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+      retries = 5, keySerializer = new StringSerializer, valueSerializer = new 
StringSerializer)
+  }
+
+  private def createTopics(numTopics: Int, numPartitions: Int,
+                           configs: Map[String, String] = Map.empty, 
topicSuffixStart: Int = 0): Map[TopicPartition, Int] = {
+    val topics = (0 until numTopics).map(t => s"topic${t + topicSuffixStart}")
+    val topicConfig = new Properties
+    topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 1.toString)
+    configs.foreach { case (k, v) => topicConfig.setProperty(k, v) }
+    topics.flatMap { topic =>
+      val partitionToLeader = createTopic(topic, numPartitions = 
numPartitions, replicationFactor = 1,
+        topicConfig = topicConfig)
+      partitionToLeader.map { case (partition, leader) => new 
TopicPartition(topic, partition) -> leader }
+    }.toMap
+  }
+
+  private def createPartitionMap(maxPartitionBytes: Int, topicPartitions: 
Seq[TopicPartition],
+                                 offsetMap: Map[TopicPartition, Long] = 
Map.empty): util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] = {
+    val partitionMap = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
+    topicPartitions.foreach { tp =>
+      partitionMap.put(tp, new 
FetchRequest.PartitionData(offsetMap.getOrElse(tp, 0), 0L, maxPartitionBytes))
+    }
+    partitionMap
+  }
+
+  private def sendFetchRequest(leaderId: Int, request: FetchRequest): 
FetchResponse[MemoryRecords] = {
+    val response = connectAndSend(request, ApiKeys.FETCH, destination = 
brokerSocketServer(leaderId))
+    FetchResponse.parse(response, request.version)
+  }
+
+  /**
+   * Tests that fetch requests that require down-conversion return an error response when down-conversion is disabled on the broker.
+   */
+  @Test
+  def testV1FetchWithDownConversionDisabled(): Unit = {
+    val topicMap = createTopics(numTopics = 5, numPartitions = 1)
+    val topicPartitions = topicMap.keySet.toSeq
+    topicPartitions.foreach(tp => producer.send(new ProducerRecord(tp.topic(), 
"key", "value")).get())
+    val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, 
createPartitionMap(1024,
+      topicPartitions)).build(1)
+    val fetchResponse = sendFetchRequest(topicMap.head._2, fetchRequest)
+    topicPartitions.foreach(tp => assertEquals(Errors.UNSUPPORTED_VERSION, 
fetchResponse.responseData().get(tp).error))
+  }
+
+  /**
+   * Tests that "message.downconversion.enable" has no effect when 
down-conversion is not required.
+   */
+  @Test
+  def testLatestFetchWithDownConversionDisabled(): Unit = {
+    val topicMap = createTopics(numTopics = 5, numPartitions = 1)
+    val topicPartitions = topicMap.keySet.toSeq
+    topicPartitions.foreach(tp => producer.send(new ProducerRecord(tp.topic(), 
"key", "value")).get())
+    val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, 
createPartitionMap(1024,
+      topicPartitions)).build()
+    val fetchResponse = sendFetchRequest(topicMap.head._2, fetchRequest)
+    topicPartitions.foreach(tp => assertEquals(Errors.NONE, 
fetchResponse.responseData().get(tp).error))
+  }
+
+  /**
+   * Tests that "message.downconversion.enable" can be set at topic level, and 
its configuration is obeyed for client
+   * fetch requests.
+   */
+  @Test
+  def testV1FetchWithTopicLevelOverrides(): Unit = {
+    // create topics with default down-conversion configuration (i.e. 
conversion disabled)
+    val conversionDisabledTopicsMap = createTopics(numTopics = 5, 
numPartitions = 1, topicSuffixStart = 0)
+    val conversionDisabledTopicPartitions = 
conversionDisabledTopicsMap.keySet.toSeq
+
+    // create topics with down-conversion configuration enabled
+    val topicConfig = Map(LogConfig.MessageDownConversionEnableProp -> "true")
+    val conversionEnabledTopicsMap = createTopics(numTopics = 5, numPartitions 
= 1, topicConfig, topicSuffixStart = 5)
+    val conversionEnabledTopicPartitions = 
conversionEnabledTopicsMap.keySet.toSeq
+
+    val allTopics = conversionDisabledTopicPartitions ++ 
conversionEnabledTopicPartitions
+    val leaderId = conversionDisabledTopicsMap.head._2
+
+    allTopics.foreach(tp => producer.send(new ProducerRecord(tp.topic(), 
"key", "value")).get())
+    val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, 
createPartitionMap(1024,
+      allTopics)).build(1)
+    val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+
+    conversionDisabledTopicPartitions.foreach(tp => 
assertEquals(Errors.UNSUPPORTED_VERSION, 
fetchResponse.responseData().get(tp).error))
+    conversionEnabledTopicPartitions.foreach(tp => assertEquals(Errors.NONE, 
fetchResponse.responseData().get(tp).error))
+  }
+
+  /**
+   * Tests that "message.downconversion.enable" has no effect on fetch 
requests from replicas.
+   */
+  @Test
+  def testV1FetchFromReplica(): Unit = {
+    // create topics with default down-conversion configuration (i.e. 
conversion disabled)
+    val conversionDisabledTopicsMap = createTopics(numTopics = 5, 
numPartitions = 1, topicSuffixStart = 0)
+    val conversionDisabledTopicPartitions = 
conversionDisabledTopicsMap.keySet.toSeq
+
+    // create topics with down-conversion configuration enabled
+    val topicConfig = Map(LogConfig.MessageDownConversionEnableProp -> "true")
+    val conversionEnabledTopicsMap = createTopics(numTopics = 5, numPartitions 
= 1, topicConfig, topicSuffixStart = 5)
+    val conversionEnabledTopicPartitions = 
conversionEnabledTopicsMap.keySet.toSeq
+
+    val allTopicPartitions = conversionDisabledTopicPartitions ++ 
conversionEnabledTopicPartitions
+    val leaderId = conversionDisabledTopicsMap.head._2
+
+    allTopicPartitions.foreach(tp => producer.send(new 
ProducerRecord(tp.topic(), "key", "value")).get())
+    val fetchRequest = FetchRequest.Builder.forReplica(1, 1, Int.MaxValue, 0,
+      createPartitionMap(1024, allTopicPartitions)).build()
+    val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+
+    allTopicPartitions.foreach(tp => assertEquals(Errors.NONE, 
fetchResponse.responseData().get(tp).error))
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 424b8c79fe4..1ba388ef4d2 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -426,7 +426,7 @@ class FetchRequestTest extends BaseRequestTest {
   }
 
   private def createTopics(numTopics: Int, numPartitions: Int, configs: 
Map[String, String] = Map.empty): Map[TopicPartition, Int] = {
-    val topics = (0 until numPartitions).map(t => s"topic$t")
+    val topics = (0 until numTopics).map(t => s"topic$t")
     val topicConfig = new Properties
     topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 2.toString)
     configs.foreach { case (k, v) => topicConfig.setProperty(k, v) }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add configuration to disable message down-conversion
> ----------------------------------------------------
>
>                 Key: KAFKA-7030
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7030
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Dhruvil Shah
>            Assignee: Dhruvil Shah
>            Priority: Major
>             Fix For: 2.0.0
>
>
> Add configuration to disable message down-conversion as described in 
> [KIP-283|https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
