[jira] [Commented] (KAFKA-6834) log cleaner should handle the case when the size of a message set is larger than the max message size
[ https://issues.apache.org/jira/browse/KAFKA-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035922#comment-17035922 ] zhangzhisheng commented on KAFKA-6834:
--

On kafka_2.12-0.11.0.3, the log cleaner thread stopped after throwing an exception like the following:
{code:java}
java.lang.IllegalStateException: This log contains a message larger than maximum allowable size of 100.
{code}

> log cleaner should handle the case when the size of a message set is larger
> than the max message size
> -
>
> Key: KAFKA-6834
> URL: https://issues.apache.org/jira/browse/KAFKA-6834
> Project: Kafka
> Issue Type: Bug
> Reporter: Jun Rao
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 2.0.0
>
> In KAFKA-5316, we added the logic to allow a message (set) larger than the
> per topic message size to be written to the log during log cleaning. However,
> the buffer size in the log cleaner is still bounded by the per topic message
> size. This can cause the log cleaner to die and cause the broker to run out
> of disk space.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
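For context, the failure mode in this report can be sketched with a simplified check. This is a hypothetical standalone sketch, not the actual Kafka source; the class and method names are made up and only the exception message mirrors the report above. The cleaner reads a batch whose declared size exceeds the configured maximum and dies instead of growing its buffer.

```java
// Hypothetical sketch (not the actual Kafka source) of the size check that
// stops the log cleaner: a batch whose declared size exceeds the configured
// max.message.bytes makes the read fail instead of growing the read buffer.
public class MaxMessageSizeSketch {
    static int checkBatchSize(int recordSize, int maxMessageSize) {
        if (recordSize > maxMessageSize)
            throw new IllegalStateException(
                "This log contains a message larger than maximum allowable size of "
                    + maxMessageSize + ".");
        return recordSize;
    }

    public static void main(String[] args) {
        // A batch within the limit passes through unchanged...
        System.out.println(checkBatchSize(50, 100));
        // ...but against a 100-byte limit (as in the report above) a larger
        // batch triggers the IllegalStateException that kills the cleaner.
        try {
            checkBatchSize(200, 100);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The unusually small limit of 100 in the reported message suggests a per-topic max.message.bytes override far below the broker default, which makes this failure mode easy to hit.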
[jira] [Commented] (KAFKA-6834) log cleaner should handle the case when the size of a message set is larger than the max message size
[ https://issues.apache.org/jira/browse/KAFKA-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468693#comment-16468693 ] ASF GitHub Bot commented on KAFKA-6834:
---

rajinisivaram closed pull request #4953: KAFKA-6834: Handle compaction with batches bigger than max.message.bytes
URL: https://github.com/apache/kafka/pull/4953

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
index 22f417f8dda..7f91f266158 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC;
 import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
 import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
 import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
@@ -40,9 +41,33 @@
     public MutableRecordBatch nextBatch() throws IOException {
         int remaining = buffer.remaining();
-        if (remaining < LOG_OVERHEAD)
+
+        Integer batchSize = nextBatchSize();
+        if (batchSize == null || remaining < batchSize)
             return null;
+        byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
+
+        ByteBuffer batchSlice = buffer.slice();
+        batchSlice.limit(batchSize);
+        buffer.position(buffer.position() + batchSize);
+
+        if (magic > RecordBatch.MAGIC_VALUE_V1)
+            return new DefaultRecordBatch(batchSlice);
+        else
+            return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
+    }
+
+    /**
+     * Validates the header of the next batch and returns batch size.
+     * @return next batch size including LOG_OVERHEAD if buffer contains header up to
+     *         magic byte, null otherwise
+     * @throws CorruptRecordException if record size or magic is invalid
+     */
+    Integer nextBatchSize() throws CorruptRecordException {
+        int remaining = buffer.remaining();
+        if (remaining < LOG_OVERHEAD)
+            return null;
         int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
         // V0 has the smallest overhead, stricter checking is done later
         if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
@@ -52,23 +77,13 @@ public MutableRecordBatch nextBatch() throws IOException {
             throw new CorruptRecordException(String.format("Record size %d exceeds the largest allowable message size (%d).",
                     recordSize, maxMessageSize));
 
-        int batchSize = recordSize + LOG_OVERHEAD;
-        if (remaining < batchSize)
+        if (remaining < HEADER_SIZE_UP_TO_MAGIC)
             return null;
 
         byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
-
-        ByteBuffer batchSlice = buffer.slice();
-        batchSlice.limit(batchSize);
-        buffer.position(buffer.position() + batchSize);
-
         if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE)
             throw new CorruptRecordException("Invalid magic found in record: " + magic);
 
-        if (magic > RecordBatch.MAGIC_VALUE_V1)
-            return new DefaultRecordBatch(batchSlice);
-        else
-            return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
+        return recordSize + LOG_OVERHEAD;
     }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index ea6aa4ce3a9..eb4e31b6e58 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -18,6 +18,7 @@
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.CloseableIterator;
@@ -117,6 +118,18 @@ public int validBytes() {
         return downConvert(batches(), toMagic, firstOffset, time);
     }
 
+    /**
+     * Validates the header of the first batch and returns batch size.
+     * @return first batch size including LOG_OVERHEAD if buffer contains header up to
+     *         magic byte, null otherwise
+     *
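The heart of the patch above is splitting header validation out of nextBatch() into nextBatchSize(), so a caller can learn how big the next batch is, and grow its buffer accordingly, before the whole batch fits in memory. A self-contained sketch of that idea, using a made-up, simplified batch layout rather than Kafka's real record format:

```java
import java.nio.ByteBuffer;

// Simplified sketch of the nextBatchSize() idea from the diff above: validate
// only the fixed-size header and report how large the full batch is, so the
// caller can decide to grow its buffer before attempting a full read.
// The layout here is hypothetical: [8-byte offset][4-byte size][payload...].
public class BatchHeaderSketch {
    static final int LOG_OVERHEAD = 12;   // offset field (8) + size field (4)
    static final int SIZE_OFFSET = 8;

    /** Returns total batch size, or null if the header is not yet fully buffered. */
    static Integer nextBatchSize(ByteBuffer buffer, int maxMessageSize) {
        if (buffer.remaining() < LOG_OVERHEAD)
            return null;                  // header incomplete; caller should read more
        int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
        if (recordSize < 0 || recordSize > maxMessageSize)
            throw new IllegalStateException("Invalid record size: " + recordSize);
        return recordSize + LOG_OVERHEAD;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putLong(0L).putInt(40);       // offset 0, 40-byte payload to follow
        buf.flip();
        // Only 12 header bytes are buffered, yet we learn the batch needs 52.
        System.out.println(nextBatchSize(buf, 1_000_000));  // 52
    }
}
```

The real implementation additionally validates the magic byte (hence HEADER_SIZE_UP_TO_MAGIC) and throws CorruptRecordException rather than IllegalStateException, but the control flow is the same: validate cheaply, report the size, let the caller allocate.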
[jira] [Commented] (KAFKA-6834) log cleaner should handle the case when the size of a message set is larger than the max message size
[ https://issues.apache.org/jira/browse/KAFKA-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16460689#comment-16460689 ] ASF GitHub Bot commented on KAFKA-6834:
---

rajinisivaram opened a new pull request #4953: KAFKA-6834: Handle compaction with batches bigger than max.message.bytes
URL: https://github.com/apache/kafka/pull/4953

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6834) log cleaner should handle the case when the size of a message set is larger than the max message size
[ https://issues.apache.org/jira/browse/KAFKA-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458810#comment-16458810 ] Colin P. McCabe commented on KAFKA-6834:
---

Good catch! Maybe we should validate the CRC of the overlarge message batch before enlarging the buffer, just to make sure we're not allocating memory based on corrupt data.
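The idea here, roughly: checksum the bytes already buffered before trusting the declared batch size, so a corrupted length field cannot drive a huge allocation. A minimal sketch with java.util.zip.CRC32 over a made-up header layout; Kafka's v2 record batches actually carry a CRC-32C over part of the batch, so treat this purely as an illustration with hypothetical names:

```java
import java.util.zip.CRC32;

// Rough sketch of the suggestion above: before growing the cleaner's buffer to
// the size a batch header claims, verify a checksum over the buffered header
// bytes so a corrupt length field cannot trigger an enormous allocation.
// The header layout and method name are hypothetical, not Kafka's real format.
public class HeaderCrcSketch {
    static boolean headerLooksValid(byte[] headerBytes, long expectedCrc) {
        CRC32 crc = new CRC32();
        crc.update(headerBytes, 0, headerBytes.length);  // checksum what we have
        return crc.getValue() == expectedCrc;
    }

    public static void main(String[] args) {
        byte[] header = {1, 2, 3, 4};
        CRC32 crc = new CRC32();
        crc.update(header, 0, header.length);
        System.out.println(headerLooksValid(header, crc.getValue()));  // matches
        System.out.println(headerLooksValid(header, 0xDEADBEEFL));     // mismatch
    }
}
```

Only once the checksum matches would the cleaner allocate a buffer of the declared size and attempt the full read.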
[jira] [Commented] (KAFKA-6834) log cleaner should handle the case when the size of a message set is larger than the max message size
[ https://issues.apache.org/jira/browse/KAFKA-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457130#comment-16457130 ] Jun Rao commented on KAFKA-6834:
---

To fix this, we need to allow the cleaner buffer to grow up to the size of a single message set in the log.
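A minimal sketch of that approach: a helper that replaces the read buffer with a larger one sized to the oversized batch, carrying over any bytes already read. This is a hypothetical helper for illustration, not the actual LogCleaner code:

```java
import java.nio.ByteBuffer;

// Minimal sketch of the fix described above: when a batch's declared size
// exceeds the current read buffer, grow the buffer to fit that one batch
// instead of failing. Hypothetical helper, not the actual LogCleaner code.
public class GrowableBufferSketch {
    static ByteBuffer ensureCapacity(ByteBuffer buffer, int batchSize) {
        if (buffer.capacity() >= batchSize)
            return buffer;                      // existing buffer is big enough
        ByteBuffer grown = ByteBuffer.allocate(batchSize);
        buffer.flip();                          // switch old buffer to reading
        grown.put(buffer);                      // carry over already-read bytes
        return grown;
    }

    public static void main(String[] args) {
        ByteBuffer small = ByteBuffer.allocate(16);
        small.putInt(42);                       // 4 bytes already buffered
        ByteBuffer grown = ensureCapacity(small, 64);
        System.out.println(grown.capacity());   // 64
        System.out.println(grown.position());   // 4 (carried-over bytes)
    }
}
```

A production version would also cap growth (e.g. at the segment size) and shrink the buffer back after the oversized batch is processed, so one pathological batch does not pin memory for the cleaner's lifetime.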