[jira] [Commented] (KAFKA-6834) log cleaner should handle the case when the size of a message set is larger than the max message size

2020-02-12 zhangzhisheng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17035922#comment-17035922
 ] 

zhangzhisheng commented on KAFKA-6834:
--------------------------------------

With kafka_2.12-0.11.0.3, the log cleaner thread stopped after throwing the following exception:
{code:java}
java.lang.IllegalStateException: This log contains a message larger than maximum allowable size of 100.
{code}
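
For context: this exception comes from the log cleaner's buffer-growth path, which refuses to grow past a configured ceiling. A minimal Java sketch of that check (the broker code is Scala, in LogCleaner; the class shape and names below are illustrative, not the actual implementation):
{code:java}
import java.nio.ByteBuffer;

// Hedged approximation of the cleaner's buffer growth: the read buffer is
// doubled until it hits maxBufferSize; if a batch still does not fit at that
// point, the thread throws the IllegalStateException reported above and dies.
class CleanerBuffer {
    private ByteBuffer readBuffer;
    private final int maxBufferSize; // bounded by the per-topic max message size

    CleanerBuffer(int initialSize, int maxBufferSize) {
        this.readBuffer = ByteBuffer.allocate(initialSize);
        this.maxBufferSize = maxBufferSize;
    }

    void growBuffer() {
        if (readBuffer.capacity() >= maxBufferSize)
            throw new IllegalStateException("This log contains a message larger than "
                    + "maximum allowable size of " + maxBufferSize + ".");
        int newSize = Math.min(readBuffer.capacity() * 2, maxBufferSize);
        readBuffer = ByteBuffer.allocate(newSize);
    }
}
{code}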
 

> log cleaner should handle the case when the size of a message set is larger 
> than the max message size
> -----------------------------------------------------------------------------------------------------
>
> Key: KAFKA-6834
> URL: https://issues.apache.org/jira/browse/KAFKA-6834
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.0.0
>
>
> In KAFKA-5316, we added the logic to allow a message (set) larger than the 
> per topic message size to be written to the log during log cleaning. However, 
> the buffer size in the log cleaner is still bounded by the per topic message 
> size. This can cause the log cleaner to die and cause the broker to run out 
> of disk space.
>  
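
To make the failure mode concrete, a small self-contained sketch (all sizes hypothetical): once a batch larger than the per-topic cap is in the log, a cleaner buffer bounded by that cap can never hold it, so cleaning cannot advance past that offset:
{code:java}
import java.nio.ByteBuffer;

public class OversizedBatchDemo {
    public static void main(String[] args) {
        int maxMessageBytes = 1_000_000; // per-topic message size cap (hypothetical)
        int oversizedBatch = 1_500_000;  // batch admitted to the log per KAFKA-5316

        // The cleaner's read buffer is bounded by the per-topic message size...
        ByteBuffer readBuffer = ByteBuffer.allocate(maxMessageBytes);

        // ...so an oversized batch can never be read in full, no matter how many
        // times the cleaner retries, and compaction stalls on this batch.
        boolean fits = oversizedBatch <= readBuffer.capacity();
        System.out.println("batch fits in cleaner buffer: " + fits); // prints false
    }
}
{code}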



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6834) log cleaner should handle the case when the size of a message set is larger than the max message size

2018-05-09 ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468693#comment-16468693
 ] 

ASF GitHub Bot commented on KAFKA-6834:
---------------------------------------

rajinisivaram closed pull request #4953: KAFKA-6834: Handle compaction with 
batches bigger than max.message.bytes
URL: https://github.com/apache/kafka/pull/4953
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
index 22f417f8dda..7f91f266158 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC;
 import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
 import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
 import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
@@ -40,9 +41,33 @@
 
     public MutableRecordBatch nextBatch() throws IOException {
         int remaining = buffer.remaining();
-        if (remaining < LOG_OVERHEAD)
+
+        Integer batchSize = nextBatchSize();
+        if (batchSize == null || remaining < batchSize)
             return null;
 
+        byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
+
+        ByteBuffer batchSlice = buffer.slice();
+        batchSlice.limit(batchSize);
+        buffer.position(buffer.position() + batchSize);
+
+        if (magic > RecordBatch.MAGIC_VALUE_V1)
+            return new DefaultRecordBatch(batchSlice);
+        else
+            return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
+    }
+
+    /**
+     * Validates the header of the next batch and returns batch size.
+     * @return next batch size including LOG_OVERHEAD if buffer contains header up to
+     *         magic byte, null otherwise
+     * @throws CorruptRecordException if record size or magic is invalid
+     */
+    Integer nextBatchSize() throws CorruptRecordException {
+        int remaining = buffer.remaining();
+        if (remaining < LOG_OVERHEAD)
+            return null;
         int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
         // V0 has the smallest overhead, stricter checking is done later
         if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
@@ -52,23 +77,13 @@ public MutableRecordBatch nextBatch() throws IOException {
             throw new CorruptRecordException(String.format("Record size %d exceeds the largest allowable message size (%d).",
                     recordSize, maxMessageSize));
 
-        int batchSize = recordSize + LOG_OVERHEAD;
-        if (remaining < batchSize)
+        if (remaining < HEADER_SIZE_UP_TO_MAGIC)
             return null;
 
         byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
-
-        ByteBuffer batchSlice = buffer.slice();
-        batchSlice.limit(batchSize);
-        buffer.position(buffer.position() + batchSize);
-
         if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE)
             throw new CorruptRecordException("Invalid magic found in record: " + magic);
 
-        if (magic > RecordBatch.MAGIC_VALUE_V1)
-            return new DefaultRecordBatch(batchSlice);
-        else
-            return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
+        return recordSize + LOG_OVERHEAD;
     }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index ea6aa4ce3a9..eb4e31b6e58 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.CloseableIterator;
@@ -117,6 +118,18 @@ public int validBytes() {
         return downConvert(batches(), toMagic, firstOffset, time);
     }
 
+    /**
+     * Validates the header of the first batch and returns batch size.
+     * @return first batch size including LOG_OVERHEAD if buffer contains header up to
+     *         magic byte, null otherwise
+     * 

[jira] [Commented] (KAFKA-6834) log cleaner should handle the case when the size of a message set is larger than the max message size

2018-05-02 ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460689#comment-16460689
 ] 

ASF GitHub Bot commented on KAFKA-6834:
---------------------------------------

rajinisivaram opened a new pull request #4953: KAFKA-6834: Handle compaction 
with batches bigger than max.message.bytes
URL: https://github.com/apache/kafka/pull/4953
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> log cleaner should handle the case when the size of a message set is larger 
> than the max message size
> -----------------------------------------------------------------------------------------------------
>
> Key: KAFKA-6834
> URL: https://issues.apache.org/jira/browse/KAFKA-6834
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Assignee: Rajini Sivaram
>Priority: Major
>
> In KAFKA-5316, we added the logic to allow a message (set) larger than the 
> per topic message size to be written to the log during log cleaning. However, 
> the buffer size in the log cleaner is still bounded by the per topic message 
> size. This can cause the log cleaner to die and cause the broker to run out 
> of disk space.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6834) log cleaner should handle the case when the size of a message set is larger than the max message size

2018-04-30 Colin P. McCabe (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458810#comment-16458810
 ] 

Colin P. McCabe commented on KAFKA-6834:


Good catch!

Maybe we should validate the CRC of the overlarge message batch before 
enlarging the buffer, just to make sure we're not allocating memory based on 
corrupt data.
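
A sketch of that kind of pre-allocation guard (illustrative, not the final patch; a full CRC check needs the whole batch in memory, so only the header fields can be validated cheaply before the buffer grows):
{code:java}
import java.nio.ByteBuffer;

import org.apache.kafka.common.errors.CorruptRecordException;

// Illustrative guard before allocating a bigger buffer: reject implausible
// header fields so a corrupt size field cannot trigger a huge allocation.
final class PreAllocationCheck {
    // Offsets per the batch layout: 8-byte base offset, 4-byte length, then a
    // 4-byte partition leader epoch and a 1-byte magic (v2 format).
    private static final int SIZE_OFFSET = 8;
    private static final int MAGIC_OFFSET = 16;

    static int validatedBatchSize(ByteBuffer header, int absoluteMaxBatchSize) {
        int recordSize = header.getInt(header.position() + SIZE_OFFSET);
        byte magic = header.get(header.position() + MAGIC_OFFSET);
        if (recordSize < 0 || recordSize > absoluteMaxBatchSize)
            throw new CorruptRecordException("Implausible batch size: " + recordSize);
        if (magic < 0 || magic > 2)
            throw new CorruptRecordException("Invalid magic found in record: " + magic);
        // A full CRC check would need the complete batch in memory, which is
        // exactly what is not available yet; it has to run after the buffer grows.
        return recordSize;
    }
}
{code}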

> log cleaner should handle the case when the size of a message set is larger 
> than the max message size
> -----------------------------------------------------------------------------------------------------
>
> Key: KAFKA-6834
> URL: https://issues.apache.org/jira/browse/KAFKA-6834
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Assignee: Rajini Sivaram
>Priority: Major
>
> In KAFKA-5316, we added the logic to allow a message (set) larger than the 
> per topic message size to be written to the log during log cleaning. However, 
> the buffer size in the log cleaner is still bounded by the per topic message 
> size. This can cause the log cleaner to die and cause the broker to run out 
> of disk space.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6834) log cleaner should handle the case when the size of a message set is larger than the max message size

2018-04-27 Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16457130#comment-16457130
 ] 

Jun Rao commented on KAFKA-6834:


To fix this, we need to allow the cleaner buffer to grow up to the size of a
single message set in the log.
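
A minimal sketch of that direction (names illustrative; the final patch reads the batch size from the header via a new validation helper on MemoryRecords, as in the diff above):
{code:java}
import java.nio.ByteBuffer;

// Illustrative buffer management: instead of capping at the per-topic message
// size, grow just enough for the one oversized batch, then shrink back so the
// cleaner does not keep holding the large allocation.
final class GrowableCleanerBuffer {
    private ByteBuffer readBuffer;

    GrowableCleanerBuffer(int initialSize) {
        this.readBuffer = ByteBuffer.allocate(initialSize);
    }

    // batchSize would come from validating the next batch header in the log.
    void ensureCapacity(int batchSize) {
        if (batchSize > readBuffer.capacity())
            readBuffer = ByteBuffer.allocate(batchSize);
    }

    void shrinkBack(int normalMaxSize) {
        if (readBuffer.capacity() > normalMaxSize)
            readBuffer = ByteBuffer.allocate(normalMaxSize);
    }
}
{code}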

> log cleaner should handle the case when the size of a message set is larger 
> than the max message size
> -----------------------------------------------------------------------------------------------------
>
> Key: KAFKA-6834
> URL: https://issues.apache.org/jira/browse/KAFKA-6834
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Priority: Major
>
> In KAFKA-5316, we added the logic to allow a message (set) larger than the 
> per topic message size to be written to the log during log cleaning. However, 
> the buffer size in the log cleaner is still bounded by the per topic message 
> size. This can cause the log cleaner to die and cause the broker to run out 
> of disk space.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)