This is an automated email from the ASF dual-hosted git repository.
kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new da61b19aae8 KAFKA-19981: Handle retriable remote storage exception in RemoteLogManager (#21150)
da61b19aae8 is described below
commit da61b19aae888005f113a943486c5e22383fcc84
Author: Lan Ding <[email protected]>
AuthorDate: Thu Dec 18 12:19:43 2025 +0800
KAFKA-19981: Handle retriable remote storage exception in RemoteLogManager (#21150)
This PR distinguishes between `RemoteStorageException` and
`RetriableRemoteStorageException` in `RemoteLogManager` to handle
temporary storage degradations gracefully:
1. Copy path: Avoids deleting partially uploaded segments when
`RetriableRemoteStorageException` is thrown;
2. Delete path: Skips incrementing failedRemoteDeleteRequestRate metric
for retriable exceptions;
3. Documentation: Updates `RemoteStorageManager` Javadoc to clarify
exception usage;
4. Testing: Adds unit tests for retriable scenarios (see the plugin-side sketch below).
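To make the plugin contract concrete, here is a minimal sketch of an implementor honoring this
distinction. It is a sketch only: `BackendClient`, `BackendTimeoutException`, and the
upload/delete calls are hypothetical stand-ins for whatever storage SDK a real plugin would use,
while `RemoteStorageManager`, `LogSegmentData`, `RemoteLogSegmentMetadata`,
`RemoteStorageException`, and `RetriableRemoteStorageException` are the actual API types touched
by this commit.

    import java.util.Optional;

    import org.apache.kafka.server.log.remote.storage.LogSegmentData;
    import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
    import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
    import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
    import org.apache.kafka.server.log.remote.storage.RetriableRemoteStorageException;

    // Abstract so the remaining RemoteStorageManager methods can stay elided in this sketch.
    public abstract class SketchRemoteStorageManager implements RemoteStorageManager {

        // Hypothetical stand-ins for a real storage SDK; not part of Kafka.
        interface BackendClient {
            void uploadSegment(RemoteLogSegmentMetadata metadata, LogSegmentData data) throws BackendTimeoutException;
            void deleteSegment(RemoteLogSegmentMetadata metadata) throws BackendTimeoutException;
        }

        static class BackendTimeoutException extends Exception { }

        private final BackendClient backend;

        protected SketchRemoteStorageManager(BackendClient backend) {
            this.backend = backend;
        }

        @Override
        public Optional<RemoteLogSegmentMetadata.CustomMetadata> copyLogSegmentData(
                RemoteLogSegmentMetadata metadata,
                LogSegmentData segmentData) throws RemoteStorageException {
            try {
                backend.uploadSegment(metadata, segmentData);  // hypothetical upload call
                return Optional.empty();
            } catch (BackendTimeoutException e) {
                // Transient failure: RemoteLogManager keeps the partially uploaded
                // segment and retries the copy on a later task run.
                throw new RetriableRemoteStorageException("Transient backend timeout: " + e);
            } catch (RuntimeException e) {
                // Permanent failure: RemoteLogManager deletes the dangling segment
                // and marks failedRemoteCopyRequestRate.
                throw new RemoteStorageException("Unrecoverable copy failure", e);
            }
        }

        @Override
        public void deleteLogSegmentData(RemoteLogSegmentMetadata metadata) throws RemoteStorageException {
            try {
                backend.deleteSegment(metadata);  // hypothetical delete call
            } catch (BackendTimeoutException e) {
                // Transient failure: failedRemoteDeleteRequestRate is not marked and
                // the expiration task retries the deletion in its next run.
                throw new RetriableRemoteStorageException("Transient backend timeout: " + e);
            } catch (RuntimeException e) {
                throw new RemoteStorageException("Unrecoverable delete failure", e);
            }
        }
    }

With this split, the broker-side behavior in the diff below follows: the copy path rethrows the
retriable error without deleting the partial upload, the delete path rethrows it without marking
failedRemoteDeleteRequestRate, and both operations are retried on the next task run.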
Reviewers: Kamal Chandraprakash <[email protected]>, Luke Chen <[email protected]>
---
.../log/remote/storage/RemoteStorageManager.java | 12 +-
.../log/remote/storage/RemoteLogManager.java | 13 +-
.../log/remote/storage/RemoteLogManagerTest.java | 143 ++++++++++++++++++++-
3 files changed, 160 insertions(+), 8 deletions(-)
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
index 3fd6a633b7d..e17e3af596d 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
@@ -43,6 +43,11 @@ import java.util.Optional;
* Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the manager to register metrics.
* The following tags are automatically added to all metrics registered: <code>config</code> set to
* <code>remote.log.storage.manager.class.name</code>, and <code>class</code> set to the RemoteStorageManager class name.
+ * <p>
+ * Plugin implementors of {@link RemoteStorageManager} should throw {@link RetriableRemoteStorageException}
+ * for transient errors that can be recovered by retrying. For non-recoverable errors,
+ * {@link RemoteStorageException} should be thrown. This distinction allows RemoteLogManager to
+ * handle retries gracefully and report metrics accurately.
*/
public interface RemoteStorageManager extends Configurable, Closeable {
@@ -90,11 +95,11 @@ public interface RemoteStorageManager extends Configurable, Closeable {
* @param remoteLogSegmentMetadata metadata about the remote log segment.
* @param logSegmentData data to be copied to tiered storage.
* @return custom metadata to be added to the segment metadata after copying.
- * @throws RemoteStorageException if there are any errors in storing the data of the segment.
+ * @throws RemoteStorageException           if there are any errors in storing the data of the segment.
+ * @throws RetriableRemoteStorageException if the error is transient and the operation can be retried.
*/
Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
- LogSegmentData logSegmentData)
- throws RemoteStorageException;
+ LogSegmentData logSegmentData) throws RemoteStorageException;
/**
* Returns the remote log segment data file/object as InputStream for the given {@link RemoteLogSegmentMetadata}
@@ -150,6 +155,7 @@ public interface RemoteStorageManager extends Configurable, Closeable {
*
* @param remoteLogSegmentMetadata metadata about the remote log segment to be deleted.
* @throws RemoteStorageException           if there are any storage related errors occurred.
+ * @throws RetriableRemoteStorageException if the error is transient and the operation can be retried.
*/
void deleteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index a9848633eff..604a43925bf 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -836,7 +836,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
if (!isCancelled()) {
logger.warn("Current thread for partition {} is
interrupted", topicIdPartition, ex);
}
- } catch (RetriableException ex) {
+ } catch (RetriableException | RetriableRemoteStorageException ex) {
logger.debug("Encountered a retryable error while executing
current task for partition {}", topicIdPartition, ex);
} catch (Exception ex) {
if (!isCancelled()) {
@@ -869,7 +869,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
}
@Override
- protected void execute(UnifiedLog log) throws InterruptedException {
+ protected void execute(UnifiedLog log) throws InterruptedException, RetriableRemoteStorageException {
// In the first run after completing altering logDir within broker, we should make sure the state is reset. (KAFKA-16711)
if (!log.parentDir().equals(logDirectory.orElse(null))) {
copiedOffsetOption = Optional.empty();
@@ -928,7 +928,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
return candidateLogSegments;
}
- public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException {
+ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException, RetriableRemoteStorageException {
if (isCancelled())
return;
@@ -1001,7 +1001,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark();
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
this.cancel();
- } catch (InterruptedException | RetriableException ex) {
+ } catch (InterruptedException | RetriableException | RetriableRemoteStorageException ex) {
throw ex;
} catch (Exception ex) {
if (!isCancelled()) {
@@ -1044,6 +1044,9 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
try {
customMetadata = remoteStorageManagerPlugin.get().copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+ } catch (RetriableRemoteStorageException e) {
+ logger.info("Copy failed with retriable error for segment {}",
copySegmentStartedRlsm.remoteLogSegmentId());
+ throw e;
} catch (RemoteStorageException e) {
logger.info("Copy failed, cleaning segment {}",
copySegmentStartedRlsm.remoteLogSegmentId());
try {
@@ -1513,6 +1516,8 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
// Delete the segment in remote storage.
try {
remoteStorageManagerPlugin.get().deleteLogSegmentData(segmentMetadata);
+ } catch (RetriableRemoteStorageException e) {
+ throw e;
} catch (RemoteStorageException e) {
brokerTopicStats.topicStats(topic).failedRemoteDeleteRequestRate().mark();
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().mark();
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index 5b0676088e1..bcb0b91b0f8 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -774,6 +774,92 @@ public class RemoteLogManagerTest {
assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyLagSegmentsAggrMetric().value());
}
+ @Test
+ void testFailedCopyWithRetriableExceptionShouldNotDeleteTheDanglingSegment() throws Exception {
+ long oldSegmentStartOffset = 0L;
+ long nextSegmentStartOffset = 150L;
+ long lastStableOffset = 150L;
+ long logEndOffset = 150L;
+
+ when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(12L);
+ when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(2L);
+ when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
+ when(mockLog.leaderEpochCache()).thenReturn(cache);
+ when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(-1L));
+
+ File tempFile = TestUtils.tempFile();
+ File mockProducerSnapshotIndex = TestUtils.tempFile();
+ File tempDir = TestUtils.tempDirectory();
+ // create 2 log segments, with 0 and 150 as log start offset
+ LogSegment oldSegment = mock(LogSegment.class);
+ LogSegment activeSegment = mock(LogSegment.class);
+
+ when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+ when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+ when(activeSegment.size()).thenReturn(2);
+ verify(oldSegment, times(0)).readNextOffset();
+ verify(activeSegment, times(0)).readNextOffset();
+
+ FileRecords fileRecords = mock(FileRecords.class);
+ when(oldSegment.log()).thenReturn(fileRecords);
+ when(fileRecords.file()).thenReturn(tempFile);
+ when(fileRecords.sizeInBytes()).thenReturn(10);
+ when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+ when(mockLog.activeSegment()).thenReturn(activeSegment);
+ when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+ when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(List.of(oldSegment, activeSegment));
+
+ ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+ when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+ when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+ when(mockLog.lastStableOffset()).thenReturn(lastStableOffset);
+ when(mockLog.logEndOffset()).thenReturn(logEndOffset);
+
+ OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+ TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
+ File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+ txnFile.createNewFile();
+ TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+ when(oldSegment.timeIndex()).thenReturn(timeIdx);
+ when(oldSegment.offsetIndex()).thenReturn(idx);
+ when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+ CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+ dummyFuture.complete(null);
+ when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+ when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime);
+ when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
+ // throw retriable exception when copyLogSegmentData
+ when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
+ .thenThrow(new RetriableRemoteStorageException("test-retriable"));
+ RemoteLogManager.RLMCopyTask task = remoteLogManager.new RLMCopyTask(leaderTopicIdPartition, 128);
+ assertThrows(RetriableRemoteStorageException.class, () -> task.copyLogSegmentsToRemote(mockLog));
+
+ ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
+ verify(remoteLogMetadataManager).addRemoteLogSegmentMetadata(remoteLogSegmentMetadataArg.capture());
+ // verify the segment is not deleted for retriable exception
+ verify(remoteStorageManager, never()).deleteLogSegmentData(eq(remoteLogSegmentMetadataArg.getValue()));
+ verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+
+ // Verify the metrics
+ // Retriable exceptions should not count as failures for copy
+ assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
+ assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
+ assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
+ // Verify aggregate metrics
+ assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
+ assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
+ assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
+ assertEquals(10, brokerTopicStats.allTopicsStats().remoteCopyLagBytesAggrMetric().value());
+ assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyLagSegmentsAggrMetric().value());
+ }
+
@Test
void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exception {
long oldSegmentStartOffset = 0L;
@@ -2401,7 +2487,7 @@ public class RemoteLogManagerTest {
Thread copyThread = new Thread(() -> {
try {
copyTask.copyLogSegmentsToRemote(mockLog);
- } catch (InterruptedException e) {
+ } catch (InterruptedException | RetriableRemoteStorageException e) {
throw new RuntimeException(e);
}
});
@@ -2840,6 +2926,61 @@ public class RemoteLogManagerTest {
verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
}
+ @ParameterizedTest(name = "testDeleteSegmentFailureWithRetriableExceptionShouldNotUpdateMetrics retentionSize={0} retentionMs={1}")
+ @CsvSource(value = {"0, -1", "-1, 0"})
+ public void testDeleteSegmentFailureWithRetriableExceptionShouldNotUpdateMetrics(long retentionSize,
+ long retentionMs) throws RemoteStorageException, ExecutionException, InterruptedException {
+ Map<String, Long> logProps = new HashMap<>();
+ logProps.put("retention.bytes", retentionSize);
+ logProps.put("retention.ms", retentionMs);
+ LogConfig mockLogConfig = new LogConfig(logProps);
+ when(mockLog.config()).thenReturn(mockLogConfig);
+
+ List<EpochEntry> epochEntries = List.of(epochEntry0);
+ checkpoint.write(epochEntries);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
+ when(mockLog.leaderEpochCache()).thenReturn(cache);
+
+ when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+ when(mockLog.logEndOffset()).thenReturn(200L);
+
+ List<RemoteLogSegmentMetadata> metadataList =
+ listRemoteLogSegmentMetadata(leaderTopicIdPartition, 1, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+ when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+ .thenReturn(metadataList.iterator());
+ when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+ .thenAnswer(ans -> metadataList.iterator());
+ when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+ .thenReturn(CompletableFuture.runAsync(() -> { }));
+
+ // Verify the metrics for remote deletes and for failures is zero before attempt to delete segments
+ assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
+ assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
+ // Verify aggregate metrics
+ assertEquals(0, brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
+ assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
+
+ RemoteLogManager.RLMExpirationTask task = remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
+ doThrow(new RetriableRemoteStorageException("Failed to delete segment with retriable exception")).when(remoteStorageManager).deleteLogSegmentData(any());
+ assertThrows(RetriableRemoteStorageException.class, task::cleanupExpiredRemoteLogSegments);
+
+ assertEquals(100L, currentLogStartOffset.get());
+ verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
+
+ // Verify the metric for remote delete is updated correctly
+ assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
+ // Verify we did not report failure for remote deletes with retriable exception
+ assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
+ // Verify aggregate metrics
+ assertEquals(1, brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
+ assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
+
+ // make sure we'll retry the deletion in next run
+ doNothing().when(remoteStorageManager).deleteLogSegmentData(any());
+ task.cleanupExpiredRemoteLogSegments();
+ verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
+ }
+
@ParameterizedTest(name = "testDeleteLogSegmentDueToRetentionSizeBreach segmentCount={0} deletableSegmentCount={1}")
@CsvSource(value = {"50, 0", "50, 1", "50, 23", "50, 50"})
public void testDeleteLogSegmentDueToRetentionSizeBreach(int segmentCount,