[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semantics to Raft metadata

2021-04-19 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r616018340



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -76,6 +76,11 @@
 "wait for writes to accumulate before flushing them to disk.";
 public static final int DEFAULT_QUORUM_LINGER_MS = 25;
 
+public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = QUORUM_PREFIX + "append.max.unflushed.bytes";

Review comment:
   Hi @jsancio / @hachikuji, could you please review the PR whenever you get 
the chance?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semantics to Raft metadata

2021-04-02 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r606198207



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -76,6 +76,11 @@
 "wait for writes to accumulate before flushing them to disk.";
 public static final int DEFAULT_QUORUM_LINGER_MS = 25;
 
+public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = QUORUM_PREFIX + "append.max.unflushed.bytes";

Review comment:
   Hey @jsancio, sorry to bother you again, but could you please review this 
whenever you get the chance?








[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semantics to Raft metadata

2021-03-12 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593033011



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -76,6 +76,11 @@
 "wait for writes to accumulate before flushing them to disk.";

Review comment:
   I updated the doc for the `maxUnflushedBytes` config.









[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semantics to Raft metadata

2021-03-12 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593032627



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -76,6 +76,11 @@
 "wait for writes to accumulate before flushing them to disk.";
 public static final int DEFAULT_QUORUM_LINGER_MS = 25;
 
+public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = QUORUM_PREFIX + "append.max.unflushed.bytes";

Review comment:
   I see what you are trying to say. Well, the original premise of this 
ticket was to trigger an fsync the moment a configured number of bytes has 
accumulated. Here is the original description from Jason in the ticket:
   
   > In KAFKA-10601, we implemented linger semantics similar to the producer to 
let the leader accumulate a batch of writes before fsyncing them to disk. 
Currently the fsync is only based on the linger time, but it would be helpful 
to make it size-based as well. In other words, if we accumulate a configurable 
N bytes, then we should not wait for linger expiration and should just fsync 
immediately.
   
   But as you pointed out, it is also due to the fact that in the current 
implementation batch append and fsync go hand in hand.
   
   With the future work on deferring fsync, this might affect only the batch 
appends, and with that in mind, IMO it makes sense to rename it to 
`append.linger.bytes`. It also matches `append.linger.ms`.
   
   BTW, on the fsync-deferral track, I have created a draft PR where I have 
outlined my approach: https://github.com/apache/kafka/pull/10278
   
   I'd request you or Jason to review it.









[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semantics to Raft metadata

2021-03-12 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593024705



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -474,6 +474,7 @@ public void testAccumulatorClearedAfterBecomingFollower() 
throws Exception {
 .thenReturn(buffer);
 
 RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+.withMaxUnflushedBytes(KafkaRaftClient.MAX_BATCH_SIZE)

Review comment:
   Yeah. The reason for that is that in this PR @hachikuji suggested changing 
the logic for setting `maxBatchSize` in `BatchAccumulator` to the following:
   
   `this.maxBatchSize = Math.min(maxBatchSize, maxUnflushedBytes);`
   
   That is why I am setting various combinations of values to check that it 
behaves correctly.
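   To make the suggested bounding rule concrete, here is a minimal, 
hypothetical sketch (the class and method names are mine, not Kafka's; only 
the `Math.min` rule itself comes from the discussion above):

```java
// Hypothetical sketch: the effective batch size is the smaller of the static
// maximum batch size and the configured unflushed-bytes threshold.
public class EffectiveBatchSize {
    // 1 MiB, matching the maxBatchSize mentioned in the discussion.
    static final int MAX_BATCH_SIZE = 1024 * 1024;

    static int effectiveMaxBatchSize(int maxUnflushedBytes) {
        return Math.min(MAX_BATCH_SIZE, maxUnflushedBytes);
    }

    public static void main(String[] args) {
        // A threshold above the maximum is clamped down to MAX_BATCH_SIZE.
        System.out.println(effectiveMaxBatchSize(2 * 1024 * 1024));
        // A smaller threshold becomes the effective batch size.
        System.out.println(effectiveMaxBatchSize(64 * 1024));
    }
}
```

   Under this rule, configuring the threshold above 1 MiB is legal but has no 
further effect, which is the usability question raised elsewhere in this 
thread.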









[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semantics to Raft metadata

2021-03-12 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593019808



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -617,6 +620,38 @@ public void 
testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Except
 assertEquals(3L, context.log.endOffset().offset);
 }
 
+@Test
+public void testChannelWokenUpIfMinFlushSizeReachedDuringAppend() throws 
Exception {
+// This test verifies that the client will get woken up immediately
+// if the linger timeout has expired during an append
+
+int localId = 0;
+int otherNodeId = 1;
+int minFlushSizeInBytes = 120;
+Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+.withMaxUnflushedBytes(minFlushSizeInBytes)

Review comment:
   Yeah, that was an oversight as well. Changed.









[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semantics to Raft metadata

2021-03-12 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593019665



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -617,6 +620,38 @@ public void 
testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Except
 assertEquals(3L, context.log.endOffset().offset);
 }
 
+@Test
+public void testChannelWokenUpIfMinFlushSizeReachedDuringAppend() throws 
Exception {
+// This test verifies that the client will get woken up immediately
+// if the linger timeout has expired during an append

Review comment:
   Yes, changed.









[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semantics to Raft metadata

2021-03-12 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593019531



##
File path: 
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##
@@ -1078,6 +1078,7 @@ private static FetchResponseData snapshotFetchResponse(
 private static SnapshotWriter snapshotWriter(RaftClientTestContext 
context, RawSnapshotWriter snapshot) {
 return new SnapshotWriter<>(
 snapshot,
+1024,

Review comment:
   Done.









[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semantics to Raft metadata

2021-01-13 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r556429212



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -37,6 +37,7 @@
 private final Time time;
 private final SimpleTimer lingerTimer;
 private final int lingerMs;
+private final int minFlushSize;

Review comment:
   @hachikuji , @jsancio I agree with those points. I am slightly confused 
about the advantage of letting the config be set higher than maxBatchSize 
(1 MB) and then limiting the effective batch size to the minimum of 
`maxUnflushedBytes` and `maxBatchSize`. Won't that lead to confusion in terms 
of usage?
   In fact, I have followed a similar approach in the PR in a different way: I 
restricted the value of this new config to a range from 0 (which is wrong, I 
need to change it) to maxBatchSize - 1:
   
   ```
   .define(QUORUM_FLUSH_MIN_SIZE_BYTES_CONFIG,
       ConfigDef.Type.INT,
       1024,
       between(0, (1024 * 1024) - 1),
       ConfigDef.Importance.MEDIUM,
       QUORUM_FLUSH_MIN_SIZE_BYTES_DOC);
   ```
   
   I feel that this way, what the config advertises and what actually happens 
stay the same, and effectively this is equivalent to `min(maxBatchSize, 
maxUnflushedBytes)`.
   
   Also @hachikuji , I wanted to understand how the newly proposed config, 
`quorum.append.max.linger.ms`, would interplay with the existing 
`quorum.append.linger.ms` config. As per my understanding, the moment 
`quorum.append.linger.ms` is crossed, the flush starts. That happens in this 
new implementation as well, irrespective of whether `maxUnflushedBytes` has 
been hit. Are you suggesting that we still hold onto writes until we hit 
`quorum.append.max.linger.ms`, thereby overriding `quorum.append.linger.ms`?
   
   Considering just these two configs, I am confused about how this would 
work, but if we add `maxUnflushedBytes`, then we will probably have to write 
some logic to hold onto writes between `quorum.append.linger.ms` and 
`quorum.append.max.linger.ms`. Maybe we give that extra window for writes to 
accumulate, and the moment we hit either `maxUnflushedBytes` or 
`quorum.append.max.linger.ms`, we flush. This gives me the impression that we 
are giving more preference to `maxUnflushedBytes`.
   
   Please let me know your thoughts.













[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semantics to Raft metadata

2021-01-07 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r553481214



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -211,6 +214,33 @@ public long timeUntilDrain(long currentTimeMs) {
 }
 }
 
+/**
+ * Check if the current batch size has exceeded the min flush size.
+ *
+ * Note that this method works on best effort i.e it tries to acquire the 
append lock and if it can't
+ * then instead of blocking, it returns false.
+ *
+ * This means that if the thread responsible for appending is holding the 
lock and the linger time hasn't expired
+ * yet, then even though the batch size exceeds the min flush size, the 
records won't be drained as the lock
+ * couldn't be acquired. This also means that in subsequent run(s), this 
method should be able to acquire the lock
+ * and return true in the event the linger time hasn't expired yet.
+ *
+ * @return true if the append lock could be acquired and the accumulated 
bytes are greater than configured min flush
+ * bytes size, false otherwise.
+ */
+public boolean batchSizeExceedsMinFlushSize() {

Review comment:
   Yeah, completed batches should be considered. Also, what you have 
suggested removes the need to hold the lock again in 
`batchSizeExceedsMinFlushSize`; basically, we won't even need this method 
anymore.
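   For illustration, a best-effort, non-blocking size check along the lines 
described in the quoted javadoc might be sketched as follows (the field and 
method names here are hypothetical, not the PR's actual code):

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: try to take the append lock without blocking; if an
// appender currently holds it, report false and rely on a later poll.
public class BestEffortSizeCheck {
    private final ReentrantLock appendLock = new ReentrantLock();
    private final long minFlushSize;
    private long accumulatedBytes = 0;

    BestEffortSizeCheck(long minFlushSize) {
        this.minFlushSize = minFlushSize;
    }

    void append(long bytes) {
        appendLock.lock();
        try {
            accumulatedBytes += bytes;
        } finally {
            appendLock.unlock();
        }
    }

    boolean batchSizeExceedsMinFlushSize() {
        if (!appendLock.tryLock()) {
            return false; // lock held by an appender; don't block the caller
        }
        try {
            return accumulatedBytes >= minFlushSize;
        } finally {
            appendLock.unlock();
        }
    }
}
```

   As the reply notes, tracking completed batches directly makes this extra 
lock acquisition, and therefore the method itself, unnecessary.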









[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semantics to Raft metadata

2021-01-07 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r553472551



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -37,6 +37,7 @@
 private final Time time;
 private final SimpleTimer lingerTimer;
 private final int lingerMs;
+private final int minFlushSize;

Review comment:
   The way I thought about it is to still keep it configurable, but within 
the bounds of maxBatchSize. With that, users would have the option to flush 
more frequently based on size if it suits them, rather than waiting for 
time-based flushes. We should still enforce a lower bound for this config; 
otherwise the fsyncs can become too frequent, which can have adverse effects. 
I agree that setting it higher than `maxBatchSize` might not be very useful 
either, as that could lead to delayed fsyncs if the linger time has also been 
set to a high value.
   
   Having said that, IMO it might be useful to keep minFlushSize configurable, 
which gives users more knobs to control the behaviour based on their needs 
(even though more knobs can sometimes mean more confusion). Do you think that 
makes sense?
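   A minimal sketch of the bounded-config idea described above (the bound 
values and names are hypothetical, not Kafka's actual validation code):

```java
// Hypothetical sketch: keep the flush size within a lower bound (to avoid
// overly frequent fsyncs) and an upper bound (maxBatchSize).
public class FlushSizeBounds {
    static final int LOWER_BOUND = 1024;        // illustrative floor
    static final int UPPER_BOUND = 1024 * 1024; // illustrative maxBatchSize

    static int validate(int minFlushSize) {
        if (minFlushSize < LOWER_BOUND || minFlushSize > UPPER_BOUND) {
            throw new IllegalArgumentException("minFlushSize must be in ["
                + LOWER_BOUND + ", " + UPPER_BOUND + "], got " + minFlushSize);
        }
        return minFlushSize;
    }
}
```

   Rejecting out-of-range values at configuration time keeps the advertised 
setting and the effective behaviour identical, rather than silently clamping.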




