[ https://issues.apache.org/jira/browse/KAFKA-6554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16369845#comment-16369845 ]
ASF GitHub Bot commented on KAFKA-6554: --------------------------------------- rajinisivaram closed pull request #4585: KAFKA-6554; Missing lastOffsetDelta validation before log append URL: https://github.com/apache/kafka/pull/4585 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index ff8d3b990ba..71e668e45da 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -106,7 +106,7 @@ static final int CRC_LENGTH = 4; static final int ATTRIBUTES_OFFSET = CRC_OFFSET + CRC_LENGTH; static final int ATTRIBUTE_LENGTH = 2; - static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH; + public static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH; static final int LAST_OFFSET_DELTA_LENGTH = 4; static final int FIRST_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH; static final int FIRST_TIMESTAMP_LENGTH = 8; @@ -118,7 +118,7 @@ static final int PRODUCER_EPOCH_LENGTH = 2; static final int BASE_SEQUENCE_OFFSET = PRODUCER_EPOCH_OFFSET + PRODUCER_EPOCH_LENGTH; static final int BASE_SEQUENCE_LENGTH = 4; - static final int RECORDS_COUNT_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH; + public static final int RECORDS_COUNT_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH; static final int RECORDS_COUNT_LENGTH = 4; static final int RECORDS_OFFSET = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH; public static final int RECORD_BATCH_OVERHEAD = RECORDS_OFFSET; diff --git 
a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 15750e9cd06..1beb2bdda37 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -74,15 +74,27 @@ private[kafka] object LogValidator extends Logging { private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = { if (isFromClient) { + if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) { + val countFromOffsets = batch.lastOffset - batch.baseOffset + 1 + if (countFromOffsets <= 0) + throw new InvalidRecordException(s"Batch has an invalid offset range: [${batch.baseOffset}, ${batch.lastOffset}]") + + // v2 and above messages always have a non-null count + val count = batch.countOrNull + if (count <= 0) + throw new InvalidRecordException(s"Invalid reported count for record batch: $count") + + if (countFromOffsets != batch.countOrNull) + throw new InvalidRecordException(s"Inconsistent batch offset range [${batch.baseOffset}, ${batch.lastOffset}] " + + s"and count of records $count") + } + if (batch.hasProducerId && batch.baseSequence < 0) throw new InvalidRecordException(s"Invalid sequence number ${batch.baseSequence} in record batch " + s"with producerId ${batch.producerId}") if (batch.isControlBatch) throw new InvalidRecordException("Clients are not allowed to write control records") - - if (Option(batch.countOrNull).contains(0)) - throw new InvalidRecordException("Record batches must contain at least one record") } if (batch.isTransactional && toMagic < RecordBatch.MAGIC_VALUE_V2) diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 131152af430..04e89528b12 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -26,6 +26,7 @@ import org.apache.kafka.common.utils.Time import org.apache.kafka.test.TestUtils 
import org.junit.Assert._ import org.junit.Test +import org.scalatest.Assertions.intercept import scala.collection.JavaConverters._ @@ -147,6 +148,50 @@ class LogValidatorTest { compressed = true) } + @Test + def testInvalidOffsetRangeAndRecordCount(): Unit = { + // The batch to be written contains 3 records, so the correct lastOffsetDelta is 2 + validateRecordBatchWithCountOverrides(lastOffsetDelta = 2, count = 3) + + // Count and offset range are inconsistent or invalid + assertInvalidBatchCountOverrides(lastOffsetDelta = 0, count = 3) + assertInvalidBatchCountOverrides(lastOffsetDelta = 15, count = 3) + assertInvalidBatchCountOverrides(lastOffsetDelta = -3, count = 3) + assertInvalidBatchCountOverrides(lastOffsetDelta = 2, count = -3) + assertInvalidBatchCountOverrides(lastOffsetDelta = 2, count = 6) + assertInvalidBatchCountOverrides(lastOffsetDelta = 2, count = 0) + assertInvalidBatchCountOverrides(lastOffsetDelta = -3, count = -2) + + // Count and offset range are consistent, but do not match the actual number of records + assertInvalidBatchCountOverrides(lastOffsetDelta = 5, count = 6) + assertInvalidBatchCountOverrides(lastOffsetDelta = 1, count = 2) + } + + private def assertInvalidBatchCountOverrides(lastOffsetDelta: Int, count: Int): Unit = { + intercept[InvalidRecordException] { + validateRecordBatchWithCountOverrides(lastOffsetDelta, count) + } + } + + private def validateRecordBatchWithCountOverrides(lastOffsetDelta: Int, count: Int) { + val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = 1234L, codec = CompressionType.NONE) + records.buffer.putInt(DefaultRecordBatch.RECORDS_COUNT_OFFSET, count) + records.buffer.putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta) + LogValidator.validateMessagesAndAssignOffsets( + records, + offsetCounter = new LongRef(0), + time = time, + now = time.milliseconds(), + sourceCodec = DefaultCompressionCodec, + targetCodec = DefaultCompressionCodec, + compactedTopic = false, + 
magic = RecordBatch.MAGIC_VALUE_V2, + timestampType = TimestampType.LOG_APPEND_TIME, + timestampDiffMaxMs = 1000L, + partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, + isFromClient = true) + } + @Test def testLogAppendTimeWithoutRecompressionV2() { checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V2) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Broker doesn't reject Produce request with inconsistent state > ------------------------------------------------------------- > > Key: KAFKA-6554 > URL: https://issues.apache.org/jira/browse/KAFKA-6554 > Project: Kafka > Issue Type: Bug > Components: producer > Affects Versions: 1.0.0 > Reporter: Simon Fell > Assignee: Jason Gustafson > Priority: Minor > Attachments: produce_v3.txt > > > Produce messages of type v3 have offset deltas in each record along with a > LastOffsetDelta for the topic/partition set. In investigating an issue with > missing offsets, I found a bug in a producer library where it would send > multiple records, but leave LastOffsetDelta at 0. This causes various > problems including holes in the offsets fetched by the consumer. > As lastOffsetDelta can be computed by looking at the records, it seems like > the broker should at least validate the LastOffsetDelta field against the > contained records to stop this bad data from getting in. > I've attached a decoded v3 produce message that was causing the problems, and > was accepted by the broker. > Here's a link to the issue in the kafka library we were using which has more > context if you need it. > https://github.com/Shopify/sarama/issues/1032 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)