jiamei xie created KAFKA-9703:
---------------------------------
Summary: ProducerBatch.split takes up too many resources if the
bigBatch is huge
Key: KAFKA-9703
URL: https://issues.apache.org/jira/browse/KAFKA-9703
Project: Kafka
Issue Type: Bug
Reporter: jiamei xie
ProducerBatch.split takes up too many resources and might cause an OutOfMemory
error if the bigBatch is huge. How I found this issue is described in
https://lists.apache.org/[email protected]:lte=1M:MESSAGE_TOO_LARGE
Following is the code that takes up a lot of resources.
{code:java}
for (Record record : recordBatch) {
    assert thunkIter.hasNext();
    Thunk thunk = thunkIter.next();
    if (batch == null)
        batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);

    // A newly created batch can always host the first message.
    if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) {
        batches.add(batch);
        batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
        batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
    }
}
{code}
Referring to RecordAccumulator#tryAppend, we can call closeForRecordAppends() after
a batch is full:
{code:java}
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque, long nowMs) {
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
        if (future == null)
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
    }
    return null;
}
{code}
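The effect of the proposed change can be illustrated with a self-contained toy model (plain Java, not Kafka's real ProducerBatch/RecordAccumulator classes; the names Batch, closeForAppends, and retainedBytes are invented for illustration): closing each full batch as soon as the split loop moves past it trims its append buffer down to the bytes actually written, so at most one oversized scratch buffer is live at a time instead of one per split batch.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Toy model of the split loop. Each open batch holds an oversized append
// buffer; closing it keeps only the bytes actually written.
class SplitSketch {
    static final int APPEND_BUFFER = 1024; // scratch space while a batch is open

    static class Batch {
        ByteBuffer buf = ByteBuffer.allocate(APPEND_BUFFER);
        boolean closed = false;

        boolean tryAppend(byte[] record) {
            if (closed || buf.remaining() < record.length)
                return false;
            buf.put(record);
            return true;
        }

        // Analogue of closeForRecordAppends(): replace the oversized scratch
        // buffer with one sized exactly to the written bytes.
        void closeForAppends() {
            buf.flip();
            ByteBuffer trimmed = ByteBuffer.allocate(buf.remaining());
            trimmed.put(buf);
            buf = trimmed;
            closed = true;
        }

        int retainedBytes() {
            return buf.capacity();
        }
    }

    // Split records into batches, closing each full batch immediately so its
    // append buffer is released before the next batch is allocated. As in
    // Kafka, a newly created batch is assumed to host at least one record.
    static List<Batch> split(List<byte[]> records) {
        List<Batch> batches = new ArrayList<>();
        Batch batch = new Batch();
        for (byte[] record : records) {
            if (!batch.tryAppend(record)) {
                batch.closeForAppends(); // release scratch space right away
                batches.add(batch);
                batch = new Batch();
                batch.tryAppend(record);
            }
        }
        batch.closeForAppends();
        batches.add(batch);
        return batches;
    }
}
```

With 10 records of 300 bytes each and a 1024-byte append buffer, the split produces 4 batches retaining 3000 bytes in total, versus 4 x 1024 bytes if every batch kept its full append buffer open until the whole split finished.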
--
This message was sent by Atlassian Jira
(v8.3.4#803005)