This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 6d3aa70b269 KAFKA-15260: RLM Task should handle uninitialized RLMM for
the associated topic-partition (#14113)
6d3aa70b269 is described below
commit 6d3aa70b26917ccf14d6ce77bda3c6ea14ca6374
Author: Abhijeet Kumar <[email protected]>
AuthorDate: Mon Sep 4 09:13:04 2023 +0530
KAFKA-15260: RLM Task should handle uninitialized RLMM for the associated
topic-partition (#14113)
This change is about RLM task handling retriable exception when it tries to
copy segments to remote but the RLMM is not yet initialized. On encountering
the exception, we log the error and throw the exception back to the caller. We
also make sure that the failure metrics are updated since this is a temporary
error because RLMM is not yet initialized.
Added unit tests to verify RLM task does not attempt to copy segments to
remote on encountering the retriable exception and that failure metrics remain
unchanged.
Reviewers: Satish Duggana <[email protected]>, Luke Chen
<[email protected]>, Kamal Chandraprakash <[email protected]>
---
.../java/kafka/log/remote/RemoteLogManager.java | 5 +-
.../kafka/log/remote/RemoteLogManagerTest.java | 54 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 8122d7992da..fdd16347a89 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
@@ -671,7 +672,7 @@ public class RemoteLogManager implements Closeable {
brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark();
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
this.cancel();
- } catch (InterruptedException ex) {
+ } catch (InterruptedException | RetriableException ex) {
throw ex;
} catch (Exception ex) {
if (!isCancelled()) {
@@ -776,6 +777,8 @@ public class RemoteLogManager implements Closeable {
if (!isCancelled()) {
logger.warn("Current thread for topic-partition-id {} is
interrupted. Reason: {}", topicIdPartition, ex.getMessage());
}
+ } catch (RetriableException ex) {
+ logger.debug("Encountered a retryable error while executing
current task for topic-partition {}", topicIdPartition, ex);
} catch (Exception ex) {
if (!isCancelled()) {
logger.warn("Current task for topic-partition {} received
error but it will be scheduled. " +
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 588b31786c2..53456e58e70 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.FileRecords;
@@ -754,6 +755,59 @@ public class RemoteLogManagerTest {
verify(mockLog, never()).updateHighestOffsetInRemoteStorage(anyLong());
}
+ @Test
+ void
testRLMTaskDoesNotUploadSegmentsWhenRemoteLogMetadataManagerIsNotInitialized()
throws Exception {
+ long oldSegmentStartOffset = 0L;
+ long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+ // Throw a retryable exception to indicate that the remote log
metadata manager is not initialized yet
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt()))
+ .thenThrow(new ReplicaNotAvailableException("Remote log metadata
cache is not initialized for partition: " + leaderTopicIdPartition));
+
+ // create 2 log segments, with 0 and 150 as log start offset
+ LogSegment oldSegment = mock(LogSegment.class);
+ LogSegment activeSegment = mock(LogSegment.class);
+
+ when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+ when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+ when(mockLog.activeSegment()).thenReturn(activeSegment);
+ when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
activeSegment)));
+ when(mockLog.lastStableOffset()).thenReturn(250L);
+
+ // Ensure the metrics for remote write requests/failures are zero
before attempting to copy log segment
+ assertEquals(0,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
+ assertEquals(0,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
+ // Ensure aggregate metrics
+ assertEquals(0,
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
+ assertEquals(0,
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
+
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
+ task.convertToLeader(0);
+ task.run();
+
+ // verify the remoteLogMetadataManager never adds any metadata and
remoteStorageManager never copies log segments
+ verify(remoteLogMetadataManager,
never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+ verify(remoteStorageManager,
never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class));
+ verify(remoteLogMetadataManager,
never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+ verify(mockLog, never()).updateHighestOffsetInRemoteStorage(anyLong());
+
+ // Verify the metrics for remote write requests/failures were not
updated.
+ assertEquals(0,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
+ assertEquals(0,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
+ // Verify aggregate metrics
+ assertEquals(0,
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
+ assertEquals(0,
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
+ }
+
private void verifyRemoteLogSegmentMetadata(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
long oldSegmentStartOffset,
long oldSegmentEndOffset,