This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push: new b1c38e6 KAFKA-7920; Do not permit zstd produce requests until IBP is updated to 2.1 (#6256) b1c38e6 is described below commit b1c38e663c5ef4540fb7abd2bcdb04e81d7e0f91 Author: Lee Dongjin <dong...@apache.org> AuthorDate: Thu Feb 21 01:44:04 2019 +0900 KAFKA-7920; Do not permit zstd produce requests until IBP is updated to 2.1 (#6256) Fail produce requests using zstd until the inter.broker.protocol.version is large enough that replicas are ensured to support it. Otherwise, followers receive the `UNSUPPORTED_COMPRESSION_TYPE` when fetching zstd data and ISRs shrink. Reviewers: Jason Gustafson <ja...@confluent.io> --- .../kafka/common/requests/ProduceRequest.java | 2 +- core/src/main/scala/kafka/cluster/Partition.scala | 8 +- core/src/main/scala/kafka/log/Log.scala | 16 ++- core/src/main/scala/kafka/log/LogValidator.scala | 18 +-- core/src/main/scala/kafka/server/KafkaConfig.scala | 9 +- .../main/scala/kafka/server/ReplicaManager.scala | 1 + .../scala/unit/kafka/cluster/PartitionTest.scala | 5 +- .../scala/unit/kafka/log/LogValidatorTest.scala | 136 +++++++++++++++------ .../server/AbstractCreateTopicsRequestTest.scala | 2 +- .../kafka/server/ReplicaFetcherThreadTest.scala | 7 +- 10 files changed, 141 insertions(+), 63 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index a961d47..ad23f3f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -402,7 +402,7 @@ public class ProduceRequest extends AbstractRequest { throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " + "contain record batches with magic version 2"); if (version < 7 && entry.compressionType() == CompressionType.ZSTD) { - throw new UnsupportedCompressionTypeException("Produce requests with version " + version + " are note allowed to " + + throw new UnsupportedCompressionTypeException("Produce requests with version " + version + " are not allowed to " + "use ZStandard compression"); } diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 31c3668..4cedd5e 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -20,7 +20,7 @@ import java.util.Optional import java.util.concurrent.locks.ReentrantReadWriteLock import com.yammer.metrics.core.Gauge -import kafka.api.{LeaderAndIsr, Request} +import kafka.api.{ApiVersion, LeaderAndIsr, Request} import kafka.common.UnexpectedAppendOffsetException import kafka.controller.KafkaController import kafka.log._ @@ -48,6 +48,7 @@ object Partition { new Partition(topicPartition, isOffline = false, replicaLagTimeMaxMs = replicaManager.config.replicaLagTimeMaxMs, + interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion, localBrokerId = replicaManager.config.brokerId, time = time, replicaManager = replicaManager, @@ -62,6 +63,7 @@ object Partition { class Partition(val topicPartition: TopicPartition, val isOffline: Boolean, private val replicaLagTimeMaxMs: Long, + private val interBrokerProtocolVersion: ApiVersion, private val localBrokerId: Int, private val time: Time, private val replicaManager: ReplicaManager, @@ -738,7 +740,9 @@ class Partition(val topicPartition: TopicPartition, s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition") } - val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient) + val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient, + interBrokerProtocolVersion) + // we may need to increment high watermark since ISR could be down to 1 (info, maybeIncrementLeaderHW(leaderReplica)) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 234913d..b56b26f 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -27,7 +27,7 @@ import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, Time import java.util.regex.Pattern import com.yammer.metrics.core.Gauge -import kafka.api.KAFKA_0_10_0_IV0 +import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0} import kafka.common.{LogSegmentOffsetOverflowException, LongRef, OffsetsOutOfOrderException, UnexpectedAppendOffsetException} import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec} import kafka.metrics.KafkaMetricsGroup @@ -785,11 +785,13 @@ class Log(@volatile var dir: File, * * @param records The records to append * @param isFromClient Whether or not this append is from a producer + * @param interBrokerProtocolVersion Inter-broker message protocol version * @throws KafkaStorageException If the append fails due to an I/O error. * @return Information about the appended messages including the first and last offset. */ - def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true): LogAppendInfo = { - append(records, isFromClient, assignOffsets = true, leaderEpoch) + def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true, + interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = { + append(records, isFromClient, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch) } /** @@ -800,7 +802,7 @@ class Log(@volatile var dir: File, * @return Information about the appended messages including the first and last offset. */ def appendAsFollower(records: MemoryRecords): LogAppendInfo = { - append(records, isFromClient = false, assignOffsets = false, leaderEpoch = -1) + append(records, isFromClient = false, interBrokerProtocolVersion = ApiVersion.latestVersion, assignOffsets = false, leaderEpoch = -1) } /** @@ -811,6 +813,7 @@ class Log(@volatile var dir: File, * * @param records The log records to append * @param isFromClient Whether or not this append is from a producer + * @param interBrokerProtocolVersion Inter-broker message protocol version * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader * @throws KafkaStorageException If the append fails due to an I/O error. @@ -818,7 +821,7 @@ class Log(@volatile var dir: File, * @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset * @return Information about the appended messages including the first and last offset. */ - private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = { + private def append(records: MemoryRecords, isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = { maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") { val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient) @@ -849,7 +852,8 @@ class Log(@volatile var dir: File, config.messageTimestampType, config.messageTimestampDifferenceMaxMs, leaderEpoch, - isFromClient) + isFromClient, + interBrokerProtocolVersion) } catch { case e: IOException => throw new KafkaException(s"Error validating messages while appending to log $name", e) diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 2cfbf7d..f7c7d04 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -18,10 +18,11 @@ package kafka.log import java.nio.ByteBuffer +import kafka.api.{ApiVersion, KAFKA_2_1_IV0} import kafka.common.LongRef -import kafka.message.{CompressionCodec, NoCompressionCodec} +import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec} import kafka.utils.Logging -import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException} +import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record.{AbstractRecords, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType} import org.apache.kafka.common.utils.Time @@ -56,7 +57,8 @@ private[kafka] object LogValidator extends Logging { timestampType: TimestampType, timestampDiffMaxMs: Long, partitionLeaderEpoch: Int, - isFromClient: Boolean): ValidationAndOffsetAssignResult = { + isFromClient: Boolean, + interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = { if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) { // check the magic value if (!records.hasMatchingMagic(magic)) @@ -68,7 +70,7 @@ private[kafka] object LogValidator extends Logging { partitionLeaderEpoch, isFromClient, magic) } else { validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, time, now, sourceCodec, targetCodec, compactedTopic, - magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient) + magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient, interBrokerProtocolVersion) } } @@ -245,8 +247,8 @@ private[kafka] object LogValidator extends Logging { timestampType: TimestampType, timestampDiffMaxMs: Long, partitionLeaderEpoch: Int, - isFromClient: Boolean): ValidationAndOffsetAssignResult = { - + isFromClient: Boolean, + interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = { // No in place assignment situation 1 and 2 var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0 @@ -265,10 +267,12 @@ private[kafka] object LogValidator extends Logging { inPlaceAssignment = true for (record <- batch.asScala) { - validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic) if (sourceCodec != NoCompressionCodec && record.isCompressed) throw new InvalidRecordException("Compressed outer record should not have an inner record with a " + s"compression attribute set: $record") + if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0) + throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression") + validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic) uncompressedSizeInBytes += record.sizeInBytes() if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 9edda4e..e91c0d5 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -20,11 +20,11 @@ package kafka.server import java.util import java.util.{Collections, Properties} -import kafka.api.{ApiVersion, ApiVersionValidator, KAFKA_0_10_0_IV1} +import kafka.api.{ApiVersion, ApiVersionValidator, KAFKA_0_10_0_IV1, KAFKA_2_1_IV0} import kafka.cluster.EndPoint import kafka.coordinator.group.OffsetConfig import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager} -import kafka.message.{BrokerCompressionCodec, CompressionCodec} +import kafka.message.{BrokerCompressionCodec, CompressionCodec, ZStdCompressionCodec} import kafka.utils.CoreUtils import kafka.utils.Implicits._ import org.apache.kafka.clients.CommonClientConfigs @@ -1412,6 +1412,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " + s"is set to version ${ApiVersion.minSupportedFor(recordVersion).shortVersion} or higher") + if (offsetsTopicCompressionCodec == ZStdCompressionCodec) + require(interBrokerProtocolVersion.recordVersion.value >= KAFKA_2_1_IV0.recordVersion.value, + "offsets.topic.compression.codec zstd can only be used when inter.broker.protocol.version " + + s"is set to version ${KAFKA_2_1_IV0.shortVersion} or higher") + val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM, s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString") diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index efdde13..f52bfe6 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -124,6 +124,7 @@ object ReplicaManager { val OfflinePartition: Partition = new Partition(new TopicPartition("", -1), isOffline = true, replicaLagTimeMaxMs = 0L, + interBrokerProtocolVersion = ApiVersion.latestVersion, localBrokerId = -1, time = null, replicaManager = null, diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 9bf167d..cc2a311 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -22,7 +22,7 @@ import java.util.{Optional, Properties} import java.util.concurrent.{CountDownLatch, Executors, TimeUnit, TimeoutException} import java.util.concurrent.atomic.AtomicBoolean -import kafka.api.Request +import kafka.api.{ApiVersion, Request} import kafka.common.UnexpectedAppendOffsetException import kafka.log.{Defaults => _, _} import kafka.server._ @@ -411,6 +411,7 @@ class PartitionTest { val partition = new Partition(topicPartition, isOffline = false, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, + interBrokerProtocolVersion = ApiVersion.latestVersion, localBrokerId = brokerId, time, replicaManager, @@ -506,6 +507,7 @@ class PartitionTest { val partition = new Partition(topicPartition, isOffline = false, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, + interBrokerProtocolVersion = ApiVersion.latestVersion, localBrokerId = brokerId, time, replicaManager, @@ -714,6 +716,7 @@ class PartitionTest { val partition = new Partition(tp, isOffline = false, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, + interBrokerProtocolVersion = ApiVersion.latestVersion, localBrokerId = brokerId, time, replicaManager, diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 2a367e0..37553b9 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -19,9 +19,10 @@ package kafka.log import java.nio.ByteBuffer import java.util.concurrent.TimeUnit +import kafka.api.{ApiVersion, KAFKA_2_0_IV1} import kafka.common.LongRef -import kafka.message.{CompressionCodec, DefaultCompressionCodec, GZIPCompressionCodec, NoCompressionCodec, SnappyCompressionCodec} -import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException} +import kafka.message._ +import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Time import org.apache.kafka.test.TestUtils @@ -55,7 +56,8 @@ class LogValidatorTest { timestampType = TimestampType.LOG_APPEND_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) val validatedRecords = validatedResults.validatedRecords assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size) validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, 1234L, batch)) @@ -92,7 +94,8 @@ class LogValidatorTest { timestampType = TimestampType.LOG_APPEND_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) val validatedRecords = validatedResults.validatedRecords assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size) @@ -133,7 +136,8 @@ class LogValidatorTest { timestampType = TimestampType.LOG_APPEND_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) val validatedRecords = validatedResults.validatedRecords assertEquals("message set size should not change", records.records.asScala.size, @@ -190,7 +194,8 @@ class LogValidatorTest { timestampType = TimestampType.LOG_APPEND_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) } @Test @@ -231,7 +236,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = partitionLeaderEpoch, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) val validatedRecords = validatingResults.validatedRecords var i = 0 @@ -297,7 +303,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = partitionLeaderEpoch, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) val validatedRecords = validatingResults.validatedRecords var i = 0 @@ -347,7 +354,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) val validatedRecords = validatedResults.validatedRecords for (batch <- validatedRecords.batches.asScala) { @@ -388,7 +396,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) val validatedRecords = validatedResults.validatedRecords for (batch <- validatedRecords.batches.asScala) { @@ -442,7 +451,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = partitionLeaderEpoch, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) val validatedRecords = validatedResults.validatedRecords var i = 0 @@ -492,7 +502,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) } @Test(expected = classOf[InvalidTimestampException]) @@ -512,7 +523,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) } @Test(expected = classOf[InvalidTimestampException]) @@ -532,7 +544,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) } @Test(expected = classOf[InvalidTimestampException]) @@ -552,7 +565,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) } @Test @@ -571,7 +585,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true).validatedRecords, offset) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset) } @Test @@ -590,7 +605,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true).validatedRecords, offset) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset) } @Test @@ -610,7 +626,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 5000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true).validatedRecords + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords checkOffsets(messageWithOffset, offset) } @@ -631,7 +648,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 5000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true).validatedRecords + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords checkOffsets(messageWithOffset, offset) } @@ -653,7 +671,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 5000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true).validatedRecords + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords checkOffsets(compressedMessagesWithOffset, offset) } @@ -675,7 +694,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 5000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true).validatedRecords + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords checkOffsets(compressedMessagesWithOffset, offset) } @@ -695,7 +715,8 @@ class LogValidatorTest { timestampType = TimestampType.LOG_APPEND_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) checkOffsets(validatedResults.validatedRecords, offset) verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records, compressed = false) @@ -717,7 +738,8 @@ class LogValidatorTest { timestampType = TimestampType.LOG_APPEND_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) checkOffsets(validatedResults.validatedRecords, offset) verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records, compressed = false) @@ -739,7 +761,8 @@ class LogValidatorTest { timestampType = TimestampType.LOG_APPEND_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) checkOffsets(validatedResults.validatedRecords, offset) verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records, compressed = true) @@ -761,7 +784,8 @@ class LogValidatorTest { timestampType = TimestampType.LOG_APPEND_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) checkOffsets(validatedResults.validatedRecords, offset) verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records, compressed = true) @@ -783,7 +807,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 5000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) } @Test @@ -802,7 +827,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 5000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = false) + isFromClient = false, + interBrokerProtocolVersion = ApiVersion.latestVersion) val batches = TestUtils.toList(result.validatedRecords.batches) assertEquals(1, batches.size) val batch = batches.get(0) @@ -826,7 +852,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 5000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true).validatedRecords, offset) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset) } @Test @@ -846,7 +873,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 5000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true).validatedRecords, offset) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset) } @Test @@ -865,7 +893,8 @@ class LogValidatorTest { timestampType = TimestampType.LOG_APPEND_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true).validatedRecords, offset) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset) } @Test @@ -884,7 +913,8 @@ class LogValidatorTest { timestampType = TimestampType.LOG_APPEND_TIME, timestampDiffMaxMs = 1000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true).validatedRecords, offset) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset) } @Test @@ -904,7 +934,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 5000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true).validatedRecords, offset) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset) } @Test @@ -924,7 +955,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 5000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true).validatedRecords, offset) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset) } @Test(expected = classOf[UnsupportedForMessageFormatException]) @@ -946,7 +978,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 5000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true).validatedRecords, offset) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset) } @Test(expected = classOf[UnsupportedForMessageFormatException]) @@ -968,7 +1001,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 5000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true).validatedRecords, offset) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset) } @Test @@ -988,7 +1022,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 5000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true).validatedRecords, offset) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset) } @Test @@ -1008,7 +1043,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 5000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true).validatedRecords, offset) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset) } @Test(expected = classOf[InvalidRecordException]) @@ -1026,7 +1062,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 5000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) } @Test(expected = classOf[InvalidRecordException]) @@ -1034,6 +1071,26 @@ class LogValidatorTest { testBatchWithoutRecordsNotAllowed(DefaultCompressionCodec, DefaultCompressionCodec) } + @Test(expected = classOf[UnsupportedCompressionTypeException]) + def testZStdCompressedWithUnavailableIBPVersion(): Unit = { + val now = System.currentTimeMillis() + // The timestamps should be overwritten + val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = 1234L, codec = CompressionType.NONE) + LogValidator.validateMessagesAndAssignOffsets(records, + offsetCounter = new LongRef(0), + time= time, + now = now, + sourceCodec = NoCompressionCodec, + targetCodec = ZStdCompressionCodec, + compactedTopic = false, + magic = RecordBatch.MAGIC_VALUE_V2, + timestampType = TimestampType.LOG_APPEND_TIME, + timestampDiffMaxMs = 1000L, + partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, + isFromClient = true, + interBrokerProtocolVersion = KAFKA_2_0_IV1) + } + @Test(expected = classOf[InvalidRecordException]) def testUncompressedBatchWithoutRecordsNotAllowed(): Unit = { testBatchWithoutRecordsNotAllowed(NoCompressionCodec, NoCompressionCodec) @@ -1065,7 +1122,8 @@ class LogValidatorTest { timestampType = TimestampType.CREATE_TIME, timestampDiffMaxMs = 5000L, partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true) + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion) } private def createRecords(magicValue: Byte, diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala index d89a9df..c39e230 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala @@ -23,7 +23,7 @@ import kafka.network.SocketServer import kafka.utils.TestUtils import org.apache.kafka.common.protocol.types.Struct import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, CreateTopicsResponse, MetadataRequest, MetadataResponse} +import org.apache.kafka.common.requests._ import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue} import scala.collection.JavaConverters._ diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index 04e0218..ec169a2 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -18,19 +18,18 @@ package kafka.server import java.util.Optional -import kafka.cluster.{BrokerEndPoint, Replica} +import kafka.cluster.{BrokerEndPoint, Partition, Replica} import kafka.log.LogManager -import kafka.cluster.Partition import kafka.server.QuotaFactory.UnboundedQuota import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend import kafka.utils.TestUtils import org.apache.kafka.clients.ClientResponse import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.protocol.Errors._ -import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRequest} +import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.EpochEndOffset._ +import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRequest} import org.apache.kafka.common.utils.SystemTime import org.easymock.EasyMock._ import org.easymock.{Capture, CaptureType, IAnswer}