jsancio commented on a change in pull request #10063: URL: https://github.com/apache/kafka/pull/10063#discussion_r578711429
##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
##########
@@ -307,4 +309,38 @@ public int writeRecord(
         recordOutput.writeVarint(0);
         return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
     }
+
+    private int batchHeaderSizeInBytes() {
+        return AbstractRecords.recordBatchHeaderSizeInBytes(
+            RecordBatch.MAGIC_VALUE_V2,
+            compressionType
+        );
+    }
+
+    private int bytesNeededForRecords(
+        Collection<T> records,
+        ObjectSerializationCache serializationCache
+    ) {
+        long expectedNextOffset = nextOffset;
+        int bytesNeeded = 0;
+        for (T record : records) {
+            if (expectedNextOffset - baseOffset >= Integer.MAX_VALUE) {
+                return Integer.MAX_VALUE;

Review comment:
True. I think the cleanest solution is to throw in this case.

I was thinking of returning `OptionalInt.of(Integer.MAX_VALUE)` and checking for it in the `BatchAccumulator`. With that approach we can't distinguish between two cases: the current batch plus the new records exceeds this constraint, or the new records alone, added to an empty batch, exceed it.

I think throwing an exception is fine since this is a fatal error. It means that we have at least `Integer.MAX_VALUE` records, which would most likely result in an OOM exception somewhere else in the system.
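
To make the suggestion in the review comment concrete, here is a minimal, self-contained sketch of the throwing variant. It is not the code from the PR: the class name `BatchSizeSketch`, the choice of `IllegalArgumentException`, the error message, and the use of `Math.addExact` are all assumptions. Record sizes are passed in directly as a simplification, in place of the serde and `ObjectSerializationCache` used by the real method, and the varint length prefix is ignored.

```java
import java.util.List;

// Hypothetical stand-in for BatchBuilder, reduced to the offset check under discussion.
final class BatchSizeSketch {

    // Sums the record sizes, throwing instead of returning a sentinel when the
    // offset delta can no longer be represented as an int.
    public static int bytesNeededForRecords(
        long baseOffset,
        long nextOffset,
        List<Integer> recordSizesInBytes   // assumption: sizes are precomputed
    ) {
        long expectedNextOffset = nextOffset;
        int bytesNeeded = 0;
        for (int recordSizeInBytes : recordSizesInBytes) {
            if (expectedNextOffset - baseOffset >= Integer.MAX_VALUE) {
                // Treated as fatal: the caller does not need to tell apart
                // "batch is full" from "these records can never fit".
                throw new IllegalArgumentException(
                    String.format(
                        "Adding %d records to a batch with base offset %d and next offset %d",
                        recordSizesInBytes.size(), baseOffset, expectedNextOffset
                    )
                );
            }
            // addExact raises ArithmeticException rather than silently overflowing.
            bytesNeeded = Math.addExact(bytesNeeded, recordSizeInBytes);
            expectedNextOffset += 1;
        }
        return bytesNeeded;
    }

    public static void main(String[] args) {
        // Normal case: the sizes are simply summed.
        System.out.println(bytesNeededForRecords(0L, 0L, List.of(10, 20, 30)));

        // Offset delta already at Integer.MAX_VALUE: the call is rejected.
        try {
            bytesNeededForRecords(0L, Integer.MAX_VALUE, List.of(1));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With this shape, a caller such as `BatchAccumulator` would never have to interpret a magic `Integer.MAX_VALUE` return value; the ambiguity described above disappears because the overflow case does not return at all.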