[ https://issues.apache.org/jira/browse/KAFKA-6554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16369845#comment-16369845 ]

ASF GitHub Bot commented on KAFKA-6554:
---------------------------------------

rajinisivaram closed pull request #4585: KAFKA-6554; Missing lastOffsetDelta validation before log append
URL: https://github.com/apache/kafka/pull/4585
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index ff8d3b990ba..71e668e45da 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -106,7 +106,7 @@
     static final int CRC_LENGTH = 4;
     static final int ATTRIBUTES_OFFSET = CRC_OFFSET + CRC_LENGTH;
     static final int ATTRIBUTE_LENGTH = 2;
-    static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    public static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
     static final int LAST_OFFSET_DELTA_LENGTH = 4;
     static final int FIRST_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
     static final int FIRST_TIMESTAMP_LENGTH = 8;
@@ -118,7 +118,7 @@
     static final int PRODUCER_EPOCH_LENGTH = 2;
     static final int BASE_SEQUENCE_OFFSET = PRODUCER_EPOCH_OFFSET + PRODUCER_EPOCH_LENGTH;
     static final int BASE_SEQUENCE_LENGTH = 4;
-    static final int RECORDS_COUNT_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH;
+    public static final int RECORDS_COUNT_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH;
     static final int RECORDS_COUNT_LENGTH = 4;
     static final int RECORDS_OFFSET = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;
     public static final int RECORD_BATCH_OVERHEAD = RECORDS_OFFSET;
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 15750e9cd06..1beb2bdda37 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -74,15 +74,27 @@ private[kafka] object LogValidator extends Logging {
 
   private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = {
     if (isFromClient) {
+      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
+        val countFromOffsets = batch.lastOffset - batch.baseOffset + 1
+        if (countFromOffsets <= 0)
+          throw new InvalidRecordException(s"Batch has an invalid offset range: [${batch.baseOffset}, ${batch.lastOffset}]")
+
+        // v2 and above messages always have a non-null count
+        val count = batch.countOrNull
+        if (count <= 0)
+          throw new InvalidRecordException(s"Invalid reported count for record batch: $count")
+
+        if (countFromOffsets != batch.countOrNull)
+          throw new InvalidRecordException(s"Inconsistent batch offset range [${batch.baseOffset}, ${batch.lastOffset}] " +
+            s"and count of records $count")
+      }
+
       if (batch.hasProducerId && batch.baseSequence < 0)
         throw new InvalidRecordException(s"Invalid sequence number ${batch.baseSequence} in record batch " +
           s"with producerId ${batch.producerId}")
 
       if (batch.isControlBatch)
         throw new InvalidRecordException("Clients are not allowed to write control records")
-
-      if (Option(batch.countOrNull).contains(0))
-        throw new InvalidRecordException("Record batches must contain at least one record")
     }
 
     if (batch.isTransactional && toMagic < RecordBatch.MAGIC_VALUE_V2)
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 131152af430..04e89528b12 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.test.TestUtils
 import org.junit.Assert._
 import org.junit.Test
+import org.scalatest.Assertions.intercept
 
 import scala.collection.JavaConverters._
 
@@ -147,6 +148,50 @@ class LogValidatorTest {
       compressed = true)
   }
 
+  @Test
+  def testInvalidOffsetRangeAndRecordCount(): Unit = {
+    // The batch to be written contains 3 records, so the correct lastOffsetDelta is 2
+    validateRecordBatchWithCountOverrides(lastOffsetDelta = 2, count = 3)
+
+    // Count and offset range are inconsistent or invalid
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 0, count = 3)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 15, count = 3)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = -3, count = 3)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 2, count = -3)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 2, count = 6)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 2, count = 0)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = -3, count = -2)
+
+    // Count and offset range are consistent, but do not match the actual number of records
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 5, count = 6)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 1, count = 2)
+  }
+
+  private def assertInvalidBatchCountOverrides(lastOffsetDelta: Int, count: Int): Unit = {
+    intercept[InvalidRecordException] {
+      validateRecordBatchWithCountOverrides(lastOffsetDelta, count)
+    }
+  }
+
+  private def validateRecordBatchWithCountOverrides(lastOffsetDelta: Int, count: Int) {
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = 1234L, codec = CompressionType.NONE)
+    records.buffer.putInt(DefaultRecordBatch.RECORDS_COUNT_OFFSET, count)
+    records.buffer.putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta)
+    LogValidator.validateMessagesAndAssignOffsets(
+      records,
+      offsetCounter = new LongRef(0),
+      time = time,
+      now = time.milliseconds(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V2,
+      timestampType = TimestampType.LOG_APPEND_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
+      isFromClient = true)
+  }
+
   @Test
   def testLogAppendTimeWithoutRecompressionV2() {
     checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V2)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Broker doesn't reject Produce request with inconsistent state
> -------------------------------------------------------------
>
>                 Key: KAFKA-6554
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6554
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 1.0.0
>            Reporter: Simon Fell
>            Assignee: Jason Gustafson
>            Priority: Minor
>         Attachments: produce_v3.txt
>
>
> Produce messages of type v3 have offset deltas in each record along with a 
> LastOffsetDelta for the topic/partition set. While investigating an issue with 
> missing offsets, I found a bug in a producer library where it would send 
> multiple records but leave LastOffsetDelta at 0. This causes various 
> problems, including holes in the offsets fetched by the consumer.
> As lastOffsetDelta can be computed by looking at the records, it seems like 
> the broker should at least validate the LastOffsetDelta field against the 
> contained records to stop this bad data from getting in.
> I've attached a decoded v3 produce message that was causing the problems and 
> was accepted by the broker.
> Here's a link to the issue in the Kafka client library we were using, which 
> has more context if you need it.
> https://github.com/Shopify/sarama/issues/1032
>  
>  
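For illustration only, here is a minimal, standalone sketch of the kind of consistency check the patch above adds to LogValidator for v2 (magic >= 2) batches coming from clients. The object, class, and method names below are invented for the example and are not the committed Kafka API:

  object BatchCountCheckSketch {
    // Illustrative stand-in for Kafka's InvalidRecordException
    final case class InvalidBatch(message: String) extends RuntimeException(message)

    // Reject a batch whose reported record count disagrees with its offset range
    def validate(baseOffset: Long, lastOffset: Long, count: Int): Unit = {
      val countFromOffsets = lastOffset - baseOffset + 1
      if (countFromOffsets <= 0)
        throw InvalidBatch(s"Batch has an invalid offset range: [$baseOffset, $lastOffset]")
      if (count <= 0)
        throw InvalidBatch(s"Invalid reported count for record batch: $count")
      if (countFromOffsets != count)
        throw InvalidBatch(s"Inconsistent batch offset range [$baseOffset, $lastOffset] and count of records $count")
    }

    def main(args: Array[String]): Unit = {
      // The reported bug: three records written, but lastOffsetDelta left at 0,
      // so lastOffset == baseOffset while the reported count is 3.
      try validate(baseOffset = 0L, lastOffset = 0L, count = 3)
      catch { case e: InvalidBatch => println(s"rejected: ${e.message}") }
    }
  }

With the patch applied, a batch like that is rejected at append time instead of being written to the log with holes in its offsets.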



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
