This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5074c8038e4 KAFKA-15260: RLM Task should handle uninitialized RLMM for 
the associated topic-partition (#14113)
5074c8038e4 is described below

commit 5074c8038e44620b48d7700226810b983febd864
Author: Abhijeet Kumar <[email protected]>
AuthorDate: Mon Sep 4 09:13:04 2023 +0530

    KAFKA-15260: RLM Task should handle uninitialized RLMM for the associated 
topic-partition (#14113)
    
    This change makes the RLM task handle the retriable exception raised when it
    tries to copy segments to remote storage before the RLMM is initialized. On
    encountering the exception, the copy path rethrows it, and the task's run
    method catches it and logs it at debug level instead of treating it as a
    copy failure. We also make sure that the failure metrics are not updated,
    since this is a temporary error caused by the RLMM not yet being initialized.
    
    Added a unit test to verify that the RLM task does not attempt to copy
    segments to remote storage on encountering the retriable exception, and that
    the remote copy request and failure metrics remain unchanged.
    
    Reviewers: Satish Duggana <[email protected]>, Luke Chen 
<[email protected]>, Kamal Chandraprakash<[email protected]>
---
 .../java/kafka/log/remote/RemoteLogManager.java    |  5 +-
 .../kafka/log/remote/RemoteLogManagerTest.java     | 54 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 8122d7992da..fdd16347a89 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -671,7 +672,7 @@ public class RemoteLogManager implements Closeable {
                 
brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark();
                 
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
                 this.cancel();
-            } catch (InterruptedException ex) {
+            } catch (InterruptedException | RetriableException ex) {
                 throw ex;
             } catch (Exception ex) {
                 if (!isCancelled()) {
@@ -776,6 +777,8 @@ public class RemoteLogManager implements Closeable {
                 if (!isCancelled()) {
                     logger.warn("Current thread for topic-partition-id {} is 
interrupted. Reason: {}", topicIdPartition, ex.getMessage());
                 }
+            } catch (RetriableException ex) {
+                logger.debug("Encountered a retryable error while executing 
current task for topic-partition {}", topicIdPartition, ex);
             } catch (Exception ex) {
                 if (!isCancelled()) {
                     logger.warn("Current task for topic-partition {} received 
error but it will be scheduled. " +
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 588b31786c2..53456e58e70 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.errors.ReplicaNotAvailableException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.FileRecords;
@@ -754,6 +755,59 @@ public class RemoteLogManagerTest {
         verify(mockLog, never()).updateHighestOffsetInRemoteStorage(anyLong());
     }
 
+    @Test // Verifies the RLM task neither copies segments nor updates copy metrics while RLMM is not initialized
+    void 
testRLMTaskDoesNotUploadSegmentsWhenRemoteLogMetadataManagerIsNotInitialized() 
throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        // Throw a retriable exception to indicate that the remote log 
metadata manager is not initialized yet
+        
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt()))
+            .thenThrow(new ReplicaNotAvailableException("Remote log metadata 
cache is not initialized for partition: " + leaderTopicIdPartition));
+
+        // create 2 log segments, with 0 and 150 as their base offsets
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+        // Ensure the metrics for remote write requests/failures are zero 
before attempting to copy any log segment
+        assertEquals(0, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
+        assertEquals(0, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
+        // Ensure aggregate metrics
+        assertEquals(0, 
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
+        assertEquals(0, 
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(0);
+        task.run(); // run() is expected to handle the retriable error itself rather than propagate it
+
+        // verify the remoteLogMetadataManager never adds any metadata and 
remoteStorageManager never copies log segments
+        verify(remoteLogMetadataManager, 
never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+        verify(remoteStorageManager, 
never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), 
any(LogSegmentData.class));
+        verify(remoteLogMetadataManager, 
never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+        verify(mockLog, never()).updateHighestOffsetInRemoteStorage(anyLong());
+
+        // Verify the metrics for remote write requests/failures were not 
updated.
+        assertEquals(0, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
+        assertEquals(0, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
+        // Verify aggregate metrics
+        assertEquals(0, 
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
+        assertEquals(0, 
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
+    }
+
     private void verifyRemoteLogSegmentMetadata(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
                                                 long oldSegmentStartOffset,
                                                 long oldSegmentEndOffset,

Reply via email to