[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semantics to Raft metadata
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r616018340

## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java

```
@@ -76,6 +76,11 @@
     "wait for writes to accumulate before flushing them to disk.";
 public static final int DEFAULT_QUORUM_LINGER_MS = 25;
+public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = QUORUM_PREFIX + "append.max.unflushed.bytes";
```

Review comment:
Hi @jsancio / @hachikuji, could you please review the PR whenever you get the chance?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r606198207

## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java

```
@@ -76,6 +76,11 @@
     "wait for writes to accumulate before flushing them to disk.";
 public static final int DEFAULT_QUORUM_LINGER_MS = 25;
+public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = QUORUM_PREFIX + "append.max.unflushed.bytes";
```

Review comment:
Hey @jsancio, sorry to bother you again, but could you please review this whenever you get the chance?
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593033011

## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java

```
@@ -76,6 +76,11 @@
     "wait for writes to accumulate before flushing them to disk.";
```

Review comment:
I updated the doc for the maxUnflushedBytes config.
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593032627

## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java

```
@@ -76,6 +76,11 @@
     "wait for writes to accumulate before flushing them to disk.";
 public static final int DEFAULT_QUORUM_LINGER_MS = 25;
+public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = QUORUM_PREFIX + "append.max.unflushed.bytes";
```

Review comment:
I see what you are trying to say. The original premise of this ticket was to trigger fsyncs the moment a configured amount of bytes has accumulated. Here is Jason's original description from the ticket:

> In KAFKA-10601, we implemented linger semantics similar to the producer to let the leader accumulate a batch of writes before fsyncing them to disk. Currently the fsync is only based on the linger time, but it would be helpful to make it size-based as well. In other words, if we accumulate a configurable N bytes, then we should not wait for linger expiration and should just fsync immediately.

But as you pointed out, it is also due to the fact that in the current implementation batch append and fsync go hand in hand. With the future work on deferring fsync, this config might affect only the batch appends, and with that in mind, IMO it makes sense to rename it to `append.linger.bytes`. It also matches `append.linger.ms`.

BTW, on the fsync deferral track, I have created a draft PR that outlines my approach: https://github.com/apache/kafka/pull/10278. I'd request you or Jason to review it.
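[Editor's note] The size-or-time trigger described in the ticket excerpt above can be sketched as follows. This is a minimal illustration, not the actual Kafka code; all names here (`DrainPolicy`, `onAppend`, `shouldDrain`, the field names) are made up for the example.

```java
// Sketch of the drain decision: flush as soon as EITHER the linger timeout
// expires OR the accumulated bytes reach the configured size threshold.
// Hitting the size bound means we do not wait for linger expiration.
public class DrainPolicySketch {
    static final class DrainPolicy {
        private final long lingerMs;          // cf. quorum.append.linger.ms
        private final long maxUnflushedBytes; // cf. the proposed size-based config
        private long bytesAccumulated = 0;
        private long lingerDeadlineMs = Long.MAX_VALUE;

        DrainPolicy(long lingerMs, long maxUnflushedBytes) {
            this.lingerMs = lingerMs;
            this.maxUnflushedBytes = maxUnflushedBytes;
        }

        void onAppend(int recordBytes, long nowMs) {
            if (bytesAccumulated == 0) {
                // Start the linger timer on the first append of the batch.
                lingerDeadlineMs = nowMs + lingerMs;
            }
            bytesAccumulated += recordBytes;
        }

        boolean shouldDrain(long nowMs) {
            return bytesAccumulated >= maxUnflushedBytes || nowMs >= lingerDeadlineMs;
        }
    }

    public static void main(String[] args) {
        DrainPolicy policy = new DrainPolicy(25, 100);
        policy.onAppend(60, 0);
        System.out.println(policy.shouldDrain(10)); // neither trigger fired yet
        policy.onAppend(60, 10);
        System.out.println(policy.shouldDrain(10)); // size trigger: 120 >= 100
    }
}
```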
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593024705

## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java

```
@@ -474,6 +474,7 @@ public void testAccumulatorClearedAfterBecomingFollower() throws Exception {
         .thenReturn(buffer);
     RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+        .withMaxUnflushedBytes(KafkaRaftClient.MAX_BATCH_SIZE)
```

Review comment:
Yeah. The reason is that in this PR @hachikuji suggested changing the logic for setting maxBatchSize in BatchAccumulator to the following:

`this.maxBatchSize = Math.min(maxBatchSize, maxUnflushedBytes);`

That is why I am setting various combinations of values, to check that it behaves correctly.
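[Editor's note] For illustration, the `Math.min` rule quoted above means the accumulator's effective batch size is capped by whichever limit is smaller, so a `maxUnflushedBytes` value above the hard batch limit has no further effect on batching. A toy sketch, with hypothetical names:

```java
// Demonstrates the effective-batch-size cap suggested in the review:
// effective size = min(hard batch limit, size-based flush threshold).
public class EffectiveBatchSize {
    static int effectiveBatchSize(int maxBatchSize, int maxUnflushedBytes) {
        return Math.min(maxBatchSize, maxUnflushedBytes);
    }

    public static void main(String[] args) {
        int maxBatchSize = 1024 * 1024; // 1 MiB hard limit (cf. KafkaRaftClient.MAX_BATCH_SIZE)
        System.out.println(effectiveBatchSize(maxBatchSize, 512 * 1024));      // 524288
        System.out.println(effectiveBatchSize(maxBatchSize, 2 * 1024 * 1024)); // capped: 1048576
    }
}
```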
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593019808

## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java

```
@@ -617,6 +620,38 @@ public void testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Except
     assertEquals(3L, context.log.endOffset().offset);
 }

+@Test
+public void testChannelWokenUpIfMinFlushSizeReachedDuringAppend() throws Exception {
+    // This test verifies that the client will get woken up immediately
+    // if the linger timeout has expired during an append
+
+    int localId = 0;
+    int otherNodeId = 1;
+    int minFlushSizeInBytes = 120;
+    Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+    RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+        .withMaxUnflushedBytes(minFlushSizeInBytes)
```

Review comment:
Yeah, that was an oversight as well. Changed.
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593019665

## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java

```
@@ -617,6 +620,38 @@ public void testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Except
     assertEquals(3L, context.log.endOffset().offset);
 }

+@Test
+public void testChannelWokenUpIfMinFlushSizeReachedDuringAppend() throws Exception {
+    // This test verifies that the client will get woken up immediately
+    // if the linger timeout has expired during an append
```

Review comment:
Yes, changed.
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593019531

## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java

```
@@ -1078,6 +1078,7 @@ private static FetchResponseData snapshotFetchResponse(
 private static SnapshotWriter snapshotWriter(RaftClientTestContext context, RawSnapshotWriter snapshot) {
     return new SnapshotWriter<>(
         snapshot,
+        1024,
```

Review comment:
Done.
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r556429212

## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java

```
@@ -37,6 +37,7 @@
     private final Time time;
     private final SimpleTimer lingerTimer;
     private final int lingerMs;
+    private final int minFlushSize;
```

Review comment:
@hachikuji, @jsancio, I agree with the points. I am slightly confused about the advantage of letting the config be set higher than maxBatchSize (1 MB) and then limiting the effective batch size to the minimum of `maxUnflushedBytes` and `maxBatchSize`. Won't that lead to confusion in terms of usage? In fact, I have followed a similar approach in the PR in a different way: I restrict the value of this new config to the range of 0 (which is wrong, I need to change it) to maxBatchSize - 1:

```
.define(QUORUM_FLUSH_MIN_SIZE_BYTES_CONFIG,
    ConfigDef.Type.INT,
    1024,
    between(0, (1024 * 1024) - 1),
    ConfigDef.Importance.MEDIUM,
    QUORUM_FLUSH_MIN_SIZE_BYTES_DOC);
```

I feel this way, what the config promises and what actually happens remain the same, and it is effectively equivalent to `min(maxBatchSize, maxUnflushedBytes)`.

Also @hachikuji, I wanted to understand how the newly proposed config, `quorum.append.max.linger.ms`, would interplay with the existing `quorum.append.linger.ms` config. As per my understanding, the moment `quorum.append.linger.ms` is crossed, the flush starts; that happens even in this new implementation, irrespective of whether `maxUnflushedBytes` is hit. Are you suggesting that we still hold onto writes until we hit `quorum.append.max.linger.ms`, thereby overriding `quorum.append.linger.ms`? Considering just those 2 configs, I am confused about how it would work, but if we add `maxUnflushedBytes`, then we will probably have to write some logic to hold onto writes between `quorum.append.linger.ms` and `quorum.append.max.linger.ms`. Maybe we give further time within this range to let writes accumulate; the moment we hit either of the 2, i.e. `maxUnflushedBytes` or `quorum.append.max.linger.ms`, we flush the writes. This gives me the impression that we are giving more preference to `maxUnflushedBytes`. Please let me know your thoughts.
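[Editor's note] One possible reading of the three-knob scheme discussed in the comment above can be sketched as follows. This is purely hypothetical: it assumes writes are held past `quorum.append.linger.ms` and flushed as soon as either the size bound or the proposed `quorum.append.max.linger.ms` is hit. It is not Kafka's implementation, and all names are illustrative.

```java
// Hypothetical flush decision with three knobs: a soft linger (lingerMs), a
// hard upper time bound (maxLingerMs), and a size bound (maxUnflushedBytes).
// Between lingerMs and maxLingerMs the leader keeps accumulating writes.
public class LingerInterplaySketch {
    static boolean shouldFlush(long elapsedMs, long accumulatedBytes,
                               long lingerMs, long maxLingerMs, long maxUnflushedBytes) {
        if (elapsedMs >= maxLingerMs) return true;              // hard time bound expired
        if (accumulatedBytes >= maxUnflushedBytes) return true; // size bound hit early
        // Otherwise keep waiting (even past lingerMs) for more writes.
        return false;
    }

    public static void main(String[] args) {
        // linger = 25 ms, maxLinger = 100 ms, size bound = 1024 bytes
        System.out.println(shouldFlush(30, 512, 25, 100, 1024));  // false: held past linger
        System.out.println(shouldFlush(30, 2048, 25, 100, 1024)); // true: size bound hit
        System.out.println(shouldFlush(120, 512, 25, 100, 1024)); // true: maxLinger expired
    }
}
```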
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r553481214

## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java

```
@@ -211,6 +214,33 @@ public long timeUntilDrain(long currentTimeMs) {
     }
 }

+/**
+ * Check if the current batch size has exceeded the min flush size.
+ *
+ * Note that this method works on a best-effort basis, i.e. it tries to acquire the append lock and if it can't,
+ * then instead of blocking, it returns false.
+ *
+ * This means that if the thread responsible for appending is holding the lock and the linger time hasn't expired
+ * yet, then even though the batch size exceeds the min flush size, the records won't be drained as the lock
+ * couldn't be acquired. This also means that in subsequent run(s), this method should be able to acquire the lock
+ * and return true in the event the linger time hasn't expired yet.
+ *
+ * @return true if the append lock could be acquired and the accumulated bytes are greater than the configured
+ * min flush size in bytes, false otherwise.
+ */
+public boolean batchSizeExceedsMinFlushSize() {
```

Review comment:
Yeah, completed batches should be considered. Also, with what you have suggested, it removes the need to hold the lock again in batchSizeExceedsMinFlushSize. Basically, we won't even need this method anymore.
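[Editor's note] The point above about counting completed batches could look roughly like this. A hedged sketch: `Batch` and `sizeInBytes` are stand-in names rather than the real `BatchAccumulator` types; the idea is just that the size check sums the already-completed batches plus the in-progress one, so it no longer needs to re-acquire the append lock in a separate method.

```java
import java.util.List;

// Sums the bytes of completed batches and the current in-progress batch when
// evaluating the size-based flush trigger.
public class UnflushedBytesSketch {
    interface Batch { int sizeInBytes(); }

    static long totalUnflushedBytes(List<Batch> completedBatches, Batch currentBatch) {
        long total = (currentBatch == null) ? 0 : currentBatch.sizeInBytes();
        for (Batch batch : completedBatches) {
            total += batch.sizeInBytes();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Batch> completed = List.of(() -> 400, () -> 300);
        System.out.println(totalUnflushedBytes(completed, () -> 200)); // 900
        System.out.println(totalUnflushedBytes(completed, null));      // 700
    }
}
```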
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r553472551

## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java

```
@@ -37,6 +37,7 @@
     private final Time time;
     private final SimpleTimer lingerTimer;
     private final int lingerMs;
+    private final int minFlushSize;
```

Review comment:
The way I thought about it is to keep it configurable but within the bounds of maxBatchSize. That way, users have the option of flushing more frequently based on size if it suits them, rather than waiting for time-bound flushes. We should still ensure a lower bound for this config; otherwise the fsyncs can become too frequent, which can have adverse effects. I agree that setting it higher than `maxBatchSize` might not be too useful either, as that could lead to delayed fsyncs if the linger time has also been set to a higher value. Having said that, IMO it might be useful to keep minFlushSize configurable, which gives users more knobs to control the behaviour based on their needs (even though more knobs can sometimes mean more confusion). Do you think that makes sense?
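[Editor's note] The bounded-config idea discussed above can be illustrated with a simple range check: reject values below a lower bound (so fsyncs cannot become too frequent) or above the hard batch limit (so flushes are not deferred past it). The names here are hypothetical and this is not Kafka's actual `ConfigDef` validation machinery.

```java
// Illustrative validation: minFlushSize must lie in [minBound, maxBatchSize].
public class MinFlushSizeValidation {
    static int validateMinFlushSize(int value, int minBound, int maxBatchSize) {
        if (value < minBound || value > maxBatchSize) {
            throw new IllegalArgumentException(
                "minFlushSize must be in [" + minBound + ", " + maxBatchSize + "], got " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        int maxBatchSize = 1024 * 1024;
        System.out.println(validateMinFlushSize(8192, 1024, maxBatchSize)); // 8192: accepted
    }
}
```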