This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 0b4fcbb16d0 KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage (#15820)
0b4fcbb16d0 is described below

commit 0b4fcbb16d0fbab67df906bdfe0bb7b880503e22
Author: Abhijeet Kumar <abhijeet.cse....@gmail.com>
AuthorDate: Wed Jun 12 06:27:02 2024 +0530

    KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage (#15820)
    
    - Added the integration of the quota manager to throttle copy requests to the remote storage. Reference: KIP-956.
    - Added unit-tests for the copy throttling logic.
    
    Reviewers: Satish Duggana <sati...@apache.org>, Luke Chen <show...@gmail.com>, Kamal Chandraprakash <kamal.chandraprak...@gmail.com>
---
 checkstyle/import-control-core.xml                 |   1 +
 .../java/kafka/log/remote/RemoteLogManager.java    |  29 +++
 .../kafka/log/remote/RemoteLogManagerTest.java     | 210 +++++++++++++++++++++
 3 files changed, 240 insertions(+)

diff --git a/checkstyle/import-control-core.xml 
b/checkstyle/import-control-core.xml
index ed6c53a322b..a30de55e415 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -38,6 +38,7 @@
   <allow pkg="org.apache.kafka.common" />
   <allow pkg="org.mockito" class="AssignmentsManagerTest"/>
   <allow pkg="org.apache.kafka.server"/>
+  <allow pkg="org.opentest4j" class="RemoteLogManagerTest"/>
   <!-- see KIP-544 for why KafkaYammerMetrics should be used instead of the 
global default yammer metrics registry
        
https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable
 -->
   <disallow class="com.yammer.metrics.Metrics" />
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 43c03190767..b920a962afc 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -97,6 +97,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.security.PrivilegedAction;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -123,6 +124,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -160,6 +163,8 @@ public class RemoteLogManager implements Closeable {
 
     private final RemoteLogMetadataManager remoteLogMetadataManager;
 
+    private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true);
+    private final Condition copyQuotaManagerLockCondition = 
copyQuotaManagerLock.newCondition();
     private final RLMQuotaManager rlmCopyQuotaManager;
     private final RLMQuotaManager rlmFetchQuotaManager;
 
@@ -250,6 +255,13 @@ public class RemoteLogManager implements Closeable {
         remoteStorageReaderThreadPool.removeMetrics();
     }
 
+    /**
+     * Returns the timeout for the RLM Tasks to wait for the quota to be 
available
+     */
+    Duration quotaTimeout() {
+        return Duration.ofSeconds(1);
+    }
+
     RLMQuotaManager createRLMCopyQuotaManager() {
         return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, 
QuotaType.RLMCopy$.MODULE$,
           "Tracking copy byte-rate for Remote Log Manager", time);
@@ -763,6 +775,23 @@ public class RemoteLogManager implements Closeable {
                                         isCancelled(), isLeader());
                                 return;
                             }
+
+                            copyQuotaManagerLock.lock();
+                            try {
+                                while (rlmCopyQuotaManager.isQuotaExceeded()) {
+                                    logger.debug("Quota exceeded for copying 
log segments, waiting for the quota to be available.");
+                                    // If the thread gets interrupted while 
waiting, the InterruptedException is thrown
+                                    // back to the caller. It's important to 
note that the task being executed is already
+                                    // cancelled before the executing thread 
is interrupted. The caller is responsible
+                                    // for handling the exception gracefully 
by checking if the task is already cancelled.
+                                    boolean ignored = 
copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), 
TimeUnit.MILLISECONDS);
+                                }
+                                
rlmCopyQuotaManager.record(candidateLogSegment.logSegment.log().sizeInBytes());
+                                // Signal waiting threads to check the quota 
again
+                                copyQuotaManagerLockCondition.signalAll();
+                            } finally {
+                                copyQuotaManagerLock.unlock();
+                            }
                             copyLogSegment(log, 
candidateLogSegment.logSegment, candidateLogSegment.nextSegmentOffset);
                         }
                     }
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 83fc5966b49..4c4976f060d 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -21,6 +21,7 @@ import com.yammer.metrics.core.MetricName;
 import kafka.cluster.EndPoint;
 import kafka.cluster.Partition;
 import kafka.log.UnifiedLog;
+import kafka.log.remote.quota.RLMQuotaManager;
 import kafka.log.remote.quota.RLMQuotaManagerConfig;
 import kafka.server.BrokerTopicStats;
 import kafka.server.KafkaConfig;
@@ -87,6 +88,7 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;
 import org.mockito.MockedConstruction;
 import org.mockito.Mockito;
+import org.opentest4j.AssertionFailedError;
 import scala.Option;
 import scala.collection.JavaConverters;
 
@@ -101,6 +103,7 @@ import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -146,6 +149,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -153,6 +157,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
@@ -187,6 +192,7 @@ public class RemoteLogManagerTest {
 
     private final RemoteStorageManager remoteStorageManager = 
mock(RemoteStorageManager.class);
     private final RemoteLogMetadataManager remoteLogMetadataManager = 
mock(RemoteLogMetadataManager.class);
+    private final RLMQuotaManager rlmCopyQuotaManager = 
mock(RLMQuotaManager.class);
     private RemoteLogManagerConfig remoteLogManagerConfig = null;
 
     private BrokerTopicStats brokerTopicStats = null;
@@ -230,6 +236,12 @@ public class RemoteLogManagerTest {
             public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                 return remoteLogMetadataManager;
             }
+            public RLMQuotaManager createRLMCopyQuotaManager() {
+                return rlmCopyQuotaManager;
+            }
+            public Duration quotaTimeout() {
+                return Duration.ofMillis(100);
+            }
             @Override
             long findLogStartOffset(TopicIdPartition topicIdPartition, 
UnifiedLog log) {
                 return 0L;
@@ -2735,6 +2747,204 @@ public class RemoteLogManagerTest {
     }
 
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testCopyQuota(boolean quotaExceeded) throws Exception {
+        RemoteLogManager.RLMTask task = setupRLMTask(quotaExceeded);
+
+        if (quotaExceeded) {
+            // Verify that the copy operation times out, since no segments can 
be copied due to quota being exceeded
+            assertThrows(AssertionFailedError.class, () -> 
assertTimeoutPreemptively(Duration.ofMillis(200), () -> 
task.copyLogSegmentsToRemote(mockLog)));
+
+            // Verify the highest offset in remote storage is updated only once
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, 
times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
+            // Verify the highest offset in remote storage was -1L before the 
copy started
+            assertEquals(-1L, capture.getValue());
+        } else {
+            // Verify the copy operation completes within the timeout, since 
it does not need to wait for quota availability
+            assertTimeoutPreemptively(Duration.ofMillis(100), () -> 
task.copyLogSegmentsToRemote(mockLog));
+
+            // Verify quota check was performed
+            verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded();
+            // Verify bytes to copy was recorded with the quota manager
+            verify(rlmCopyQuotaManager, times(1)).record(10);
+
+            // Verify the highest offset in remote storage is updated
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, 
times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
+            List<Long> capturedValues = capture.getAllValues();
+            // Verify the highest offset in remote storage was -1L before the 
copy
+            assertEquals(-1L, capturedValues.get(0).longValue());
+            // Verify it was updated to 149L after the copy
+            assertEquals(149L, capturedValues.get(1).longValue());
+        }
+    }
+
+    @Test
+    public void testRLMShutdownDuringQuotaExceededScenario() throws Exception {
+        remoteLogManager.startup();
+        setupRLMTask(true);
+        remoteLogManager.onLeadershipChange(
+            Collections.singleton(mockPartition(leaderTopicIdPartition)), 
Collections.emptySet(), topicIds);
+        // Ensure the copy operation is waiting for quota to be available
+        TestUtils.waitForCondition(() -> {
+            verify(rlmCopyQuotaManager, atLeast(1)).isQuotaExceeded();
+            return true;
+        }, "Quota exceeded check did not happen");
+        // Verify RLM is able to shut down
+        assertTimeoutPreemptively(Duration.ofMillis(100), () -> 
remoteLogManager.close());
+    }
+
+    // helper method to set up a RemoteLogManager.RLMTask for testing copy 
quota behaviour
+    private RemoteLogManager.RLMTask setupRLMTask(boolean quotaExceeded) 
throws RemoteStorageException, IOException {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, 
scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(mockLog.parentDir()).thenReturn("dir1");
+        
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        File tempFile = TestUtils.tempFile();
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+
+        // Set up the segment that is eligible for copy
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        // set up the active segment
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+        
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+        File tempDir = TestUtils.tempDirectory();
+        OffsetIndex idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, 
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = 
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, 
""), oldSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new 
TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.timeIndex()).thenReturn(timeIdx);
+        when(oldSegment.offsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+        
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+        when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+        doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(2);
+        return task;
+    }
+
+    @Test
+    public void testCopyThrottling() throws Exception {
+        long oldestSegmentStartOffset = 0L;
+
+        
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, 
scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+        // create 3 log segments
+        LogSegment segmentToCopy = mock(LogSegment.class);
+        LogSegment segmentToThrottle = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        File tempFile = TestUtils.tempFile();
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+
+        // set up the segment that will be copied
+        when(segmentToCopy.log()).thenReturn(fileRecords);
+        when(segmentToCopy.baseOffset()).thenReturn(oldestSegmentStartOffset);
+        when(segmentToCopy.readNextOffset()).thenReturn(100L);
+
+        // set up the segment that will not be copied because of hitting quota
+        when(segmentToThrottle.log()).thenReturn(fileRecords);
+        when(segmentToThrottle.baseOffset()).thenReturn(100L);
+        when(segmentToThrottle.readNextOffset()).thenReturn(150L);
+
+        // set up the active segment
+        when(activeSegment.log()).thenReturn(fileRecords);
+        when(activeSegment.baseOffset()).thenReturn(150L);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldestSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segmentToCopy,
 segmentToThrottle, activeSegment)));
+
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+        
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+        File tempDir = TestUtils.tempDirectory();
+        OffsetIndex idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, 
oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = 
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldestSegmentStartOffset, 
""), oldestSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldestSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new 
TransactionIndex(oldestSegmentStartOffset, txnFile);
+        when(segmentToCopy.timeIndex()).thenReturn(timeIdx);
+        when(segmentToCopy.offsetIndex()).thenReturn(idx);
+        when(segmentToCopy.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+        
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+        // After the first call, isQuotaExceeded should return true
+        when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(false, true);
+        doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(2);
+
+        // Verify that the copy operation times out, since the second segment 
cannot be copied due to quota being exceeded
+        assertThrows(AssertionFailedError.class, () -> 
assertTimeoutPreemptively(Duration.ofMillis(200), () -> 
task.copyLogSegmentsToRemote(mockLog)));
+
+        // Verify the highest offset in remote storage is updated 
corresponding to the only segment that was copied
+        ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+        verify(mockLog, 
times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
+        List<Long> capturedValues = capture.getAllValues();
+        // Verify the highest offset in remote storage was -1L before the copy
+        assertEquals(-1L, capturedValues.get(0).longValue());
+        // Verify it was updated to 99L after the copy
+        assertEquals(99L, capturedValues.get(1).longValue());
+    }
+
     private Partition mockPartition(TopicIdPartition topicIdPartition) {
         TopicPartition tp = topicIdPartition.topicPartition();
         Partition partition = mock(Partition.class);

Reply via email to