This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
     new cc26fcc  KAFKA-9669; Loosen validation of inner offsets for older message formats (#8647)
cc26fcc is described below
commit cc26fccea330087441fe276f1d2047f067e49e46
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue May 12 12:06:08 2020 -0700
    KAFKA-9669; Loosen validation of inner offsets for older message formats (#8647)
Prior to KAFKA-8106, we allowed the v0 and v1 message formats to contain
non-consecutive inner offsets. Inside `LogValidator`, we would detect this case
and rewrite the batch. After KAFKA-8106, we changed the logic to raise an error
in the case of the v1 message format (v0 was still expected to be rewritten).
    This caused an incompatibility for older clients which depended on the
    looser validation. This patch reverts to the old logic of rewriting the
    batch to fix the invalid inner offsets.
Note that the v2 message format has always had stricter validation. This
patch also adds a test case for this.
    Reviewers: José Armando García Sancio <[email protected]>, Ismael Juma <[email protected]>
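
    For context, a rough sketch of the scenario being relaxed (not part of
    the patch itself; it simply combines the appendUncheckedWithOffset test
    helper and the MemoryRecords/LogValidator APIs shown in the diff below,
    with illustrative sizes and record counts):

        import java.nio.ByteBuffer
        import org.apache.kafka.common.record._

        // Build a gzip-compressed v1 batch whose inner records all claim offset 0.
        val buffer = ByteBuffer.allocate(1024)
        val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1,
          CompressionType.GZIP, TimestampType.CREATE_TIME, 0L)
        (0 until 3).foreach { i =>
          // appendUncheckedWithOffset is the test-only helper added by this commit
          builder.appendUncheckedWithOffset(0L, new SimpleRecord(i.toString.getBytes))
        }
        val invalidRecords = builder.build()
        // Passing invalidRecords through LogValidator.validateMessagesAndAssignOffsets
        // (as the new test does) should now yield a rewritten batch with offsets
        // 0, 1, 2 rather than an InvalidRecordException; for MAGIC_VALUE_V2 the
        // strict check still applies and the batch is rejected.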
---
.../kafka/common/record/MemoryRecordsBuilder.java | 30 +++++++++++++
core/src/main/scala/kafka/log/LogValidator.scala | 14 +++----
.../scala/unit/kafka/log/LogValidatorTest.scala | 49 ++++++++++++++++++++--
3 files changed, 83 insertions(+), 10 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 054fb86..fd5a680 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -588,6 +589,35 @@ public class MemoryRecordsBuilder implements AutoCloseable {
     }
 
     /**
+     * Append a record without doing offset/magic validation (this should only be used in testing).
+     *
+     * @param offset The offset of the record
+     * @param record The record to add
+     */
+    public void appendUncheckedWithOffset(long offset, SimpleRecord record) throws IOException {
+        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+            int offsetDelta = (int) (offset - baseOffset);
+            long timestamp = record.timestamp();
+            if (firstTimestamp == null)
+                firstTimestamp = timestamp;
+
+            int sizeInBytes = DefaultRecord.writeTo(appendStream,
+                offsetDelta,
+                timestamp - firstTimestamp,
+                record.key(),
+                record.value(),
+                record.headers());
+            recordWritten(offset, timestamp, sizeInBytes);
+        } else {
+            LegacyRecord legacyRecord = LegacyRecord.create(magic,
+                record.timestamp(),
+                Utils.toNullableArray(record.key()),
+                Utils.toNullableArray(record.value()));
+            appendUncheckedWithOffset(offset, legacyRecord);
+        }
+    }
+
+    /**
      * Append a record at the next sequential offset.
      * @param record the record to add
      */
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index b72ab1f..b31d45a 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -394,14 +394,14 @@ private[log] object LogValidator extends Logging {
         uncompressedSizeInBytes += record.sizeInBytes()
         if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
-          // inner records offset should always be continuous
+          // Some older clients do not implement the V1 internal offsets correctly.
+          // Historically the broker handled this by rewriting the batches rather
+          // than rejecting the request. We must continue this handling here to avoid
+          // breaking these clients.
           val expectedOffset = expectedInnerOffset.getAndIncrement()
-          if (record.offset != expectedOffset) {
-            brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
-            throw new RecordValidationException(
-              new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."),
-              List(new RecordError(batchIndex)))
-          }
+          if (record.offset != expectedOffset)
+            inPlaceAssignment = false
+
           if (record.timestamp > maxTimestamp)
             maxTimestamp = record.timestamp
         }
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 0515e79..ba56f14 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -22,14 +22,14 @@ import java.util.concurrent.TimeUnit
 import com.yammer.metrics.Metrics
 import kafka.api.{ApiVersion, KAFKA_2_0_IV1, KAFKA_2_3_IV1}
 import kafka.common.{LongRef, RecordValidationException}
+import kafka.log.LogValidator.ValidationAndOffsetAssignResult
 import kafka.message._
 import kafka.server.BrokerTopicStats
 import kafka.utils.TestUtils.meterCount
-import org.apache.kafka.common.InvalidRecordException
-import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
 import org.apache.kafka.test.TestUtils
 import org.junit.Assert._
 import org.junit.Test
@@ -65,6 +65,29 @@ class LogValidatorTest {
   }
 
   @Test
+  def testValidationOfBatchesWithNonSequentialInnerOffsets(): Unit = {
+    def testMessageValidation(magicValue: Byte): Unit = {
+      val numRecords = 20
+      val invalidRecords = recordsWithNonSequentialInnerOffsets(magicValue, CompressionType.GZIP, numRecords)
+
+      // Validation for v2 and above is strict for this case. For older formats, we fix invalid
+      // internal offsets by rewriting the batch.
+      if (magicValue >= RecordBatch.MAGIC_VALUE_V2) {
+        assertThrows[InvalidRecordException] {
+          validateMessages(invalidRecords, magicValue, CompressionType.GZIP, CompressionType.GZIP)
+        }
+      } else {
+        val result = validateMessages(invalidRecords, magicValue, CompressionType.GZIP, CompressionType.GZIP)
+        assertEquals(0 until numRecords, result.validatedRecords.records.asScala.map(_.offset))
+      }
+    }
+
+    for (version <- RecordVersion.values) {
+      testMessageValidation(version.value)
+    }
+  }
+
+  @Test
   def testMisMatchMagic(): Unit = {
     checkMismatchMagic(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP)
     checkMismatchMagic(RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP)
@@ -88,7 +111,10 @@ class LogValidatorTest {
     assertTrue(meterCount(s"${BrokerTopicStats.InvalidMagicNumberRecordsPerSec}") > 0)
   }
 
-  private def validateMessages(records: MemoryRecords, magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType): Unit = {
+  private def validateMessages(records: MemoryRecords,
+                               magic: Byte,
+                               sourceCompressionType: CompressionType,
+                               targetCompressionType: CompressionType): ValidationAndOffsetAssignResult = {
     LogValidator.validateMessagesAndAssignOffsets(records,
       topicPartition,
       new LongRef(0L),
@@ -1371,6 +1397,23 @@ class LogValidatorTest {
     }
   }
 
+  private def recordsWithNonSequentialInnerOffsets(magicValue: Byte,
+                                                   codec: CompressionType,
+                                                   numRecords: Int): MemoryRecords = {
+    val records = (0 until numRecords).map { id =>
+      new SimpleRecord(id.toString.getBytes)
+    }
+
+    val buffer = ByteBuffer.allocate(1024)
+    val builder = MemoryRecords.builder(buffer, magicValue, codec, TimestampType.CREATE_TIME, 0L)
+
+    records.foreach { record =>
+      builder.appendUncheckedWithOffset(0, record)
+    }
+
+    builder.build()
+  }
+
   private def recordsWithInvalidInnerMagic(batchMagicValue: Byte,
                                            recordMagicValue: Byte,
                                            codec: CompressionType): MemoryRecords = {