jiamei xie created KAFKA-9703:
---------------------------------

             Summary: ProducerBatch.split takes up too many resources if the bigBatch is huge
                 Key: KAFKA-9703
                 URL: https://issues.apache.org/jira/browse/KAFKA-9703
             Project: Kafka
          Issue Type: Bug
            Reporter: jiamei xie

ProducerBatch.split takes up too many resources and might cause an OutOfMemoryError if the bigBatch is huge. How I found this issue is described in https://lists.apache.org/list.html?us...@kafka.apache.org:lte=1M:MESSAGE_TOO_LARGE

The following code is what takes up a lot of resources.

{code:java}
for (Record record : recordBatch) {
    assert thunkIter.hasNext();
    Thunk thunk = thunkIter.next();
    if (batch == null)
        batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);

    // A newly created batch can always host the first message.
    if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) {
        batches.add(batch);
        batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
        batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
    }
}
{code}

As RecordAccumulator#tryAppend already does, we can call closeForRecordAppends() once a batch is full.

{code:java}
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque, long nowMs) {
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
        if (future == null)
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
    }
    return null;
}
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
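
To illustrate the suggestion in the issue, here is a minimal, self-contained toy model rather than Kafka code. `ToyBatch`, `peakOpenBuffers` and the integer record sizes are hypothetical stand-ins; the sketch assumes that a batch holds on to some append-time resource until `closeForRecordAppends()` is called on it. It counts how many batches hold that resource at the same time during a split, with and without closing each batch as soon as it is full.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SplitSketch {

    /** Hypothetical stand-in for a producer batch; NOT Kafka's ProducerBatch. */
    static final class ToyBatch {
        private final int capacity;
        private int used = 0;
        // Assumption: models an append-time resource held until the batch is closed.
        private boolean appendBufferHeld = true;

        ToyBatch(int capacity) {
            this.capacity = capacity;
        }

        /** Returns false when the record does not fit; an empty batch accepts any record. */
        boolean tryAppend(int recordSize) {
            if (used > 0 && used + recordSize > capacity)
                return false;
            used += recordSize;
            return true;
        }

        void closeForRecordAppends() {
            appendBufferHeld = false;
        }

        boolean holdsAppendBuffer() {
            return appendBufferHeld;
        }
    }

    /**
     * Runs a split loop shaped like the one quoted in the issue and returns the peak
     * number of batches that hold an append buffer at the same time.
     */
    static int peakOpenBuffers(int[] recordSizes, int splitBatchSize, boolean closeWhenFull) {
        List<ToyBatch> batches = new ArrayList<>();
        ToyBatch batch = null;
        int peak = 0;
        for (int size : recordSizes) {
            if (batch == null)
                batch = new ToyBatch(splitBatchSize);
            if (!batch.tryAppend(size)) {
                // The proposed change: release append resources once the batch is full.
                if (closeWhenFull)
                    batch.closeForRecordAppends();
                batches.add(batch);
                batch = new ToyBatch(splitBatchSize);
                batch.tryAppend(size);
            }
            int open = batch.holdsAppendBuffer() ? 1 : 0;
            for (ToyBatch b : batches)
                if (b.holdsAppendBuffer())
                    open++;
            peak = Math.max(peak, open);
        }
        return peak;
    }

    public static void main(String[] args) {
        int[] sizes = new int[1000];
        Arrays.fill(sizes, 10); // 1000 records of 10 "bytes" each
        System.out.println("without close: " + peakOpenBuffers(sizes, 100, false)); // 100
        System.out.println("with close:    " + peakOpenBuffers(sizes, 100, true));  // 1
    }
}
```

In this toy model the peak grows with the number of split batches when nothing is closed, and stays at one when each full batch is closed before the next is created. Whether the real savings in ProducerBatch.split are comparable depends on what the batch actually holds while open, which this sketch does not measure.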