Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-11 Thread via GitHub


satishd merged PR #15820:
URL: https://github.com/apache/kafka/pull/15820


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-11 Thread via GitHub


satishd commented on PR #15820:
URL: https://github.com/apache/kafka/pull/15820#issuecomment-2161862623

   There are a few unrelated test failures, merging it to trunk and 3.8.





Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-11 Thread via GitHub


satishd commented on PR #15820:
URL: https://github.com/apache/kafka/pull/15820#issuecomment-2161861876

   Thanks @abhijeetk88 for addressing the review comments. LGTM.





Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-11 Thread via GitHub


showuon commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1634689117


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   @abhijeetk88 , thanks for the response. I agree we can keep it as is and 
just document it clearly in the monitoring section: 
https://kafka.apache.org/documentation/#monitoring . We can improve it in the 
future if necessary.
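
For reference, the wait-on-quota pattern discussed in this thread can be sketched as a self-contained example. Everything here is illustrative: the `QuotaManager` class, the byte budget, and the 100 ms re-check interval are stand-ins, not the actual `RLMQuotaManager` API.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class QuotaWaitSketch {
    // Illustrative stand-in for RLMQuotaManager: a fixed byte budget.
    static class QuotaManager {
        private final long maxBytes;
        private long usedBytes = 0;
        QuotaManager(long maxBytes) { this.maxBytes = maxBytes; }
        synchronized boolean isQuotaExceeded() { return usedBytes >= maxBytes; }
        synchronized void record(long bytes) { usedBytes += bytes; }
        synchronized void release(long bytes) { usedBytes = Math.max(0, usedBytes - bytes); }
    }

    private final ReentrantLock copyQuotaManagerLock = new ReentrantLock();
    private final Condition copyQuotaAvailable = copyQuotaManagerLock.newCondition();
    private final QuotaManager quota;

    QuotaWaitSketch(QuotaManager quota) { this.quota = quota; }

    // Mirrors the shape of the loop in copyLogSegmentsToRemote: block while the
    // quota is exceeded; an interrupt while waiting propagates to the caller.
    void awaitQuotaAvailable() throws InterruptedException {
        copyQuotaManagerLock.lock();
        try {
            while (quota.isQuotaExceeded()) {
                // Timed wait, so the condition is re-checked periodically even
                // if nothing signals it (interval chosen for this sketch only).
                copyQuotaAvailable.await(100, TimeUnit.MILLISECONDS);
            }
        } finally {
            copyQuotaManagerLock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        QuotaManager qm = new QuotaManager(10);
        QuotaWaitSketch sketch = new QuotaWaitSketch(qm);
        qm.record(10); // quota exhausted up front
        Thread waiter = new Thread(() -> {
            try {
                sketch.awaitQuotaAvailable();
                System.out.println("quota available");
            } catch (InterruptedException e) {
                System.out.println("interrupted");
            }
        });
        waiter.start();
        Thread.sleep(200); // let the waiter block at least once
        qm.release(10);    // free the quota; the waiter's next re-check exits the loop
        waiter.join(5000);
        System.out.println("waiter alive: " + waiter.isAlive());
    }
}
```

A thread parked in `await` still occupies its pool slot, which is why the thread-pool "busy" metric reads high under throttling, as noted elsewhere in this thread.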






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-10 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1633050128


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2730,6 +2742,206 @@ public void testEpochEntriesAsByteBuffer() throws Exception {
 }
 
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testCopyQuota(boolean quotaExceeded) throws Exception {
+        RemoteLogManager.RLMTask task = setupRLMTask(quotaExceeded);
+
+        if (quotaExceeded) {
+            // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
+            assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog)));
+
+            // Verify the highest offset in remote storage is updated only once
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
+            // Verify the highest offset in remote storage was -1L before the copy started
+            assertEquals(-1L, capture.getValue());
+        } else {
+            // Verify the copy operation completes within the timeout, since it does not need to wait for quota availability
+            assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog));
+
+            // Verify quota check was performed
+            verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded();
+            // Verify bytes to copy was recorded with the quota manager
+            verify(rlmCopyQuotaManager, times(1)).record(10);
+
+            // Verify the highest offset in remote storage is updated
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
+            List<Long> capturedValues = capture.getAllValues();
+            // Verify the highest offset in remote storage was -1L before the copy
+            assertEquals(-1L, capturedValues.get(0).longValue());
+            // Verify it was updated to 149L after the copy
+            assertEquals(149L, capturedValues.get(1).longValue());
+        }
+    }
+
+    @Test
+    public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception {
+        RemoteLogManager.RLMTask task = setupRLMTask(true);
+
+        Thread t = new Thread(task);
+        t.start();
+        // Sleep for a while to allow the task to start and quota check to be performed once
+        Thread.sleep(100);

Review Comment:
   Changed the test as suggested.






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-09 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1632218079


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2730,6 +2742,206 @@ public void testEpochEntriesAsByteBuffer() throws Exception {
 }
 
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testCopyQuota(boolean quotaExceeded) throws Exception {
+        RemoteLogManager.RLMTask task = setupRLMTask(quotaExceeded);
+
+        if (quotaExceeded) {
+            // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
+            assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog)));
+
+            // Verify the highest offset in remote storage is updated only once
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
+            // Verify the highest offset in remote storage was -1L before the copy started
+            assertEquals(-1L, capture.getValue());
+        } else {
+            // Verify the copy operation completes within the timeout, since it does not need to wait for quota availability
+            assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog));
+
+            // Verify quota check was performed
+            verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded();
+            // Verify bytes to copy was recorded with the quota manager
+            verify(rlmCopyQuotaManager, times(1)).record(10);
+
+            // Verify the highest offset in remote storage is updated
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
+            List<Long> capturedValues = capture.getAllValues();
+            // Verify the highest offset in remote storage was -1L before the copy
+            assertEquals(-1L, capturedValues.get(0).longValue());
+            // Verify it was updated to 149L after the copy
+            assertEquals(149L, capturedValues.get(1).longValue());
+        }
+    }
+
+    @Test
+    public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception {
+        RemoteLogManager.RLMTask task = setupRLMTask(true);
+
+        Thread t = new Thread(task);
+        t.start();
+        // Sleep for a while to allow the task to start and quota check to be performed once
+        Thread.sleep(100);

Review Comment:
   `sleep` can lead to flaky tests in CI and the shutdown/close behavior is not 
tested E2E. Can we rewrite the test? 
   
   ```
   @Test
   public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception {
       remoteLogManager.startup();
       setupRLMTask(true);
       remoteLogManager.onLeadershipChange(
               Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
       TestUtils.waitForCondition(() -> {
           verify(rlmCopyQuotaManager, atLeast(1)).isQuotaExceeded();
           return true;
       }, "Quota exceeded check did not happen");
       assertTimeoutPreemptively(Duration.ofMillis(100), () -> remoteLogManager.close());
   }
   ```






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-09 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1632218079


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2730,6 +2742,206 @@ public void testEpochEntriesAsByteBuffer() throws Exception {
 }
 
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testCopyQuota(boolean quotaExceeded) throws Exception {
+        RemoteLogManager.RLMTask task = setupRLMTask(quotaExceeded);
+
+        if (quotaExceeded) {
+            // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
+            assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog)));
+
+            // Verify the highest offset in remote storage is updated only once
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
+            // Verify the highest offset in remote storage was -1L before the copy started
+            assertEquals(-1L, capture.getValue());
+        } else {
+            // Verify the copy operation completes within the timeout, since it does not need to wait for quota availability
+            assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog));
+
+            // Verify quota check was performed
+            verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded();
+            // Verify bytes to copy was recorded with the quota manager
+            verify(rlmCopyQuotaManager, times(1)).record(10);
+
+            // Verify the highest offset in remote storage is updated
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
+            List<Long> capturedValues = capture.getAllValues();
+            // Verify the highest offset in remote storage was -1L before the copy
+            assertEquals(-1L, capturedValues.get(0).longValue());
+            // Verify it was updated to 149L after the copy
+            assertEquals(149L, capturedValues.get(1).longValue());
+        }
+    }
+
+    @Test
+    public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception {
+        RemoteLogManager.RLMTask task = setupRLMTask(true);
+
+        Thread t = new Thread(task);
+        t.start();
+        // Sleep for a while to allow the task to start and quota check to be performed once
+        Thread.sleep(100);

Review Comment:
   `sleep` can lead to flaky tests in CI and the shutdown/close behavior is not 
tested E2E. Can we rewrite the test? Also, we have to give some buffer time in 
the close timeout (quotaTimeout + 50 ms) to avoid test flakiness.
   
   ```
   @Test
   public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception {
       remoteLogManager.startup();
       setupRLMTask(true);
       remoteLogManager.onLeadershipChange(
               Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
       TestUtils.waitForCondition(() -> {
           verify(rlmCopyQuotaManager, atLeast(1)).isQuotaExceeded();
           return true;
       }, "Quota exceeded check did not happen");
       assertTimeoutPreemptively(Duration.ofMillis(150), () -> remoteLogManager.close());
   }
   ```
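
The `TestUtils.waitForCondition` call above polls until the condition holds (or a timeout budget runs out) instead of sleeping for a fixed interval. Below is a minimal stand-alone version of that polling idiom, as a sketch rather than Kafka's actual `TestUtils` implementation:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class WaitForConditionSketch {
    // Poll the condition with a short interval until it holds or the overall
    // timeout elapses. Fast machines pass quickly; slow CI gets the full budget,
    // which is what makes this less flaky than a fixed Thread.sleep.
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, String message)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (System.nanoTime() < deadline) {
            if (condition.getAsBoolean()) {
                return;
            }
            Thread.sleep(10); // short poll interval, not the whole budget
        }
        throw new AssertionError(message);
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger checks = new AtomicInteger();
        // Simulates "the quota-exceeded check eventually happened": the condition
        // becomes true on the third poll, well within the timeout.
        waitForCondition(() -> checks.incrementAndGet() >= 3, 5_000,
                "Quota exceeded check did not happen");
        System.out.println("condition met after " + checks.get() + " checks");
    }
}
```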






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-08 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1632125146


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -750,6 +762,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+                copyQuotaManagerLock.lock();
+                try {
+                    while (rlmCopyQuotaManager.isQuotaExceeded()) {
+                        logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
+                        // If the thread gets interrupted while waiting, the InterruptedException is thrown
+                        // back to the caller. It's important to note that the task being executed is already
+                        // cancelled before the executing thread is interrupted. The caller is responsible
+                        // for handling the exception gracefully by checking if the task is already cancelled.

Review Comment:
   done






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-08 Thread via GitHub


abhijeetk88 commented on PR #15820:
URL: https://github.com/apache/kafka/pull/15820#issuecomment-2156212270

   @kamalcph I have addressed your comments, please take a look.





Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-06 Thread via GitHub


abhijeetk88 commented on PR #15820:
URL: https://github.com/apache/kafka/pull/15820#issuecomment-2151797298

   Hi @showuon. I have responded to your comment 
[here](https://github.com/apache/kafka/pull/15820#discussion_r1625465435). 
Please take a look.





Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-05 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1628876545


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -750,6 +762,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+                copyQuotaManagerLock.lock();
+                try {
+                    while (rlmCopyQuotaManager.isQuotaExceeded()) {
+                        logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
+                        // If the thread gets interrupted while waiting, the InterruptedException is thrown
+                        // back to the caller. It's important to note that the task being executed is already
+                        // cancelled before the executing thread is interrupted. The caller is responsible
+                        // for handling the exception gracefully by checking if the task is already cancelled.

Review Comment:
   Will add.






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-05 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1628819130


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   I agree that the thread pool will look busy when the threads are waiting 
for the quota to be available. However, we will also have another metric for 
throttling, which can be checked to verify whether the copy operations are 
being throttled.
   If we see the **thread pool busy and copies being throttled -> the copy 
quota is fully utilized and there is nothing to do**.
   However, if the **thread pool is busy and no throttling is happening -> 
there are not enough threads in the pool and the thread-pool size needs to be 
increased**.
   
   Skipping the current run (as Kamal pointed out) will result in ineffective 
use of the quota, because the next run of the task happens after 30 sec. It 
may also cause starvation for some topic partitions; there is no fairness. 
For example, the copy for one topic partition gets skipped, while the copy 
for another is picked up immediately if quota is available. These problems 
would persist even if we scheduled the next run with a 1-sec delay.
   
   It is also not simple to schedule a new RLMTask with a 1-sec delay (if we 
skip the current run). The schedule of the tasks is already fixed, because 
we are using a ScheduledThreadPoolExecutor with _scheduleWithFixedDelay_ to 
schedule them. To make the above change, the task would need to own its own 
scheduling, i.e., after each run it schedules its next run. That would be a 
larger change.
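
For illustration, the alternative described in this thread, where each task owns its scheduling and books its own next run (a short retry delay when throttled, the normal interval otherwise), could look roughly like the sketch below. All names and delays are hypothetical; this is not the actual RLMTask implementation:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SelfSchedulingTaskSketch {
    // Unlike scheduleWithFixedDelay, the task picks its own next delay after
    // each run, so a throttled run can retry sooner than the normal interval.
    static final long NORMAL_DELAY_MS = 50;   // stands in for the 30-sec interval
    static final long THROTTLED_DELAY_MS = 5; // stands in for the 1-sec retry

    final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    final AtomicInteger runs = new AtomicInteger();
    final CountDownLatch done = new CountDownLatch(3);

    void runOnce() {
        int run = runs.incrementAndGet();
        boolean throttled = run < 3; // pretend the first two runs hit the quota
        done.countDown();
        if (done.getCount() > 0) {
            long nextDelay = throttled ? THROTTLED_DELAY_MS : NORMAL_DELAY_MS;
            executor.schedule(this::runOnce, nextDelay, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        SelfSchedulingTaskSketch sketch = new SelfSchedulingTaskSketch();
        sketch.executor.schedule(sketch::runOnce, 0, TimeUnit.MILLISECONDS);
        boolean finished = sketch.done.await(5, TimeUnit.SECONDS);
        sketch.executor.shutdownNow();
        System.out.println("finished: " + finished + ", runs: " + sketch.runs.get());
    }
}
```

The trade-off noted in the comment still applies: the task now carries scheduling state itself, which is a larger change than the current fixed-delay executor setup.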






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-05 Thread via GitHub


showuon commented on PR #15820:
URL: https://github.com/apache/kafka/pull/15820#issuecomment-2151343142

   @abhijeetk88 , any update to this PR?





Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-04 Thread via GitHub


showuon commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1625465435


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   Good point! [KAFKA-16887](https://issues.apache.org/jira/browse/KAFKA-16887) 
has been created for this issue. 
   I'm also thinking: maybe we could still skip this copy when the quota is 
exceeded and wait for the next RLMTask run. If it's throttled, we can 
`scheduleOnce` another task with a 1-second delay. That would mitigate the 
issue @kamalcph mentioned above and get the same result. WDYT?







Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-04 Thread via GitHub


showuon commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1625465435


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   Good point! [KAFKA-16887](https://issues.apache.org/jira/browse/KAFKA-16887) 
has been created for this issue. 
   I'm also thinking: maybe we could still skip this copy when the quota is 
exceeded and wait for the next RLMTask run. If it's throttled, we can schedule 
another task with a 1-second delay. That would mitigate the issue @kamalcph 
mentioned above and get the same result. WDYT?






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-03 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1625304008


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   One drawback of the current approach is that the thread-pool will be shown 
as all "busy" in the metrics when the threads are waiting for the quota to 
become available. It can raise false alarms if a user configures a 100% 
thread-pool usage alert. We can mention this behavior in the docs section.






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-03 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1625304008


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   One drawback of the current approach is that the thread-pool will be shown 
as all "busy" in the metrics. It can raise false alarms if a user configures 
an alert on top of it. We can mention this behavior in the docs section.






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-03 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1625304008


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   @abhijeetk88 
   
   One drawback of the current approach is that the thread-pool will be shown 
as all "busy" in the metrics. Could you confirm whether the threads are 
"shown as" busy when they are in `await`?






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-03 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1625290704


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -750,6 +762,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+                copyQuotaManagerLock.lock();
+                try {
+                    while (rlmCopyQuotaManager.isQuotaExceeded()) {
+                        logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
+                        // If the thread gets interrupted while waiting, the InterruptedException is thrown
+                        // back to the caller. It's important to note that the task being executed is already
+                        // cancelled before the executing thread is interrupted. The caller is responsible
+                        // for handling the exception gracefully by checking if the task is already cancelled.

Review Comment:
   Could you cover shutdown when the quota gets breached as a unit test?






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-03 Thread via GitHub


showuon commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1625271623


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
 assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {false, true})
+public void testCopyQuota(boolean quotaExceeded) throws Exception {
+long oldSegmentStartOffset = 0L;
+long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+// leader epoch preparation
+checkpoint.write(totalEpochEntries);
+LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+// create 2 log segments, with 0 and 150 as log start offset
+LogSegment oldSegment = mock(LogSegment.class);
+LogSegment activeSegment = mock(LogSegment.class);
+
+File tempFile = TestUtils.tempFile();
+FileRecords fileRecords = mock(FileRecords.class);
+when(fileRecords.file()).thenReturn(tempFile);
+when(fileRecords.sizeInBytes()).thenReturn(10);
+
+// Set up the segment that is eligible for copy
+when(oldSegment.log()).thenReturn(fileRecords);
+when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+// set up the active segment
+when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+when(mockLog.activeSegment()).thenReturn(activeSegment);
+when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+File mockProducerSnapshotIndex = TestUtils.tempFile();
+ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+when(mockLog.lastStableOffset()).thenReturn(250L);
+
+File tempDir = TestUtils.tempDirectory();
+OffsetIndex idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, 
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+TimeIndex timeIdx = 
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, 
""), oldSegmentStartOffset, 1500).get();
+File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldSegmentStartOffset, "");
+txnFile.createNewFile();
+TransactionIndex txnIndex = new 
TransactionIndex(oldSegmentStartOffset, txnFile);
+when(oldSegment.timeIndex()).thenReturn(timeIdx);
+when(oldSegment.offsetIndex()).thenReturn(idx);
+when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+CompletableFuture dummyFuture = new CompletableFuture<>();
+dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+task.convertToLeader(2);
+
+if (quotaExceeded) {
+// Verify that the copy operation times out, since no segments can 
be copied due to quota being exceeded
+try {
+assertTimeoutPreemptively(Duration.ofSeconds(1), () -> 
task.copyLogSegmentsToRemote(mockLog));
+fail(EXPECTED_THE_OPERATION_TO_TIME_OUT);
+} catch (AssertionFailedError e) {
+// Fail the test if the operation completed within the timeout
+if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT)) 
{
+fail(e.getMessage());
+}
+}
+
+// Verify the highest offset in remote storage is updated only once
+ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class);
+verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());

Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-03 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1624109218


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
 assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {false, true})
+public void testCopyQuota(boolean quotaExceeded) throws Exception {
+long oldSegmentStartOffset = 0L;
+long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+// leader epoch preparation
+checkpoint.write(totalEpochEntries);
+LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+// create 2 log segments, with 0 and 150 as log start offset
+LogSegment oldSegment = mock(LogSegment.class);
+LogSegment activeSegment = mock(LogSegment.class);
+
+File tempFile = TestUtils.tempFile();
+FileRecords fileRecords = mock(FileRecords.class);
+when(fileRecords.file()).thenReturn(tempFile);
+when(fileRecords.sizeInBytes()).thenReturn(10);
+
+// Set up the segment that is eligible for copy
+when(oldSegment.log()).thenReturn(fileRecords);
+when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+// set up the active segment
+when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+when(mockLog.activeSegment()).thenReturn(activeSegment);
+when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+File mockProducerSnapshotIndex = TestUtils.tempFile();
+ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+when(mockLog.lastStableOffset()).thenReturn(250L);
+
+File tempDir = TestUtils.tempDirectory();
+OffsetIndex idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, 
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+TimeIndex timeIdx = 
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, 
""), oldSegmentStartOffset, 1500).get();
+File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldSegmentStartOffset, "");
+txnFile.createNewFile();
+TransactionIndex txnIndex = new 
TransactionIndex(oldSegmentStartOffset, txnFile);
+when(oldSegment.timeIndex()).thenReturn(timeIdx);
+when(oldSegment.offsetIndex()).thenReturn(idx);
+when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+task.convertToLeader(2);
+
+if (quotaExceeded) {
+// Verify that the copy operation times out, since no segments can 
be copied due to quota being exceeded
+try {
+assertTimeoutPreemptively(Duration.ofSeconds(1), () -> 
task.copyLogSegmentsToRemote(mockLog));
+fail(EXPECTED_THE_OPERATION_TO_TIME_OUT);
+} catch (AssertionFailedError e) {
+// Fail the test if the operation completed within the timeout
+if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT)) 
{
+fail(e.getMessage());
+}
+}

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-03 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1624108812


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -750,6 +762,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws 
InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {
+logger.debug("Quota exceeded for copying 
log segments, waiting for the quota to be available.");
+try {
+
copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), 
TimeUnit.MILLISECONDS);
+} catch (InterruptedException e) {

Review Comment:
   Addressed this. Also added comments.
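For readers following along, the lock/condition await-with-timeout pattern under review here can be sketched in isolation as below. This is an illustrative standalone sketch, not the actual RemoteLogManager code; the names (`QuotaWaiter`, `awaitQuota`, `signalQuotaAvailable`) are invented for the example.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the wait-until-quota-available pattern discussed
// above; not the RemoteLogManager API.
class QuotaWaiter {
    private final Lock lock = new ReentrantLock();
    private final Condition quotaAvailable = lock.newCondition();

    /**
     * Blocks until the predicate reports the quota is no longer exceeded,
     * re-checking after each signal. Returns false if a single wait times
     * out or the thread is interrupted; on interrupt, the interrupt status
     * is restored rather than the exception being swallowed.
     */
    boolean awaitQuota(BooleanSupplier isQuotaExceeded, Duration timeout) {
        lock.lock();
        try {
            while (isQuotaExceeded.getAsBoolean()) {
                try {
                    if (!quotaAvailable.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                        return false; // timed out while quota still exceeded
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore interrupt flag
                    return false;
                }
            }
            return true; // quota available
        } finally {
            lock.unlock();
        }
    }

    /** Called once throttled usage drains back below the quota. */
    void signalQuotaAvailable() {
        lock.lock();
        try {
            quotaAvailable.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        QuotaWaiter waiter = new QuotaWaiter();
        // Quota never exceeded: the loop is skipped and we return immediately.
        System.out.println(waiter.awaitQuota(() -> false, Duration.ofMillis(10)));
    }
}
```

The two points the review touches on are visible here: the wait happens under the lock inside a loop that re-checks the condition (guarding against spurious wakeups), and `InterruptedException` is handled by restoring the thread's interrupt status.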






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-03 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1624108252


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
 assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {false, true})
+public void testCopyQuota(boolean quotaExceeded) throws Exception {
+long oldSegmentStartOffset = 0L;
+long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+// leader epoch preparation
+checkpoint.write(totalEpochEntries);
+LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+// create 2 log segments, with 0 and 150 as log start offset
+LogSegment oldSegment = mock(LogSegment.class);
+LogSegment activeSegment = mock(LogSegment.class);
+
+File tempFile = TestUtils.tempFile();
+FileRecords fileRecords = mock(FileRecords.class);
+when(fileRecords.file()).thenReturn(tempFile);
+when(fileRecords.sizeInBytes()).thenReturn(10);
+
+// Set up the segment that is eligible for copy
+when(oldSegment.log()).thenReturn(fileRecords);
+when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+// set up the active segment
+when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+when(mockLog.activeSegment()).thenReturn(activeSegment);
+when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+File mockProducerSnapshotIndex = TestUtils.tempFile();
+ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+when(mockLog.lastStableOffset()).thenReturn(250L);
+
+File tempDir = TestUtils.tempDirectory();
+OffsetIndex idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, 
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+TimeIndex timeIdx = 
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, 
""), oldSegmentStartOffset, 1500).get();
+File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldSegmentStartOffset, "");
+txnFile.createNewFile();
+TransactionIndex txnIndex = new 
TransactionIndex(oldSegmentStartOffset, txnFile);
+when(oldSegment.timeIndex()).thenReturn(timeIdx);
+when(oldSegment.offsetIndex()).thenReturn(idx);
+when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+task.convertToLeader(2);
+
+if (quotaExceeded) {
+// Verify that the copy operation times out, since no segments can 
be copied due to quota being exceeded
+try {
+assertTimeoutPreemptively(Duration.ofSeconds(1), () -> 
task.copyLogSegmentsToRemote(mockLog));

Review Comment:
   done






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-03 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1624107462


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -239,6 +244,13 @@ private void removeMetrics() {
 remoteStorageReaderThreadPool.removeMetrics();
 }
 
+/**
+ * Returns the timeout for the RLM Tasks to wait for the quota to be 
available
+ */
+Duration quotaTimeout() {
+return Duration.ofSeconds(1);

Review Comment:
   I want to override this method to have a smaller timeout for tests. That is 
the reason I did not use a constant value.
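As an aside, the trade-off being discussed (overridable method vs. declared constant) can be shown with a minimal sketch. `ThrottledCopier` and `TestThrottledCopier` are hypothetical names for illustration, not classes from the PR.

```java
import java.time.Duration;

// Exposing the timeout through a package-private method lets a test
// subclass shrink it; a `static final` constant would not allow this.
class ThrottledCopier {
    /** Production timeout for tasks waiting on the copy quota. */
    Duration quotaTimeout() {
        return Duration.ofSeconds(1);
    }
}

class TestThrottledCopier extends ThrottledCopier {
    @Override
    Duration quotaTimeout() {
        return Duration.ofMillis(10); // keep unit tests fast
    }
}
```

A constant would make the 1-second value self-documenting, but every test exercising the wait path would then pay the full production timeout; the override keeps tests fast at the cost of one extra indirection.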






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-02 Thread via GitHub


satishd commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1623765428


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -239,6 +244,13 @@ private void removeMetrics() {
 remoteStorageReaderThreadPool.removeMetrics();
 }
 
+/**
+ * Returns the timeout for the RLM Tasks to wait for the quota to be 
available
+ */
+Duration quotaTimeout() {
+return Duration.ofSeconds(1);

Review Comment:
   Can you use the declared constant value?






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-31 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1622486629


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
 assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {false, true})
+public void testCopyQuota(boolean quotaExceeded) throws Exception {
+long oldSegmentStartOffset = 0L;
+long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+// leader epoch preparation
+checkpoint.write(totalEpochEntries);
+LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+// create 2 log segments, with 0 and 150 as log start offset
+LogSegment oldSegment = mock(LogSegment.class);
+LogSegment activeSegment = mock(LogSegment.class);
+
+File tempFile = TestUtils.tempFile();
+FileRecords fileRecords = mock(FileRecords.class);
+when(fileRecords.file()).thenReturn(tempFile);
+when(fileRecords.sizeInBytes()).thenReturn(10);
+
+// Set up the segment that is eligible for copy
+when(oldSegment.log()).thenReturn(fileRecords);
+when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+// set up the active segment
+when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+when(mockLog.activeSegment()).thenReturn(activeSegment);
+when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+File mockProducerSnapshotIndex = TestUtils.tempFile();
+ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+when(mockLog.lastStableOffset()).thenReturn(250L);
+
+File tempDir = TestUtils.tempDirectory();
+OffsetIndex idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, 
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+TimeIndex timeIdx = 
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, 
""), oldSegmentStartOffset, 1500).get();
+File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldSegmentStartOffset, "");
+txnFile.createNewFile();
+TransactionIndex txnIndex = new 
TransactionIndex(oldSegmentStartOffset, txnFile);
+when(oldSegment.timeIndex()).thenReturn(timeIdx);
+when(oldSegment.offsetIndex()).thenReturn(idx);
+when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+task.convertToLeader(2);
+
+if (quotaExceeded) {
+// Verify that the copy operation times out, since no segments can 
be copied due to quota being exceeded
+try {
+assertTimeoutPreemptively(Duration.ofSeconds(1), () -> 
task.copyLogSegmentsToRemote(mockLog));
+fail(EXPECTED_THE_OPERATION_TO_TIME_OUT);
+} catch (AssertionFailedError e) {
+// Fail the test if the operation completed within the timeout
+if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT)) 
{
+fail(e.getMessage());
+}
+}

Review Comment:
   ok. Got it now. Will simplify this. Thanks for the suggestion.
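For reference, the behaviour this branch of the test asserts, namely that a blocked copy does not complete within a bound, can be expressed with plain JDK executors instead of the `assertTimeoutPreemptively`/`fail` double negative. A hypothetical sketch (`TimeoutCheck` is not part of the PR):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-JDK sketch of "assert that a task does NOT finish within a timeout";
// the real test uses JUnit 5's assertTimeoutPreemptively, this shows the idea.
class TimeoutCheck {
    static boolean timesOut(Runnable task, long timeoutMs) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> f = pool.submit(task);
            try {
                f.get(timeoutMs, TimeUnit.MILLISECONDS);
                return false; // task completed within the timeout
            } catch (TimeoutException e) {
                f.cancel(true); // interrupt the still-running task
                return true;
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        } finally {
            pool.shutdownNow();
        }
    }
}
```

With a helper like this, the quota-exceeded branch becomes a single positive assertion that the copy call times out, rather than catching `AssertionFailedError` and re-failing on a message match.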




Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-31 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1622440767


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
 assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {false, true})
+public void testCopyQuota(boolean quotaExceeded) throws Exception {
+long oldSegmentStartOffset = 0L;
+long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+// leader epoch preparation
+checkpoint.write(totalEpochEntries);
+LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+// create 2 log segments, with 0 and 150 as log start offset
+LogSegment oldSegment = mock(LogSegment.class);
+LogSegment activeSegment = mock(LogSegment.class);
+
+File tempFile = TestUtils.tempFile();
+FileRecords fileRecords = mock(FileRecords.class);
+when(fileRecords.file()).thenReturn(tempFile);
+when(fileRecords.sizeInBytes()).thenReturn(10);
+
+// Set up the segment that is eligible for copy
+when(oldSegment.log()).thenReturn(fileRecords);
+when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+// set up the active segment
+when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+when(mockLog.activeSegment()).thenReturn(activeSegment);
+when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+File mockProducerSnapshotIndex = TestUtils.tempFile();
+ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+when(mockLog.lastStableOffset()).thenReturn(250L);
+
+File tempDir = TestUtils.tempDirectory();
+OffsetIndex idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, 
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+TimeIndex timeIdx = 
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, 
""), oldSegmentStartOffset, 1500).get();
+File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldSegmentStartOffset, "");
+txnFile.createNewFile();
+TransactionIndex txnIndex = new 
TransactionIndex(oldSegmentStartOffset, txnFile);
+when(oldSegment.timeIndex()).thenReturn(timeIdx);
+when(oldSegment.offsetIndex()).thenReturn(idx);
+when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+task.convertToLeader(2);
+
+if (quotaExceeded) {
+// Verify that the copy operation times out, since no segments can 
be copied due to quota being exceeded
+try {
+assertTimeoutPreemptively(Duration.ofSeconds(1), () -> 
task.copyLogSegmentsToRemote(mockLog));
+fail(EXPECTED_THE_OPERATION_TO_TIME_OUT);
+} catch (AssertionFailedError e) {
+// Fail the test if the operation completed within the timeout
+if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT)) 
{
+fail(e.getMessage());
+}
+}
+
+// Verify the highest offset in remote storage is updated only once
+ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());

Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-31 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1622434675


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
 assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {false, true})
+public void testCopyQuota(boolean quotaExceeded) throws Exception {
+long oldSegmentStartOffset = 0L;
+long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+// leader epoch preparation
+checkpoint.write(totalEpochEntries);
+LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+// create 2 log segments, with 0 and 150 as log start offset
+LogSegment oldSegment = mock(LogSegment.class);
+LogSegment activeSegment = mock(LogSegment.class);
+
+File tempFile = TestUtils.tempFile();
+FileRecords fileRecords = mock(FileRecords.class);
+when(fileRecords.file()).thenReturn(tempFile);
+when(fileRecords.sizeInBytes()).thenReturn(10);
+
+// Set up the segment that is eligible for copy
+when(oldSegment.log()).thenReturn(fileRecords);
+when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+// set up the active segment
+when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+when(mockLog.activeSegment()).thenReturn(activeSegment);
+when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+File mockProducerSnapshotIndex = TestUtils.tempFile();
+ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+when(mockLog.lastStableOffset()).thenReturn(250L);
+
+File tempDir = TestUtils.tempDirectory();
+OffsetIndex idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, 
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+TimeIndex timeIdx = 
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, 
""), oldSegmentStartOffset, 1500).get();
+File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldSegmentStartOffset, "");
+txnFile.createNewFile();
+TransactionIndex txnIndex = new 
TransactionIndex(oldSegmentStartOffset, txnFile);
+when(oldSegment.timeIndex()).thenReturn(timeIdx);
+when(oldSegment.offsetIndex()).thenReturn(idx);
+when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+task.convertToLeader(2);
+
+if (quotaExceeded) {
+// Verify that the copy operation times out, since no segments can 
be copied due to quota being exceeded
+try {
+assertTimeoutPreemptively(Duration.ofSeconds(1), () -> 
task.copyLogSegmentsToRemote(mockLog));

Review Comment:
   Will change it here and in the other test.






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-31 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1622418590


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
 assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {false, true})
+public void testCopyQuota(boolean quotaExceeded) throws Exception {
+long oldSegmentStartOffset = 0L;
+long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+// leader epoch preparation
+checkpoint.write(totalEpochEntries);
+LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+// create 2 log segments, with 0 and 150 as log start offset
+LogSegment oldSegment = mock(LogSegment.class);
+LogSegment activeSegment = mock(LogSegment.class);
+
+File tempFile = TestUtils.tempFile();
+FileRecords fileRecords = mock(FileRecords.class);
+when(fileRecords.file()).thenReturn(tempFile);
+when(fileRecords.sizeInBytes()).thenReturn(10);
+
+// Set up the segment that is eligible for copy
+when(oldSegment.log()).thenReturn(fileRecords);
+when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+// set up the active segment
+when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+when(mockLog.activeSegment()).thenReturn(activeSegment);
+when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+File mockProducerSnapshotIndex = TestUtils.tempFile();
+ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+when(mockLog.lastStableOffset()).thenReturn(250L);
+
+File tempDir = TestUtils.tempDirectory();
+OffsetIndex idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, 
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+TimeIndex timeIdx = 
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, 
""), oldSegmentStartOffset, 1500).get();
+File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldSegmentStartOffset, "");
+txnFile.createNewFile();
+TransactionIndex txnIndex = new 
TransactionIndex(oldSegmentStartOffset, txnFile);
+when(oldSegment.timeIndex()).thenReturn(timeIdx);
+when(oldSegment.offsetIndex()).thenReturn(idx);
+when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+task.convertToLeader(2);
+
+if (quotaExceeded) {
+// Verify that the copy operation times out, since no segments can 
be copied due to quota being exceeded
+try {
+assertTimeoutPreemptively(Duration.ofSeconds(1), () -> 
task.copyLogSegmentsToRemote(mockLog));
+fail(EXPECTED_THE_OPERATION_TO_TIME_OUT);
+} catch (AssertionFailedError e) {
+// Fail the test if the operation completed within the timeout
+if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT)) 
{
+fail(e.getMessage());
+}
+}
+
+// Verify the highest offset in remote storage is updated only once
+ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());

Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-31 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1622418254


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
 assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {false, true})
+public void testCopyQuota(boolean quotaExceeded) throws Exception {
+long oldSegmentStartOffset = 0L;
+long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+// leader epoch preparation
+checkpoint.write(totalEpochEntries);
+LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+// create 2 log segments, with 0 and 150 as log start offset
+LogSegment oldSegment = mock(LogSegment.class);
+LogSegment activeSegment = mock(LogSegment.class);
+
+File tempFile = TestUtils.tempFile();
+FileRecords fileRecords = mock(FileRecords.class);
+when(fileRecords.file()).thenReturn(tempFile);
+when(fileRecords.sizeInBytes()).thenReturn(10);
+
+// Set up the segment that is eligible for copy
+when(oldSegment.log()).thenReturn(fileRecords);
+when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+// set up the active segment
+when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+when(mockLog.activeSegment()).thenReturn(activeSegment);
+when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+File mockProducerSnapshotIndex = TestUtils.tempFile();
+ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+when(mockLog.lastStableOffset()).thenReturn(250L);
+
+File tempDir = TestUtils.tempDirectory();
+OffsetIndex idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, 
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+TimeIndex timeIdx = 
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, 
""), oldSegmentStartOffset, 1500).get();
+File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldSegmentStartOffset, "");
+txnFile.createNewFile();
+TransactionIndex txnIndex = new 
TransactionIndex(oldSegmentStartOffset, txnFile);
+when(oldSegment.timeIndex()).thenReturn(timeIdx);
+when(oldSegment.offsetIndex()).thenReturn(idx);
+when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+CompletableFuture dummyFuture = new CompletableFuture<>();
+dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+task.convertToLeader(2);
+
+if (quotaExceeded) {
+// Verify that the copy operation times out, since no segments can 
be copied due to quota being exceeded
+try {
+assertTimeoutPreemptively(Duration.ofSeconds(1), () -> 
task.copyLogSegmentsToRemote(mockLog));
+fail(EXPECTED_THE_OPERATION_TO_TIME_OUT);
+} catch (AssertionFailedError e) {
+// Fail the test if the operation completed within the timeout
+if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT)) 
{
+fail(e.getMessage());
+}
+}

Review Comment:
   The suggested approach will not work.

   The assertTimeoutPreemptively method throws an AssertionFailedError if the executable does not complete within the timeout.

   I want to verify the

Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-31 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1622404030


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -750,6 +762,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {
+logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
+try {
+copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), TimeUnit.MILLISECONDS);
+} catch (InterruptedException e) {

Review Comment:
   You are right. We should not be swallowing the exception. Let me change 
this. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-31 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1622404030


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -750,6 +762,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {
+logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
+try {
+copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), TimeUnit.MILLISECONDS);
+} catch (InterruptedException e) {

Review Comment:
   Thanks for catching this. You are right. We should not be swallowing the 
exception, as it may prevent application shutdown. Let me change this. 
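Following up on the interrupt-handling point above, here is a minimal plain-Java sketch of a quota wait loop that lets InterruptedException propagate to the caller rather than swallowing it. This is not the actual RemoteLogManager code; the class, the `quotaExceeded` flag, and the timeout argument are illustrative stand-ins for `rlmCopyQuotaManager.isQuotaExceeded()` and `quotaTimeout()` from the PR.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Sketch only: a wait loop that declares InterruptedException so an
// interrupt (e.g. during shutdown) reaches the caller instead of being
// silently dropped inside the loop.
public class QuotaWaitSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition quotaAvailable = lock.newCondition();
    private boolean quotaExceeded = true;

    public void awaitQuota(Duration timeout) throws InterruptedException {
        lock.lock();
        try {
            while (quotaExceeded) {
                // Condition.await checks the interrupt flag and throws
                // InterruptedException; we deliberately let it escape.
                quotaAvailable.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
        } finally {
            lock.unlock();
        }
    }

    public void markQuotaAvailable() {
        lock.lock();
        try {
            quotaExceeded = false;
            quotaAvailable.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

With this shape, a shutdown that interrupts the RLM task thread is seen by the caller of `awaitQuota` immediately, rather than being masked until the next loop iteration.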






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-31 Thread via GitHub


showuon commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1622254618


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   Fair enough. Thanks.






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-31 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1622187413


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   > we could skip this copy when quota exceeds, and waiting for next RLMTask run? 
   
   The RLMTask schedule interval is configurable and defaults to 30 seconds. The next run will happen after 30 seconds, and the quota might become available before then. If we go with the suggested approach, we may not be utilizing the quota effectively. 






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-31 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1622187413


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   > we could skip this copy when quota exceeds, and waiting for next RLMTask 
run? 
   
   RLMTask schedule interval is configurable and defaults to 30 seconds. The 
next run will happen after 30 seconds, the quota might be available before the 
next run. If we go with the suggested approach, then we may not be utilizing 
the quota effectively. 






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-31 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1622181661


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
 assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {false, true})
+public void testCopyQuota(boolean quotaExceeded) throws Exception {
+long oldSegmentStartOffset = 0L;
+long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+// leader epoch preparation
+checkpoint.write(totalEpochEntries);
+LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+// create 2 log segments, with 0 and 150 as log start offset
+LogSegment oldSegment = mock(LogSegment.class);
+LogSegment activeSegment = mock(LogSegment.class);
+
+File tempFile = TestUtils.tempFile();
+FileRecords fileRecords = mock(FileRecords.class);
+when(fileRecords.file()).thenReturn(tempFile);
+when(fileRecords.sizeInBytes()).thenReturn(10);
+
+// Set up the segment that is eligible for copy
+when(oldSegment.log()).thenReturn(fileRecords);
+when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+// set up the active segment
+when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+when(mockLog.activeSegment()).thenReturn(activeSegment);
+when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+File mockProducerSnapshotIndex = TestUtils.tempFile();
+ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+when(mockLog.lastStableOffset()).thenReturn(250L);
+
+File tempDir = TestUtils.tempDirectory();
+OffsetIndex idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, 
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+TimeIndex timeIdx = 
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, 
""), oldSegmentStartOffset, 1500).get();
+File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldSegmentStartOffset, "");
+txnFile.createNewFile();
+TransactionIndex txnIndex = new 
TransactionIndex(oldSegmentStartOffset, txnFile);
+when(oldSegment.timeIndex()).thenReturn(timeIdx);
+when(oldSegment.offsetIndex()).thenReturn(idx);
+when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+CompletableFuture dummyFuture = new CompletableFuture<>();
+dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+task.convertToLeader(2);
+
+if (quotaExceeded) {
+// Verify that the copy operation times out, since no segments can 
be copied due to quota being exceeded
+try {
+assertTimeoutPreemptively(Duration.ofSeconds(1), () -> 
task.copyLogSegmentsToRemote(mockLog));
+fail(EXPECTED_THE_OPERATION_TO_TIME_OUT);
+} catch (AssertionFailedError e) {
+// Fail the test if the operation completed within the timeout
+if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT)) 
{
+fail(e.getMessage());
+}
+}
+
+// Verify the highest offset in remote storage is updated only once
+ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+verify(mockLog, times(1)).updateHighestOffsetInRemo

Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-31 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1622178816


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
 assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {false, true})
+public void testCopyQuota(boolean quotaExceeded) throws Exception {
+long oldSegmentStartOffset = 0L;
+long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+// leader epoch preparation
+checkpoint.write(totalEpochEntries);
+LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+// create 2 log segments, with 0 and 150 as log start offset
+LogSegment oldSegment = mock(LogSegment.class);
+LogSegment activeSegment = mock(LogSegment.class);
+
+File tempFile = TestUtils.tempFile();
+FileRecords fileRecords = mock(FileRecords.class);
+when(fileRecords.file()).thenReturn(tempFile);
+when(fileRecords.sizeInBytes()).thenReturn(10);
+
+// Set up the segment that is eligible for copy
+when(oldSegment.log()).thenReturn(fileRecords);
+when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+// set up the active segment
+when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+when(mockLog.activeSegment()).thenReturn(activeSegment);
+when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+File mockProducerSnapshotIndex = TestUtils.tempFile();
+ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+when(mockLog.lastStableOffset()).thenReturn(250L);
+
+File tempDir = TestUtils.tempDirectory();
+OffsetIndex idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, 
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+TimeIndex timeIdx = 
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, 
""), oldSegmentStartOffset, 1500).get();
+File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldSegmentStartOffset, "");
+txnFile.createNewFile();
+TransactionIndex txnIndex = new 
TransactionIndex(oldSegmentStartOffset, txnFile);
+when(oldSegment.timeIndex()).thenReturn(timeIdx);
+when(oldSegment.offsetIndex()).thenReturn(idx);
+when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+CompletableFuture dummyFuture = new CompletableFuture<>();
+dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+task.convertToLeader(2);
+
+if (quotaExceeded) {
+// Verify that the copy operation times out, since no segments can 
be copied due to quota being exceeded
+try {
+assertTimeoutPreemptively(Duration.ofSeconds(1), () -> 
task.copyLogSegmentsToRemote(mockLog));
+fail(EXPECTED_THE_OPERATION_TO_TIME_OUT);
+} catch (AssertionFailedError e) {
+// Fail the test if the operation completed within the timeout
+if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT)) 
{
+fail(e.getMessage());
+}
+}

Review Comment:
   ```java
   assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofSeconds(1), () -> task.copyLogSegmentsToRemote(mockLog)));
   ```
   
   Can we rewrite it to be more concise?
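To make the semantics under discussion concrete, here is a plain-Java sketch of the pattern, using a simplified stand-in for JUnit 5's assertTimeoutPreemptively (the class and method here are illustrative, not the JUnit implementation). Like the real assertion, it fails with an AssertionError when the task does not complete within the allowed time, which is exactly what the concise expected-exception check relies on.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: runs the task on a separate thread and throws
// AssertionError if it overruns the timeout, mimicking the behavior of
// JUnit 5's assertTimeoutPreemptively for illustration purposes.
public class TimeoutSketch {
    public static void assertTimeoutPreemptively(long millis, Runnable task) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // get() with a timeout abandons the task once time runs out.
            executor.submit(task).get(millis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new AssertionError("execution timed out after " + millis + " ms");
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Given that behavior, wrapping the call in a single expected-exception assertion verifies "the operation did time out" in one line, which is the concise rewrite being suggested.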

Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-31 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1622159510


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -750,6 +762,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {
+logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
+try {
+copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), TimeUnit.MILLISECONDS);
+} catch (InterruptedException e) {

Review Comment:
   We are catching the InterruptedException:
   
   1. Does it delay the shutdown? If yes, do we need to add a check for whether the task is cancelled before copying the log segment?
   2. Why is the logger level set to WARN? Can it be reduced to INFO?






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-31 Thread via GitHub


showuon commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1622134355


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   @abhijeetk88 @kamalcph , I'm wondering if we could skip this copy when the quota is exceeded and wait for the next RLMTask run? That way, the implementation would be much simpler, without the lock and wait. WDYT?






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-31 Thread via GitHub


showuon commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1622091326


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
 assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {false, true})
+public void testCopyQuota(boolean quotaExceeded) throws Exception {
+long oldSegmentStartOffset = 0L;
+long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+// leader epoch preparation
+checkpoint.write(totalEpochEntries);
+LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+// create 2 log segments, with 0 and 150 as log start offset
+LogSegment oldSegment = mock(LogSegment.class);
+LogSegment activeSegment = mock(LogSegment.class);
+
+File tempFile = TestUtils.tempFile();
+FileRecords fileRecords = mock(FileRecords.class);
+when(fileRecords.file()).thenReturn(tempFile);
+when(fileRecords.sizeInBytes()).thenReturn(10);
+
+// Set up the segment that is eligible for copy
+when(oldSegment.log()).thenReturn(fileRecords);
+when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+// set up the active segment
+when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+when(mockLog.activeSegment()).thenReturn(activeSegment);
+when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+File mockProducerSnapshotIndex = TestUtils.tempFile();
+ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+when(mockLog.lastStableOffset()).thenReturn(250L);
+
+File tempDir = TestUtils.tempDirectory();
+OffsetIndex idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, 
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+TimeIndex timeIdx = 
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, 
""), oldSegmentStartOffset, 1500).get();
+File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldSegmentStartOffset, "");
+txnFile.createNewFile();
+TransactionIndex txnIndex = new 
TransactionIndex(oldSegmentStartOffset, txnFile);
+when(oldSegment.timeIndex()).thenReturn(timeIdx);
+when(oldSegment.offsetIndex()).thenReturn(idx);
+when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+CompletableFuture dummyFuture = new CompletableFuture<>();
+dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+task.convertToLeader(2);
+
+if (quotaExceeded) {
+// Verify that the copy operation times out, since no segments can 
be copied due to quota being exceeded
+try {
+assertTimeoutPreemptively(Duration.ofSeconds(1), () -> 
task.copyLogSegmentsToRemote(mockLog));

Review Comment:
   I still see this is 1 second. I think we can add some buffer; giving it 2x the waiting time should be enough, which is 200ms. Same as below. Otherwise, these tests need to run for at least 1 second.






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-30 Thread via GitHub


satishd commented on PR #15820:
URL: https://github.com/apache/kafka/pull/15820#issuecomment-2141069773

   @abhijeetk88 Can you resolve the conflicts?





Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-29 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1619996598


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2530,6 +2549,269 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata, l
 
 }
 }
+
+@Test
+public void testCopyQuotaManagerConfig() {
+Properties defaultProps = new Properties();
+RemoteLogManagerConfig defaultRlmConfig = 
createRLMConfig(defaultProps);
+RLMQuotaManagerConfig defaultConfig = 
RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig);
+assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
+assertEquals(61, defaultConfig.numQuotaSamples());
+assertEquals(1, defaultConfig.quotaWindowSizeSeconds());
+
+Properties customProps = new Properties();
+
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
 100);
+
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP,
 31);
+
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP,
 1);
+RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
+RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = 
RemoteLogManager.copyQuotaManagerConfig(rlmConfig);
+assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond());
+assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples());
+assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds());
+}
+
+@Test
+public void testFetchQuotaManagerConfig() {
+Properties defaultProps = new Properties();
+RemoteLogManagerConfig defaultRlmConfig = 
createRLMConfig(defaultProps);
+RLMQuotaManagerConfig defaultConfig = 
RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig);
+assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
+assertEquals(11, defaultConfig.numQuotaSamples());
+assertEquals(1, defaultConfig.quotaWindowSizeSeconds());
+
+Properties customProps = new Properties();
+
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
 100);
+
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP,
 31);
+
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP,
 1);
+RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
+RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = 
RemoteLogManager.fetchQuotaManagerConfig(rlmConfig);
+assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond());
+assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples());
+assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
+}
+
+@Test
+public void testCopyQuotaNotExceeded() throws Exception {
+long oldSegmentStartOffset = 0L;
+long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+// leader epoch preparation
+checkpoint.write(totalEpochEntries);
+LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+// create 2 log segments, with 0 and 150 as log start offset
+LogSegment oldSegment = mock(LogSegment.class);
+LogSegment activeSegment = mock(LogSegment.class);
+
+File tempFile = TestUtils.tempFile();
+FileRecords fileRecords = mock(FileRecords.class);
+when(fileRecords.file()).thenReturn(tempFile);
+when(fileRecords.sizeInBytes()).thenReturn(10);
+
+// Set up the segment that will be copied
+when(oldSegment.log()).thenReturn(fileRecords);
+when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+// set up the active segment
+when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+when(mockLog.activeSegment()).thenReturn(activeSegment);
+when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+File mockProducerSnapshotIndex = TestUtils.tempFile();
+ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+when(mockLog.producer

Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-29 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1619996273


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2530,6 +2549,269 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
 
         }
     }
+
+    @Test
+    public void testCopyQuotaManagerConfig() {
+        Properties defaultProps = new Properties();
+        RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps);
+        RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig);
+        assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
+        assertEquals(61, defaultConfig.numQuotaSamples());
+        assertEquals(1, defaultConfig.quotaWindowSizeSeconds());
+
+        Properties customProps = new Properties();
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
+        RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
+        RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(rlmConfig);
+        assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond());
+        assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples());
+        assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds());
+    }
+
+    @Test
+    public void testFetchQuotaManagerConfig() {
+        Properties defaultProps = new Properties();
+        RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps);
+        RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig);
+        assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
+        assertEquals(11, defaultConfig.numQuotaSamples());
+        assertEquals(1, defaultConfig.quotaWindowSizeSeconds());
+
+        Properties customProps = new Properties();
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
+        RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
+        RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig);
+        assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond());
+        assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples());
+        assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
+    }
+
+    @Test
+    public void testCopyQuotaNotExceeded() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        File tempFile = TestUtils.tempFile();
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+
+        // Set up the segment that will be copied
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        // set up the active segment
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+        when(mockLog.producer

Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-28 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1618199858



Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-28 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1618195673




Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-28 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1618191025



Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-28 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1618190777


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -153,6 +156,8 @@ public class RemoteLogManager implements Closeable {
 
 private final RemoteLogMetadataManager remoteLogMetadataManager;
 
+private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true);

Review Comment:
   We want the threads that were blocked first to be unblocked first. This is 
to prevent the starvation of some threads. 
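The FIFO behavior described above can be sketched in a minimal standalone example (class and helper names here are illustrative, not actual RemoteLogManager code): with `new ReentrantLock(true)`, the thread that queued first is granted the lock first once it is released.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;

// Demonstrates what fairness buys: with ReentrantLock(true), blocked threads
// acquire the lock in arrival (FIFO) order, so the longest-waiting task runs
// first instead of potentially starving behind newer arrivals.
class FairLockDemo {
    static List<String> acquisitionOrder(boolean fair) {
        ReentrantLock lock = new ReentrantLock(fair);
        List<String> order = new CopyOnWriteArrayList<>();
        lock.lock();                                 // hold the lock so workers queue up
        Thread t1 = worker(lock, order, "first");
        t1.start();
        while (!lock.hasQueuedThread(t1)) Thread.onSpinWait(); // ensure t1 queued first
        Thread t2 = worker(lock, order, "second");
        t2.start();
        while (!lock.hasQueuedThread(t2)) Thread.onSpinWait(); // t2 queued second
        lock.unlock();                               // fair lock hands off FIFO: t1, then t2
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return order;
    }

    private static Thread worker(ReentrantLock lock, List<String> order, String name) {
        return new Thread(() -> {
            lock.lock();
            try {
                order.add(name);                     // record who got the lock when
            } finally {
                lock.unlock();
            }
        });
    }
}
```

With `fair = false` the handoff order is not guaranteed, which is the starvation risk the comment refers to.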






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-28 Thread via GitHub


abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1618189739


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   Yes, this was planned as part of KIP-950 implementation. We will have a 
separate change for it.
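The wait loop in the quoted hunk can be sketched as follows (a minimal sketch, not the actual RemoteLogManager code; the boolean field stands in for `rlmCopyQuotaManager.isQuotaExceeded()` and the signalling method is hypothetical):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Throttling pattern under discussion: the copy task re-checks the quota under
// a fair lock and parks on a condition with a timeout until the quota frees up.
class CopyThrottle {
    private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true);
    private final Condition quotaAvailable = copyQuotaManagerLock.newCondition();
    private boolean quotaExceeded = true;    // stand-in for rlmCopyQuotaManager.isQuotaExceeded()

    // Block the copy task until the quota is no longer exceeded.
    void awaitQuota(long timeoutMs) throws InterruptedException {
        copyQuotaManagerLock.lock();
        try {
            while (quotaExceeded) {
                quotaAvailable.await(timeoutMs, TimeUnit.MILLISECONDS);
            }
        } finally {
            copyQuotaManagerLock.unlock();
        }
    }

    // Hypothetical hook: called when quota samples show headroom again.
    void markQuotaAvailable() {
        copyQuotaManagerLock.lock();
        try {
            quotaExceeded = false;
            quotaAvailable.signalAll();
        } finally {
            copyQuotaManagerLock.unlock();
        }
    }

    boolean isQuotaExceeded() {
        copyQuotaManagerLock.lock();
        try {
            return quotaExceeded;
        } finally {
            copyQuotaManagerLock.unlock();
        }
    }
}
```

The timed `await` means a task re-checks the quota periodically rather than waiting forever on a missed signal.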






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-28 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1617086269


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -237,6 +242,13 @@ private void removeMetrics() {
 remoteStorageReaderThreadPool.removeMetrics();
 }
 
+/**
+ * Returns the timeout for the RLM Tasks to wait for the quota to be available
+ */
+Duration quotaTimeout() {

Review Comment:
   shall we rename `quotaTimeout` to `throttleTimeMs`?






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-28 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1617519570


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   We can take this up in [KAFKA-16853](https://issues.apache.org/jira/browse/KAFKA-16853)






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-28 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1617105411


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   If the copy quota gets breached, all the `kafka-rlm-thread-pool` threads will wait for the quota to become available, which might delay the deletion of remote log segments since the same thread does both copy and delete.
   
   Do you plan to split the copy/delete operations in a separate thread pool?
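The split suggested here could look roughly like the following (a hypothetical sketch; the class, pool names, and sizes are illustrative, not the actual Kafka thread pools): with deletes on their own executor, copy tasks parked on the copy quota cannot hold up remote segment deletion.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical split of RLM work: throttled copies and quota-independent
// deletes run on separate executors, so a saturated copy quota only stalls
// the copy pool.
class RlmSplitPools {
    final ExecutorService copyPool = Executors.newFixedThreadPool(4);   // copies, may block on quota
    final ExecutorService deletePool = Executors.newFixedThreadPool(2); // deletes proceed independently

    void shutdown() throws InterruptedException {
        copyPool.shutdown();
        deletePool.shutdown();
        copyPool.awaitTermination(5, TimeUnit.SECONDS);
        deletePool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Even with every copy thread blocked, a task submitted to `deletePool` still completes, which is the property the comment is asking about.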




##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -153,6 +156,8 @@ public class RemoteLogManager implements Closeable {
 
 private final RemoteLogMetadataManager remoteLogMetadataManager;
 
+private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true);

Review Comment:
   Why `fairness` is turned on to true?






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-27 Thread via GitHub


showuon commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1615786577

