hachikuji commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r552267020



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -59,6 +60,11 @@
     private static final String QUORUM_LINGER_MS_DOC = "The duration in milliseconds that the leader will " +
         "wait for writes to accumulate before flushing them to disk.";
 
+    public static final String QUORUM_FLUSH_MIN_SIZE_BYTES_CONFIG = QUORUM_PREFIX + "flush.minSize.bytes";

Review comment:
       If we decide to keep this, how about we call it 
`quorum.append.max.unflushed.bytes` or something like that? Basically it's the 
maximum number of bytes that the raft implementation is allowed to accumulate 
before forcing an fsync.
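
   For concreteness, a declaration along those lines could follow the existing `QUORUM_LINGER_MS` pattern in `RaftConfig`. This is a sketch only; the constant name, doc wording, and wrapper class are assumptions from this thread, not the final API:

```java
// Sketch of the proposed rename, mirroring the existing RaftConfig
// conventions. The name and doc wording are assumptions from this thread.
public class RaftConfigSketch {
    private static final String QUORUM_PREFIX = "quorum.";

    public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG =
        QUORUM_PREFIX + "append.max.unflushed.bytes";

    private static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_DOC =
        "The maximum number of bytes that the raft implementation is " +
        "allowed to accumulate before forcing an fsync.";
}
```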

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -37,6 +37,7 @@
     private final Time time;
     private final SimpleTimer lingerTimer;
     private final int lingerMs;
+    private final int minFlushSize;

Review comment:
       The main thing I'm wondering is if it makes sense to merge 
`minFlushSize` and `maxBatchSize` into a single configuration. I think that 
would simplify the implementation a bit since we could then check if 
`completed` is not empty to know whether to drain. It is clear that 
`minFlushSize` should be at least as large as `maxBatchSize`, but I'm not sure 
how useful it is for it to be larger. I was planning to keep `maxBatchSize` a 
static configuration, so I guess if we want `minFlushSize` to be configurable, 
then we need to allow for `minFlushSize` to be larger. What do you think?
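
   As a rough sketch of the simplification being floated here: merging the two settings means a full batch always rolls into `completed`, so the drain check reduces to a non-empty test. All names and types below are assumptions based on this thread, not the actual `BatchAccumulator` code:

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// If minFlushSize == maxBatchSize, any completed batch already represents
// minFlushSize accumulated bytes, so no byte counting or locking is needed
// to decide whether to drain. Names here are hypothetical.
public class DrainCheckSketch {
    private final ConcurrentLinkedQueue<Object> completed = new ConcurrentLinkedQueue<>();

    public boolean shouldDrain(boolean lingerExpired) {
        return lingerExpired || !completed.isEmpty();
    }
}
```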

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -211,6 +214,33 @@ public long timeUntilDrain(long currentTimeMs) {
         }
     }
 
+    /**
+     * Check whether the current batch size has exceeded the min flush size.
+     *
+     * Note that this method works on a best-effort basis: it tries to acquire the append lock, and if it
+     * cannot, it returns false instead of blocking.
+     *
+     * This means that if the thread responsible for appending is holding the lock and the linger time has
+     * not yet expired, the records won't be drained even though the batch size exceeds the min flush size,
+     * because the lock could not be acquired. In that case, a subsequent call should be able to acquire the
+     * lock and return true, even if the linger time still hasn't expired.
+     *
+     * @return true if the append lock could be acquired and the accumulated bytes exceed the configured
+     * min flush size, false otherwise.
+     */
+    public boolean batchSizeExceedsMinFlushSize() {
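
For illustration, a minimal sketch of the best-effort pattern the Javadoc above describes, assuming a `ReentrantLock` guards appends and a counter tracks accumulated bytes (both assumptions rather than the actual implementation):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Best-effort check: use tryLock() instead of lock() so the caller never
// blocks behind an in-flight append. Field names are hypothetical.
public class BestEffortCheckSketch {
    private final ReentrantLock appendLock = new ReentrantLock();
    private final AtomicInteger accumulatedBytes = new AtomicInteger(0);
    private final int minFlushSize = 8 * 1024 * 1024; // assumed value

    public boolean batchSizeExceedsMinFlushSize() {
        // If an appender holds the lock, skip this round; a later call
        // will observe the accumulated bytes once the lock is free.
        if (!appendLock.tryLock()) {
            return false;
        }
        try {
            return accumulatedBytes.get() >= minFlushSize;
        } finally {
            appendLock.unlock();
        }
    }
}
```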

Review comment:
       I think we need to look at completed batches in here as well, right? To 
simplify the implementation, I think we could do something like the following:
   
   1. Inside `append`, while we are already holding the lock, we can check if 
the accumulated bytes (including `completed` and `currentBatch`) have reached 
`minFlushSize`. If so, we can call `completeCurrentBatch` to ensure that 
`completed` holds all the data that needs to be drained.
   2. Inside `timeUntilDrain`, if the linger timer hasn't been reached, we can 
iterate `completed` and check if there are enough bytes to flush. Then we don't 
need to acquire the lock unless we need to drain.
   
   Would that work?
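
   A minimal sketch of that two-step idea, with every field and method name assumed from this thread rather than taken from the real code:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the two-step suggestion above; not the actual
// BatchAccumulator implementation.
public class AccumulatorSketch {
    static class CompletedBatch {
        final int sizeInBytes;
        CompletedBatch(int sizeInBytes) { this.sizeInBytes = sizeInBytes; }
    }

    private final ReentrantLock appendLock = new ReentrantLock();
    private final ConcurrentLinkedQueue<CompletedBatch> completed = new ConcurrentLinkedQueue<>();
    private final int minFlushSize = 8 * 1024 * 1024; // assumed value
    private int currentBatchBytes = 0;

    // Step 1: inside append, while the lock is already held, roll the
    // current batch into `completed` once the total bytes reach minFlushSize.
    public void append(int recordBytes) {
        appendLock.lock();
        try {
            currentBatchBytes += recordBytes;
            if (completedBytes() + currentBatchBytes >= minFlushSize) {
                completeCurrentBatch();
            }
        } finally {
            appendLock.unlock();
        }
    }

    private void completeCurrentBatch() {
        completed.add(new CompletedBatch(currentBatchBytes));
        currentBatchBytes = 0;
    }

    private int completedBytes() {
        return completed.stream().mapToInt(b -> b.sizeInBytes).sum();
    }

    // Step 2: the drain decision only reads `completed`, so no lock is
    // needed unless the caller actually drains.
    public boolean readyToDrain(boolean lingerExpired) {
        return lingerExpired || completedBytes() >= minFlushSize;
    }
}
```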



