hachikuji commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r560627883



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -37,6 +37,7 @@
     private final Time time;
     private final SimpleTimer lingerTimer;
     private final int lingerMs;
+    private final int minFlushSize;

Review comment:
       > I am slightly confused on the advantage of letting the config to be 
set higher than maxBatchSize(1MB) and then limiting the effective batch size to 
the minimum of maxUnflushedBytes and maxBatchSize. Won't it lead to confusion 
in terms of usage?
   
   Not sure if this is the cause of the confusion, but just to be clear, what 
I'm referring to as the batch size is the limit on the size of a single 
`DefaultRecordBatch`. Records in Kafka are grouped together into batches with a 
schema roughly like this:
   ```
   Batch => Size Offset Crc ... [Record]
   ```
   It is useful to keep batches from getting too large because we don't have a 
way to index into a batch without storing the whole decompressed contents in 
memory. 
   
   However, we can let `maxUnflushedBytes` be as large as we want. That just 
means that we will collect multiple batches (i.e. multiple `CompletedBatch` 
instances) as the code already does. So if `maxUnflushedBytes` is set to 4MB, 
then we will end up collecting four 1MB batches before we flush to disk rather 
than collecting a single 4MB batch. 
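
To make the distinction concrete, here is a rough sketch of the idea. It is not the actual `BatchAccumulator` code: the class `UnflushedBytesSketch` and its methods are hypothetical, and batches are modeled only by their byte counts.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the real BatchAccumulator. Batches are
// represented only by their size in bytes.
class UnflushedBytesSketch {
    private final int maxBatchSize;       // cap on a single batch
    private final int maxUnflushedBytes;  // cap on total bytes held before a flush
    private final List<Integer> completedBatchSizes = new ArrayList<>();
    private int currentBatchBytes = 0;
    private int unflushedBytes = 0;

    UnflushedBytesSketch(int maxBatchSize, int maxUnflushedBytes) {
        this.maxBatchSize = maxBatchSize;
        this.maxUnflushedBytes = maxUnflushedBytes;
    }

    // Appends one record and returns true once a flush is due.
    boolean append(int recordBytes) {
        // Close the open batch if this record would push it past maxBatchSize.
        if (currentBatchBytes > 0 && currentBatchBytes + recordBytes > maxBatchSize) {
            completeCurrentBatch();
        }
        currentBatchBytes += recordBytes;
        unflushedBytes += recordBytes;
        return unflushedBytes >= maxUnflushedBytes;
    }

    private void completeCurrentBatch() {
        if (currentBatchBytes > 0) {
            completedBatchSizes.add(currentBatchBytes);
            currentBatchBytes = 0;
        }
    }

    // Closes the open batch and returns the sizes of all batches
    // collected since the previous flush.
    List<Integer> flush() {
        completeCurrentBatch();
        List<Integer> drained = new ArrayList<>(completedBatchSizes);
        completedBatchSizes.clear();
        unflushedBytes = 0;
        return drained;
    }
}
```

With `maxBatchSize` at 1MB and `maxUnflushedBytes` at 4MB, appending sixteen 256KB records signals a flush on the last append, and `flush()` hands back four batches of 1MB each.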
   
   We could also require that `maxUnflushedBytes` be less than or equal to our 
desired max batch size, but if it's a similar level of effort to support the 
more general config, then that seems better.
   
   > Also @hachikuji , wanted to understand how the newly proposed config; 
quorum.append.max.linger.ms would interplay with the existing 
quorum.append.linger.ms config. As per my understanding, the moment 
quorum.append.linger.ms is crossed, the flush would start. This happens even in 
this new implementation irrespective of hitting maxUnflushedBytes or not. Are 
you suggesting that we still hold onto writes until we hit the 
quorum.append.max.linger.ms thereby overriding quorum.append.linger.ms?
   
   My expectation is that we flush as soon as either limit is reached. If the 
linger time is hit first, then we take whatever unflushed bytes we have, even if 
they add up to less than `maxUnflushedBytes`. On the other hand, if 
`maxUnflushedBytes` is reached first, then we flush without waiting for the 
linger time to expire. I think this is consistent with the implementation I 
suggested here: 
https://github.com/apache/kafka/pull/9756#discussion_r552268919. Basically we 
override `timeUntilDrain` so that it returns 0 once `maxUnflushedBytes` is 
reached.
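
A minimal sketch of the "whichever comes first" rule, assuming a simplified accumulator that tracks only a byte count and a linger deadline. The class `DrainTimerSketch` and its fields are hypothetical, and returning `Long.MAX_VALUE` when nothing is pending is an assumption of this sketch, not something taken from the PR:

```java
// Hypothetical sketch, not the real BatchAccumulator.
class DrainTimerSketch {
    private final long lingerMs;
    private final int maxUnflushedBytes;
    private int unflushedBytes = 0;
    private long lingerDeadlineMs = -1L;

    DrainTimerSketch(long lingerMs, int maxUnflushedBytes) {
        this.lingerMs = lingerMs;
        this.maxUnflushedBytes = maxUnflushedBytes;
    }

    void append(int bytes, long nowMs) {
        // The linger clock starts with the first unflushed append.
        if (unflushedBytes == 0) {
            lingerDeadlineMs = nowMs + lingerMs;
        }
        unflushedBytes += bytes;
    }

    long timeUntilDrain(long nowMs) {
        if (unflushedBytes == 0) {
            return Long.MAX_VALUE; // assumption: nothing pending, nothing to wait for
        }
        if (unflushedBytes >= maxUnflushedBytes) {
            return 0L; // size limit reached: drain now, ignoring linger
        }
        // Otherwise wait out the remainder of the linger time.
        return Math.max(0L, lingerDeadlineMs - nowMs);
    }
}
```

With `lingerMs = 25` and `maxUnflushedBytes = 1000`, a 400-byte append at time 0 leaves 15 ms to wait at time 10, while a further 600-byte append makes `timeUntilDrain` return 0 immediately.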
   
   Hope that helps!





