This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new b1c38e6  KAFKA-7920; Do not permit zstd produce requests until IBP is updated to 2.1 (#6256)
b1c38e6 is described below

commit b1c38e663c5ef4540fb7abd2bcdb04e81d7e0f91
Author: Lee Dongjin <dong...@apache.org>
AuthorDate: Thu Feb 21 01:44:04 2019 +0900

    KAFKA-7920; Do not permit zstd produce requests until IBP is updated to 2.1 (#6256)
    
    Fail produce requests using zstd until the inter.broker.protocol.version
    is large enough that all replicas are guaranteed to support it.
    Otherwise, followers receive the `UNSUPPORTED_COMPRESSION_TYPE` error
    when fetching zstd-compressed data and the ISR shrinks.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
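
    The mechanics of the fix reduce to a single ordered comparison:
    ApiVersion instances are comparable, and zstd is accepted only once the
    IBP guarantees that every replica can decode it. A minimal standalone
    sketch of that gate (the comparison mirrors the LogValidator change
    below; the object and helper names are illustrative, not part of the
    patch):

        import kafka.api.{ApiVersion, KAFKA_2_1_IV0}

        object ZstdGate {
          // zstd-compressed appends are safe only once the inter-broker
          // protocol version guarantees all replicas can decode them.
          def zstdAllowed(interBrokerProtocolVersion: ApiVersion): Boolean =
            interBrokerProtocolVersion >= KAFKA_2_1_IV0
        }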
---
 .../kafka/common/requests/ProduceRequest.java      |   2 +-
 core/src/main/scala/kafka/cluster/Partition.scala  |   8 +-
 core/src/main/scala/kafka/log/Log.scala            |  16 ++-
 core/src/main/scala/kafka/log/LogValidator.scala   |  18 +--
 core/src/main/scala/kafka/server/KafkaConfig.scala |   9 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   1 +
 .../scala/unit/kafka/cluster/PartitionTest.scala   |   5 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala    | 136 +++++++++++++++------
 .../server/AbstractCreateTopicsRequestTest.scala   |   2 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala    |   7 +-
 10 files changed, 141 insertions(+), 63 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index a961d47..ad23f3f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -402,7 +402,7 @@ public class ProduceRequest extends AbstractRequest {
                 throw new InvalidRecordException("Produce requests with 
version " + version + " are only allowed to " +
                     "contain record batches with magic version 2");
             if (version < 7 && entry.compressionType() == 
CompressionType.ZSTD) {
-                throw new UnsupportedCompressionTypeException("Produce 
requests with version " + version + " are note allowed to " +
+                throw new UnsupportedCompressionTypeException("Produce 
requests with version " + version + " are not allowed to " +
                     "use ZStandard compression");
             }
 
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 31c3668..4cedd5e 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -20,7 +20,7 @@ import java.util.Optional
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import com.yammer.metrics.core.Gauge
-import kafka.api.{LeaderAndIsr, Request}
+import kafka.api.{ApiVersion, LeaderAndIsr, Request}
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.KafkaController
 import kafka.log._
@@ -48,6 +48,7 @@ object Partition {
     new Partition(topicPartition,
       isOffline = false,
       replicaLagTimeMaxMs = replicaManager.config.replicaLagTimeMaxMs,
+      interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion,
       localBrokerId = replicaManager.config.brokerId,
       time = time,
       replicaManager = replicaManager,
@@ -62,6 +63,7 @@ object Partition {
 class Partition(val topicPartition: TopicPartition,
                 val isOffline: Boolean,
                 private val replicaLagTimeMaxMs: Long,
+                private val interBrokerProtocolVersion: ApiVersion,
                 private val localBrokerId: Int,
                 private val time: Time,
                 private val replicaManager: ReplicaManager,
@@ -738,7 +740,9 @@ class Partition(val topicPartition: TopicPartition,
               s"is insufficient to satisfy the min.isr requirement of $minIsr 
for partition $topicPartition")
           }
 
-          val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
+          val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient,
+            interBrokerProtocolVersion)
+
           // we may need to increment high watermark since ISR could be down to 1
           (info, maybeIncrementLeaderHW(leaderReplica))
 
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 234913d..b56b26f 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -27,7 +27,7 @@ import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, Time
 import java.util.regex.Pattern
 
 import com.yammer.metrics.core.Gauge
-import kafka.api.KAFKA_0_10_0_IV0
+import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0}
 import kafka.common.{LogSegmentOffsetOverflowException, LongRef, OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 import kafka.metrics.KafkaMetricsGroup
@@ -785,11 +785,13 @@ class Log(@volatile var dir: File,
    *
    * @param records The records to append
    * @param isFromClient Whether or not this append is from a producer
+   * @param interBrokerProtocolVersion Inter-broker message protocol version
    * @throws KafkaStorageException If the append fails due to an I/O error.
   * @return Information about the appended messages including the first and last offset.
   */
-  def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true): LogAppendInfo = {
-    append(records, isFromClient, assignOffsets = true, leaderEpoch)
+  def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true,
+                     interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
+    append(records, isFromClient, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
   }
 
   /**
@@ -800,7 +802,7 @@ class Log(@volatile var dir: File,
   * @return Information about the appended messages including the first and last offset.
   */
   def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
-    append(records, isFromClient = false, assignOffsets = false, leaderEpoch = -1)
+    append(records, isFromClient = false, interBrokerProtocolVersion = ApiVersion.latestVersion, assignOffsets = false, leaderEpoch = -1)
   }
 
   /**
@@ -811,6 +813,7 @@ class Log(@volatile var dir: File,
    *
    * @param records The log records to append
    * @param isFromClient Whether or not this append is from a producer
+   * @param interBrokerProtocolVersion Inter-broker message protocol version
   * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
   * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
   * @throws KafkaStorageException If the append fails due to an I/O error.
@@ -818,7 +821,7 @@ class Log(@volatile var dir: File,
   * @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset
   * @return Information about the appended messages including the first and last offset.
   */
-  private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
+  private def append(records: MemoryRecords, isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
     maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
       val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)
 
@@ -849,7 +852,8 @@ class Log(@volatile var dir: File,
               config.messageTimestampType,
               config.messageTimestampDifferenceMaxMs,
               leaderEpoch,
-              isFromClient)
+              isFromClient,
+              interBrokerProtocolVersion)
           } catch {
             case e: IOException =>
               throw new KafkaException(s"Error validating messages while 
appending to log $name", e)
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 2cfbf7d..f7c7d04 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -18,10 +18,11 @@ package kafka.log
 
 import java.nio.ByteBuffer
 
+import kafka.api.{ApiVersion, KAFKA_2_1_IV0}
 import kafka.common.LongRef
-import kafka.message.{CompressionCodec, NoCompressionCodec}
+import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec}
 import kafka.utils.Logging
-import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record.{AbstractRecords, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
 import org.apache.kafka.common.utils.Time
 
@@ -56,7 +57,8 @@ private[kafka] object LogValidator extends Logging {
                                                       timestampType: TimestampType,
                                                       timestampDiffMaxMs: Long,
                                                       partitionLeaderEpoch: Int,
-                                                      isFromClient: Boolean): ValidationAndOffsetAssignResult = {
+                                                      isFromClient: Boolean,
+                                                      interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = {
     if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
       // check the magic value
       if (!records.hasMatchingMagic(magic))
@@ -68,7 +70,7 @@ private[kafka] object LogValidator extends Logging {
           partitionLeaderEpoch, isFromClient, magic)
     } else {
       validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, time, now, sourceCodec, targetCodec, compactedTopic,
-        magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient)
+        magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient, interBrokerProtocolVersion)
     }
   }
 
@@ -245,8 +247,8 @@ private[kafka] object LogValidator extends Logging {
                                                  timestampType: TimestampType,
                                                  timestampDiffMaxMs: Long,
                                                  partitionLeaderEpoch: Int,
-                                                 isFromClient: Boolean): ValidationAndOffsetAssignResult = {
-
+                                                 isFromClient: Boolean,
+                                                 interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = {
       // No in place assignment situation 1 and 2
       var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0
 
@@ -265,10 +267,12 @@ private[kafka] object LogValidator extends Logging {
           inPlaceAssignment = true
 
         for (record <- batch.asScala) {
-          validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
           if (sourceCodec != NoCompressionCodec && record.isCompressed)
             throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
               s"compression attribute set: $record")
+          if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
+            throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression")
+          validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
 
           uncompressedSizeInBytes += record.sizeInBytes()
           if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
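
Note on placement: the new guard sits inside the per-record loop of the
compressed path, which is sufficient because a zstd target codec can never
reach the uncompressed fast path (that branch requires both source and
target codecs to be NoCompressionCodec). A hedged sketch of the same check
lifted into a standalone helper (the helper name is illustrative only;
raising per record is harmless since the condition does not depend on the
record):

    import kafka.api.{ApiVersion, KAFKA_2_1_IV0}
    import kafka.message.{CompressionCodec, ZStdCompressionCodec}
    import org.apache.kafka.common.errors.UnsupportedCompressionTypeException

    // Same condition and error as the patch, factored out for readability.
    def checkZstdSupported(targetCodec: CompressionCodec,
                           interBrokerProtocolVersion: ApiVersion): Unit = {
      if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
        throw new UnsupportedCompressionTypeException(
          "Produce requests to inter.broker.protocol.version < 2.1 broker " +
            "are not allowed to use ZStandard compression")
    }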
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9edda4e..e91c0d5 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -20,11 +20,11 @@ package kafka.server
 import java.util
 import java.util.{Collections, Properties}
 
-import kafka.api.{ApiVersion, ApiVersionValidator, KAFKA_0_10_0_IV1}
+import kafka.api.{ApiVersion, ApiVersionValidator, KAFKA_0_10_0_IV1, KAFKA_2_1_IV0}
 import kafka.cluster.EndPoint
 import kafka.coordinator.group.OffsetConfig
 import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
-import kafka.message.{BrokerCompressionCodec, CompressionCodec}
+import kafka.message.{BrokerCompressionCodec, CompressionCodec, ZStdCompressionCodec}
 import kafka.utils.CoreUtils
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.CommonClientConfigs
@@ -1412,6 +1412,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " +
       s"is set to version ${ApiVersion.minSupportedFor(recordVersion).shortVersion} or higher")
 
+    if (offsetsTopicCompressionCodec == ZStdCompressionCodec)
+      require(interBrokerProtocolVersion.recordVersion.value >= KAFKA_2_1_IV0.recordVersion.value,
+        "offsets.topic.compression.codec zstd can only be used when inter.broker.protocol.version " +
+        s"is set to version ${KAFKA_2_1_IV0.shortVersion} or higher")
+
     val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL
     require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM,
       s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index efdde13..f52bfe6 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -124,6 +124,7 @@ object ReplicaManager {
   val OfflinePartition: Partition = new Partition(new TopicPartition("", -1),
     isOffline = true,
     replicaLagTimeMaxMs = 0L,
+    interBrokerProtocolVersion = ApiVersion.latestVersion,
     localBrokerId = -1,
     time = null,
     replicaManager = null,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 9bf167d..cc2a311 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -22,7 +22,7 @@ import java.util.{Optional, Properties}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit, TimeoutException}
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.api.Request
+import kafka.api.{ApiVersion, Request}
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.log.{Defaults => _, _}
 import kafka.server._
@@ -411,6 +411,7 @@ class PartitionTest {
     val partition = new Partition(topicPartition,
       isOffline = false,
       replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = ApiVersion.latestVersion,
       localBrokerId = brokerId,
       time,
       replicaManager,
@@ -506,6 +507,7 @@ class PartitionTest {
     val partition = new Partition(topicPartition,
       isOffline = false,
       replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = ApiVersion.latestVersion,
       localBrokerId = brokerId,
       time,
       replicaManager,
@@ -714,6 +716,7 @@ class PartitionTest {
       val partition = new Partition(tp,
         isOffline = false,
         replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+        interBrokerProtocolVersion = ApiVersion.latestVersion,
         localBrokerId = brokerId,
         time,
         replicaManager,
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 2a367e0..37553b9 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -19,9 +19,10 @@ package kafka.log
 import java.nio.ByteBuffer
 import java.util.concurrent.TimeUnit
 
+import kafka.api.{ApiVersion, KAFKA_2_0_IV1}
 import kafka.common.LongRef
-import kafka.message.{CompressionCodec, DefaultCompressionCodec, GZIPCompressionCodec, NoCompressionCodec, SnappyCompressionCodec}
-import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
+import kafka.message._
+import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.test.TestUtils
@@ -55,7 +56,8 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
     val validatedRecords = validatedResults.validatedRecords
     assertEquals("message set size should not change", 
records.records.asScala.size, validatedRecords.records.asScala.size)
     validatedRecords.batches.asScala.foreach(batch => 
validateLogAppendTime(now, 1234L, batch))
@@ -92,7 +94,8 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
     val validatedRecords = validatedResults.validatedRecords
 
     assertEquals("message set size should not change", 
records.records.asScala.size, validatedRecords.records.asScala.size)
@@ -133,7 +136,8 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
     val validatedRecords = validatedResults.validatedRecords
 
     assertEquals("message set size should not change", 
records.records.asScala.size,
@@ -190,7 +194,8 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
   }
 
   @Test
@@ -231,7 +236,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = partitionLeaderEpoch,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
     val validatedRecords = validatingResults.validatedRecords
 
     var i = 0
@@ -297,7 +303,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = partitionLeaderEpoch,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
     val validatedRecords = validatingResults.validatedRecords
 
     var i = 0
@@ -347,7 +354,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
     val validatedRecords = validatedResults.validatedRecords
 
     for (batch <- validatedRecords.batches.asScala) {
@@ -388,7 +396,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
     val validatedRecords = validatedResults.validatedRecords
 
     for (batch <- validatedRecords.batches.asScala) {
@@ -442,7 +451,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = partitionLeaderEpoch,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
     val validatedRecords = validatedResults.validatedRecords
 
     var i = 0
@@ -492,7 +502,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
   }
 
   @Test(expected = classOf[InvalidTimestampException])
@@ -512,7 +523,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
   }
 
   @Test(expected = classOf[InvalidTimestampException])
@@ -532,7 +544,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
   }
 
   @Test(expected = classOf[InvalidTimestampException])
@@ -552,7 +565,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
   }
 
   @Test
@@ -571,7 +585,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
   @Test
@@ -590,7 +605,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
   @Test
@@ -610,7 +626,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords
     checkOffsets(messageWithOffset, offset)
   }
 
@@ -631,7 +648,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords
     checkOffsets(messageWithOffset, offset)
   }
 
@@ -653,7 +671,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords
     checkOffsets(compressedMessagesWithOffset, offset)
   }
 
@@ -675,7 +694,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords
     checkOffsets(compressedMessagesWithOffset, offset)
   }
 
@@ -695,7 +715,8 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
     checkOffsets(validatedResults.validatedRecords, offset)
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
       compressed = false)
@@ -717,7 +738,8 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
     checkOffsets(validatedResults.validatedRecords, offset)
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
       compressed = false)
@@ -739,7 +761,8 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
     checkOffsets(validatedResults.validatedRecords, offset)
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
       compressed = true)
@@ -761,7 +784,8 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
     checkOffsets(validatedResults.validatedRecords, offset)
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
       compressed = true)
@@ -783,7 +807,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
   }
 
   @Test
@@ -802,7 +827,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = false)
+      isFromClient = false,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
     val batches = TestUtils.toList(result.validatedRecords.batches)
     assertEquals(1, batches.size)
     val batch = batches.get(0)
@@ -826,7 +852,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
   @Test
@@ -846,7 +873,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
   @Test
@@ -865,7 +893,8 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
   @Test
@@ -884,7 +913,8 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
   @Test
@@ -904,7 +934,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
   @Test
@@ -924,7 +955,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
   @Test(expected = classOf[UnsupportedForMessageFormatException])
@@ -946,7 +978,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
   @Test(expected = classOf[UnsupportedForMessageFormatException])
@@ -968,7 +1001,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
   @Test
@@ -988,7 +1022,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
   @Test
@@ -1008,7 +1043,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
   @Test(expected = classOf[InvalidRecordException])
@@ -1026,7 +1062,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
   }
 
   @Test(expected = classOf[InvalidRecordException])
@@ -1034,6 +1071,26 @@ class LogValidatorTest {
     testBatchWithoutRecordsNotAllowed(DefaultCompressionCodec, DefaultCompressionCodec)
   }
 
+  @Test(expected = classOf[UnsupportedCompressionTypeException])
+  def testZStdCompressedWithUnavailableIBPVersion(): Unit = {
+    val now = System.currentTimeMillis()
+    // The timestamps should be overwritten
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = 1234L, codec = CompressionType.NONE)
+    LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(0),
+      time = time,
+      now = now,
+      sourceCodec = NoCompressionCodec,
+      targetCodec = ZStdCompressionCodec,
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V2,
+      timestampType = TimestampType.LOG_APPEND_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
+      isFromClient = true,
+      interBrokerProtocolVersion = KAFKA_2_0_IV1)
+  }
+
   @Test(expected = classOf[InvalidRecordException])
   def testUncompressedBatchWithoutRecordsNotAllowed(): Unit = {
     testBatchWithoutRecordsNotAllowed(NoCompressionCodec, NoCompressionCodec)
@@ -1065,7 +1122,8 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true)
+      isFromClient = true,
+      interBrokerProtocolVersion = ApiVersion.latestVersion)
   }
 
   private def createRecords(magicValue: Byte,
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index d89a9df..c39e230 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -23,7 +23,7 @@ import kafka.network.SocketServer
 import kafka.utils.TestUtils
 import org.apache.kafka.common.protocol.types.Struct
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, CreateTopicsResponse, MetadataRequest, MetadataResponse}
+import org.apache.kafka.common.requests._
 import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue}
 
 import scala.collection.JavaConverters._
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 04e0218..ec169a2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -18,19 +18,18 @@ package kafka.server
 
 import java.util.Optional
 
-import kafka.cluster.{BrokerEndPoint, Replica}
+import kafka.cluster.{BrokerEndPoint, Partition, Replica}
 import kafka.log.LogManager
-import kafka.cluster.Partition
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.protocol.Errors._
-import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRequest}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.EpochEndOffset._
+import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRequest}
 import org.apache.kafka.common.utils.SystemTime
 import org.easymock.EasyMock._
 import org.easymock.{Capture, CaptureType, IAnswer}
