jsancio commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r578711429



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
##########
@@ -307,4 +309,38 @@ public int writeRecord(
         recordOutput.writeVarint(0);
         return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
     }
+
+    private int batchHeaderSizeInBytes() {
+        return AbstractRecords.recordBatchHeaderSizeInBytes(
+            RecordBatch.MAGIC_VALUE_V2,
+            compressionType
+        );
+    }
+
+    private int bytesNeededForRecords(
+        Collection<T> records,
+        ObjectSerializationCache serializationCache
+    ) {
+        long expectedNextOffset = nextOffset;
+        int bytesNeeded = 0;
+        for (T record : records) {
+            if (expectedNextOffset - baseOffset >= Integer.MAX_VALUE) {
+                return Integer.MAX_VALUE;

Review comment:
       True. I think the cleanest solution is to throw in this case.
   
   I was thinking of returning `OptionalInt.of(Integer.MAX_VALUE)` and checking 
for it in the `BatchAccumulator`. The problem with that solution is that we 
can't distinguish between the case where the current batch plus the new records 
exceeds this constraint and the case where the new records being added to an 
empty batch exceed it on their own.
   
   I think throwing an exception is fine since this is a fatal error. It means 
we have at least `Integer.MAX_VALUE` records, which would most likely result in 
an OOM exception somewhere else in the system.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org