kamalcph commented on code in PR #14113:
URL: https://github.com/apache/kafka/pull/14113#discussion_r1312514688
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -754,6 +755,61 @@ void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Excepti
         verify(mockLog, never()).updateHighestOffsetInRemoteStorage(anyLong());
     }
+    @Test
+    void testRLMTaskDoesNotUploadSegmentsWhenRemoteLogMetadataManagerIsNotInitialized() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        // Throw a retryable exception so indicate that the remote log metadata manager is not initialized yet
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt()))
+                .thenThrow(new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + leaderTopicIdPartition));
+
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+        // Ensure the metrics for remote write requests/failures is zero before attempt to copy log segment
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
+        // Ensure aggregate metrics
+        assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
+        assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(0);
+
+        RemoteLogManager.RLMTask spyTask = spy(task);
Review Comment:
Why is the RLMTask wrapped with `spy`? The test doesn't use the spy to
verify or mock any behaviour.
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -754,6 +755,61 @@ void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Excepti
         verify(mockLog, never()).updateHighestOffsetInRemoteStorage(anyLong());
     }
+    @Test
+    void testRLMTaskDoesNotUploadSegmentsWhenRemoteLogMetadataManagerIsNotInitialized() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        // Throw a retryable exception so indicate that the remote log metadata manager is not initialized yet
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt()))
+                .thenThrow(new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + leaderTopicIdPartition));
Review Comment:
Can we add a second invocation that returns a value instead of throwing?
For example, once the partition is initialized, copying the segments should
work as expected:
```java
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt()))
    .thenThrow(new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + leaderTopicIdPartition))
    .thenReturn(-1);
```
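To make the expected sequence concrete, here is a minimal, framework-free sketch of the "throw first, then return" behaviour. Every type in it (`RetriableNotReadyException`, `OffsetLookup`, `ThrowOnceLookup`, `CopyTaskSketch`) is a hypothetical stand-in written for illustration, not a Kafka or Mockito class, and treating an empty `Optional` as "nothing uploaded yet" is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a retriable "not initialized yet" error.
class RetriableNotReadyException extends RuntimeException {
    RetriableNotReadyException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the metadata lookup the task depends on.
interface OffsetLookup {
    Optional<Long> highestOffsetForEpoch(int epoch);
}

// Hand-written fake: throws on the first call, answers normally afterwards.
// This mirrors consecutive stubbing of the form thenThrow(...).thenReturn(...).
class ThrowOnceLookup implements OffsetLookup {
    private int calls = 0;

    @Override
    public Optional<Long> highestOffsetForEpoch(int epoch) {
        calls++;
        if (calls == 1) {
            throw new RetriableNotReadyException("metadata cache is not initialized");
        }
        return Optional.empty(); // assumption: empty means nothing uploaded yet
    }
}

// Hypothetical task: skips the copy on a retriable error, copies otherwise.
class CopyTaskSketch {
    private final OffsetLookup lookup;
    final List<Long> copiedBaseOffsets = new ArrayList<>();

    CopyTaskSketch(OffsetLookup lookup) {
        this.lookup = lookup;
    }

    // Returns true if the segment was copied, false if the run was skipped.
    boolean runOnce(long segmentBaseOffset) {
        try {
            lookup.highestOffsetForEpoch(0);
            copiedBaseOffsets.add(segmentBaseOffset);
            return true;
        } catch (RetriableNotReadyException ex) {
            return false; // leave the segment for the next scheduled run
        }
    }
}
```

In the real test, the first `run()` would be followed by assertions that nothing was copied, and a second `run()` by assertions that the old segment was uploaded.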
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -776,6 +780,9 @@ public void run() {
                 if (!isCancelled()) {
                     logger.warn("Current thread for topic-partition-id {} is interrupted. Reason: {}", topicIdPartition, ex.getMessage());
                 }
+            } catch (RetriableException ex) {
+                logger.debug("Encountered a retryable error while executing current task for topic-partition {}. " +
Review Comment:
Why are we logging it twice, at L678 and at L784?
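One common way to avoid a duplicate log entry is to let the inner step propagate the exception and log it only at the outer boundary. The sketch below is a simplified illustration under that assumption; `NotReadyYetException` and `SingleLogSketch` are hypothetical names, and a plain list stands in for the logger so the number of entries is easy to check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a retriable error.
class NotReadyYetException extends RuntimeException {
    NotReadyYetException(String message) {
        super(message);
    }
}

class SingleLogSketch {
    // Stand-in for a logger: each entry is one emitted log line.
    final List<String> logLines = new ArrayList<>();

    // Inner step: lets the exception propagate without logging it.
    void copySegments(boolean ready) {
        if (!ready) {
            throw new NotReadyYetException("metadata not initialized");
        }
    }

    // Outer boundary: the only place where the retriable failure is logged.
    void run(boolean ready) {
        try {
            copySegments(ready);
        } catch (NotReadyYetException ex) {
            logLines.add("retriable error, will retry: " + ex.getMessage());
        }
    }
}
```

With this shape, each failed run produces exactly one log line, and a successful run produces none.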