[jira] [Issue Comment Deleted] (KAFKA-9703) ProducerBatch.split takes up too many resources if the bigBatch is huge

2020-03-12 Thread jiamei xie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiamei xie updated KAFKA-9703:
--
Comment: was deleted

(was: https://issues.apache.org/jira/browse/KAFKA-9703)

> ProducerBatch.split takes up too many resources if the bigBatch is huge
> ---
>
> Key: KAFKA-9703
> URL: https://issues.apache.org/jira/browse/KAFKA-9703
> Project: Kafka
>  Issue Type: Bug
>Reporter: jiamei xie
>Priority: Major
>
> ProducerBatch.split takes up too many resources  and might cause outOfMemory 
> error if the bigBatch is huge. About how I found this issue is in 
> https://lists.apache.org/list.html?us...@kafka.apache.org:lte=1M:MESSAGE_TOO_LARGE
> Following is the code which takes a lot of resources.
> {code:java}
>  for (Record record : recordBatch) {
> assert thunkIter.hasNext();
> Thunk thunk = thunkIter.next();
> if (batch == null)
> batch = createBatchOffAccumulatorForRecord(record, 
> splitBatchSize);
> // A newly created batch can always host the first message.
> if (!batch.tryAppendForSplit(record.timestamp(), record.key(), 
> record.value(), record.headers(), thunk)) {
> batches.add(batch);
> batch = createBatchOffAccumulatorForRecord(record, 
> splitBatchSize);
> batch.tryAppendForSplit(record.timestamp(), record.key(), 
> record.value(), record.headers(), thunk);
> }
> {code}
> Refer to RecordAccumulator#tryAppend, we can call closeForRecordAppends() 
> after a batch is full.
> {code:java}
> private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] 
> value, Header[] headers,
>  Callback callback, 
> Deque deque, long nowMs) {
> ProducerBatch last = deque.peekLast();
> if (last != null) {
> FutureRecordMetadata future = last.tryAppend(timestamp, key, 
> value, headers, callback, nowMs);
> if (future == null)
> last.closeForRecordAppends();
> else
> return new RecordAppendResult(future, deque.size() > 1 || 
> last.isFull(), false, false);
> }
> return null;
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-9703) ProducerBatch.split takes up too many resources if the bigBatch is huge

2020-03-12 Thread jiamei xie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiamei xie updated KAFKA-9703:
--
Comment: was deleted

(was: PR has been created in github.com/apache/kafka/pull/8286)

> ProducerBatch.split takes up too many resources if the bigBatch is huge
> ---
>
> Key: KAFKA-9703
> URL: https://issues.apache.org/jira/browse/KAFKA-9703
> Project: Kafka
>  Issue Type: Bug
>Reporter: jiamei xie
>Priority: Major
>
> ProducerBatch.split takes up too many resources  and might cause outOfMemory 
> error if the bigBatch is huge. About how I found this issue is in 
> https://lists.apache.org/list.html?us...@kafka.apache.org:lte=1M:MESSAGE_TOO_LARGE
> Following is the code which takes a lot of resources.
> {code:java}
>  for (Record record : recordBatch) {
> assert thunkIter.hasNext();
> Thunk thunk = thunkIter.next();
> if (batch == null)
> batch = createBatchOffAccumulatorForRecord(record, 
> splitBatchSize);
> // A newly created batch can always host the first message.
> if (!batch.tryAppendForSplit(record.timestamp(), record.key(), 
> record.value(), record.headers(), thunk)) {
> batches.add(batch);
> batch = createBatchOffAccumulatorForRecord(record, 
> splitBatchSize);
> batch.tryAppendForSplit(record.timestamp(), record.key(), 
> record.value(), record.headers(), thunk);
> }
> {code}
> Refer to RecordAccumulator#tryAppend, we can call closeForRecordAppends() 
> after a batch is full.
> {code:java}
> private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] 
> value, Header[] headers,
>  Callback callback, 
> Deque deque, long nowMs) {
> ProducerBatch last = deque.peekLast();
> if (last != null) {
> FutureRecordMetadata future = last.tryAppend(timestamp, key, 
> value, headers, callback, nowMs);
> if (future == null)
> last.closeForRecordAppends();
> else
> return new RecordAppendResult(future, deque.size() > 1 || 
> last.isFull(), false, false);
> }
> return null;
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)