This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 239dce3e041 KAFKA-19291: Increase the timeout of remote storage share
fetch requests in purgatory (#19757)
239dce3e041 is described below
commit 239dce3e0415061d070b71874ece23ae5df14687
Author: Abhinav Dixit <[email protected]>
AuthorDate: Thu May 22 12:11:33 2025 +0530
KAFKA-19291: Increase the timeout of remote storage share fetch requests in
purgatory (#19757)
### About
Consumer groups use a separate timeout, `REMOTE_FETCH_MAX_WAIT_MS_PROP`,
in the delayed fetch purgatory for fetch requests that involve a remote
storage fetch ([code
link](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1669)).
This is decided before the request enters the purgatory, so it's easy to
change. At the moment, share groups can only use a `waitTimeMs` of
`shareFetch.fetchParams().maxWaitMs` (default value `500ms`) in the delayed
share fetch purgatory, regardless of whether the fetch is a remote
storage fetch or a local log fetch.
This PR introduces a way to extend the timeout of share fetch requests
that involve a remote storage fetch. If the remote storage fetch cannot
complete within `shareFetch.fetchParams().maxWaitMs`, a timer task is
created for the remaining wait time (`remoteFetchMaxWaitMs - maxWaitMs`).
That timer task is cancelled as soon as `pendingFetches` completes. This
change prevents remote storage share fetch requests from expiring
prematurely.
### Testing
The code has been tested with unit tests and with
`LocalTieredStorage.java`.
Reviewers: Apoorv Mittal <[email protected]>
---
.../java/kafka/server/share/DelayedShareFetch.java | 77 ++++++++-
.../kafka/server/share/PendingRemoteFetches.java | 8 +
.../kafka/server/share/SharePartitionManager.java | 12 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 1 +
.../kafka/server/share/DelayedShareFetchTest.java | 178 ++++++++++++++++++++-
.../server/share/SharePartitionManagerTest.java | 2 +
.../unit/kafka/server/ReplicaManagerTest.scala | 3 +-
7 files changed, 274 insertions(+), 7 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index addfc691f1a..0a46e834eb7 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -40,6 +40,7 @@ import
org.apache.kafka.server.share.fetch.ShareFetchPartitionData;
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
+import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
@@ -64,6 +65,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -107,6 +109,8 @@ public class DelayedShareFetch extends DelayedOperation {
private LinkedHashMap<TopicIdPartition, LogReadResult>
localPartitionsAlreadyFetched;
private Optional<PendingRemoteFetches> pendingRemoteFetchesOpt;
private Optional<Exception> remoteStorageFetchException;
+ private final AtomicBoolean outsidePurgatoryCallbackLock;
+ private final long remoteFetchMaxWaitMs;
/**
* This function constructs an instance of delayed share fetch operation
for completing share fetch
@@ -118,6 +122,7 @@ public class DelayedShareFetch extends DelayedOperation {
* @param sharePartitions The share partitions referenced in the share
fetch request.
* @param shareGroupMetrics The share group metrics to record the metrics.
* @param time The system time.
+ * @param remoteFetchMaxWaitMs The max wait time for a share fetch request
having remote storage fetch.
*/
public DelayedShareFetch(
ShareFetch shareFetch,
@@ -125,7 +130,8 @@ public class DelayedShareFetch extends DelayedOperation {
BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
ShareGroupMetrics shareGroupMetrics,
- Time time
+ Time time,
+ long remoteFetchMaxWaitMs
) {
this(shareFetch,
replicaManager,
@@ -135,7 +141,8 @@ public class DelayedShareFetch extends DelayedOperation {
shareGroupMetrics,
time,
Optional.empty(),
- Uuid.randomUuid()
+ Uuid.randomUuid(),
+ remoteFetchMaxWaitMs
);
}
@@ -151,6 +158,7 @@ public class DelayedShareFetch extends DelayedOperation {
* @param shareGroupMetrics The share group metrics to record the metrics.
* @param time The system time.
* @param pendingRemoteFetchesOpt Optional containing an in-flight remote
fetch object or an empty optional.
+ * @param remoteFetchMaxWaitMs The max wait time for a share fetch request
having remote storage fetch.
*/
DelayedShareFetch(
ShareFetch shareFetch,
@@ -161,7 +169,8 @@ public class DelayedShareFetch extends DelayedOperation {
ShareGroupMetrics shareGroupMetrics,
Time time,
Optional<PendingRemoteFetches> pendingRemoteFetchesOpt,
- Uuid fetchId
+ Uuid fetchId,
+ long remoteFetchMaxWaitMs
) {
super(shareFetch.fetchParams().maxWaitMs, Optional.empty());
this.shareFetch = shareFetch;
@@ -177,6 +186,8 @@ public class DelayedShareFetch extends DelayedOperation {
this.pendingRemoteFetchesOpt = pendingRemoteFetchesOpt;
this.remoteStorageFetchException = Optional.empty();
this.fetchId = fetchId;
+ this.outsidePurgatoryCallbackLock = new AtomicBoolean(false);
+ this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs;
// Register metrics for DelayedShareFetch.
KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server",
"DelayedShareFetchMetrics");
this.expiredRequestMeter = metricsGroup.newMeter(EXPIRES_PER_SEC,
"requests", TimeUnit.SECONDS);
@@ -205,6 +216,12 @@ public class DelayedShareFetch extends DelayedOperation {
if (remoteStorageFetchException.isPresent()) {
completeErroneousRemoteShareFetchRequest();
} else if (pendingRemoteFetchesOpt.isPresent()) {
+ if (maybeRegisterCallbackPendingRemoteFetch()) {
+ log.trace("Registered remote storage fetch callback for
group {}, member {}, "
+ + "topic partitions {}", shareFetch.groupId(),
shareFetch.memberId(),
+ partitionsAcquired.keySet());
+ return;
+ }
completeRemoteStorageShareFetchRequest();
} else {
completeLocalLogShareFetchRequest();
@@ -626,6 +643,16 @@ public class DelayedShareFetch extends DelayedOperation {
return pendingRemoteFetchesOpt.orElse(null);
}
+ // Visible for testing.
+ boolean outsidePurgatoryCallbackLock() {
+ return outsidePurgatoryCallbackLock.get();
+ }
+
+ // Only used for testing purpose.
+ void updatePartitionsAcquired(LinkedHashMap<TopicIdPartition, Long>
partitionsAcquired) {
+ this.partitionsAcquired = partitionsAcquired;
+ }
+
// Visible for testing.
Meter expiredRequestMeter() {
return expiredRequestMeter;
@@ -666,6 +693,28 @@ public class DelayedShareFetch extends DelayedOperation {
return maybeCompletePendingRemoteFetch();
}
+ private boolean maybeRegisterCallbackPendingRemoteFetch() {
+ log.trace("Registering callback pending remote fetch");
+ PendingRemoteFetches pendingFetch = pendingRemoteFetchesOpt.get();
+ if (!pendingFetch.isDone() && shareFetch.fetchParams().maxWaitMs <
remoteFetchMaxWaitMs) {
+ TimerTask timerTask = new PendingRemoteFetchTimerTask();
+ pendingFetch.invokeCallbackOnCompletion(((ignored, throwable) -> {
+ timerTask.cancel();
+ log.trace("Invoked remote storage fetch callback for group {},
member {}, "
+ + "topic partitions {}", shareFetch.groupId(),
shareFetch.memberId(),
+ partitionsAcquired.keySet());
+ if (throwable != null) {
+ log.error("Remote storage fetch failed for group {},
member {}, topic partitions {}",
+ shareFetch.groupId(), shareFetch.memberId(),
sharePartitions.keySet(), throwable);
+ }
+ completeRemoteShareFetchRequestOutsidePurgatory();
+ }));
+ replicaManager.addShareFetchTimerRequest(timerTask);
+ return true;
+ }
+ return false;
+ }
+
/**
* Throws an exception if a task for remote storage fetch could not be
scheduled successfully else updates pendingRemoteFetchesOpt.
* @param remoteStorageFetchInfoMap - The remote storage fetch information.
@@ -904,4 +953,26 @@ public class DelayedShareFetch extends DelayedOperation {
}
return completedByMe;
}
+
+ private void completeRemoteShareFetchRequestOutsidePurgatory() {
+ if (outsidePurgatoryCallbackLock.compareAndSet(false, true)) {
+ completeRemoteStorageShareFetchRequest();
+ }
+ }
+
+ private class PendingRemoteFetchTimerTask extends TimerTask {
+
+ public PendingRemoteFetchTimerTask() {
+ super(remoteFetchMaxWaitMs - shareFetch.fetchParams().maxWaitMs);
+ }
+
+ @Override
+ public void run() {
+ log.trace("Expired remote storage fetch callback for group {},
member {}, "
+ + "topic partitions {}", shareFetch.groupId(),
shareFetch.memberId(),
+ partitionsAcquired.keySet());
+ expiredRequestMeter.mark();
+ completeRemoteShareFetchRequestOutsidePurgatory();
+ }
+ }
}
diff --git a/core/src/main/java/kafka/server/share/PendingRemoteFetches.java
b/core/src/main/java/kafka/server/share/PendingRemoteFetches.java
index 2eb92672dc5..c3ac9c3b553 100644
--- a/core/src/main/java/kafka/server/share/PendingRemoteFetches.java
+++ b/core/src/main/java/kafka/server/share/PendingRemoteFetches.java
@@ -23,10 +23,12 @@ import
org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
/**
* This class is used to store the remote storage fetch information for topic
partitions in a share fetch request.
@@ -48,6 +50,12 @@ public class PendingRemoteFetches {
return true;
}
+ public void invokeCallbackOnCompletion(BiConsumer<Void, Throwable>
callback) {
+ List<CompletableFuture<RemoteLogReadResult>> remoteFetchResult = new
ArrayList<>();
+ remoteFetches.forEach(remoteFetch ->
remoteFetchResult.add(remoteFetch.remoteFetchResult()));
+ CompletableFuture.allOf(remoteFetchResult.toArray(new
CompletableFuture<?>[0])).whenComplete(callback);
+ }
+
public List<RemoteFetch> remoteFetches() {
return remoteFetches;
}
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 1c7edd1b0af..5f0bf1fa239 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -130,6 +130,10 @@ public class SharePartitionManager implements
AutoCloseable {
* The max delivery count is the maximum number of times a message can be
delivered before it is considered to be archived.
*/
private final int maxDeliveryCount;
+ /**
+ * The max wait time for a share fetch request having remote storage fetch.
+ */
+ private final long remoteFetchMaxWaitMs;
/**
* The persister is used to persist the share partition state.
@@ -153,6 +157,7 @@ public class SharePartitionManager implements AutoCloseable
{
int defaultRecordLockDurationMs,
int maxDeliveryCount,
int maxInFlightMessages,
+ long remoteFetchMaxWaitMs,
Persister persister,
GroupConfigManager groupConfigManager,
BrokerTopicStats brokerTopicStats
@@ -164,6 +169,7 @@ public class SharePartitionManager implements AutoCloseable
{
defaultRecordLockDurationMs,
maxDeliveryCount,
maxInFlightMessages,
+ remoteFetchMaxWaitMs,
persister,
groupConfigManager,
new ShareGroupMetrics(time),
@@ -179,6 +185,7 @@ public class SharePartitionManager implements AutoCloseable
{
int defaultRecordLockDurationMs,
int maxDeliveryCount,
int maxInFlightMessages,
+ long remoteFetchMaxWaitMs,
Persister persister,
GroupConfigManager groupConfigManager,
ShareGroupMetrics shareGroupMetrics,
@@ -193,6 +200,7 @@ public class SharePartitionManager implements AutoCloseable
{
new SystemTimer("share-group-lock-timeout")),
maxDeliveryCount,
maxInFlightMessages,
+ remoteFetchMaxWaitMs,
persister,
groupConfigManager,
shareGroupMetrics,
@@ -210,6 +218,7 @@ public class SharePartitionManager implements AutoCloseable
{
Timer timer,
int maxDeliveryCount,
int maxInFlightMessages,
+ long remoteFetchMaxWaitMs,
Persister persister,
GroupConfigManager groupConfigManager,
ShareGroupMetrics shareGroupMetrics,
@@ -223,6 +232,7 @@ public class SharePartitionManager implements AutoCloseable
{
this.timer = timer;
this.maxDeliveryCount = maxDeliveryCount;
this.maxInFlightMessages = maxInFlightMessages;
+ this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs;
this.persister = persister;
this.groupConfigManager = groupConfigManager;
this.shareGroupMetrics = shareGroupMetrics;
@@ -683,7 +693,7 @@ public class SharePartitionManager implements AutoCloseable
{
// Add the share fetch to the delayed share fetch purgatory to process
the fetch request.
// The request will be added irrespective of whether the share
partition is initialized or not.
// Once the share partition is initialized, the delayed share fetch
will be completed.
- addDelayedShareFetch(new DelayedShareFetch(shareFetch, replicaManager,
fencedSharePartitionHandler(), sharePartitions, shareGroupMetrics, time),
delayedShareFetchWatchKeys);
+ addDelayedShareFetch(new DelayedShareFetch(shareFetch, replicaManager,
fencedSharePartitionHandler(), sharePartitions, shareGroupMetrics, time,
remoteFetchMaxWaitMs), delayedShareFetchWatchKeys);
}
private SharePartition getOrCreateSharePartition(SharePartitionKey
sharePartitionKey) {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index b4be10656e2..61892ea66c3 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -441,6 +441,7 @@ class BrokerServer(
config.shareGroupConfig.shareGroupRecordLockDurationMs,
config.shareGroupConfig.shareGroupDeliveryCountLimit,
config.shareGroupConfig.shareGroupPartitionMaxRecordLocks,
+ config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong,
persister,
groupConfigManager,
brokerTopicStats
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 1ec8cccffa9..498047b890a 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -48,6 +48,7 @@ import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
@@ -81,6 +82,7 @@ import scala.jdk.javaapi.CollectionConverters;
import static kafka.server.share.PendingRemoteFetches.RemoteFetch;
import static
kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL;
+import static
kafka.server.share.SharePartitionManagerTest.REMOTE_FETCH_MAX_WAIT_MS;
import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
import static
kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch;
import static
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
@@ -1427,6 +1429,13 @@ public class DelayedShareFetchTest {
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenThrow(mock(KafkaStorageException.class));
+ // Mock the behaviour of replica manager such that remote storage
fetch completion timer task completes on adding it to the watch queue.
+ doAnswer(invocationOnMock -> {
+ TimerTask timerTask = invocationOnMock.getArgument(0);
+ timerTask.run();
+ return null;
+ }).when(replicaManager).addShareFetchTimerRequest(any());
+
Uuid fetchId = Uuid.randomUuid();
DelayedShareFetch delayedShareFetch =
spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
@@ -1777,6 +1786,165 @@ public class DelayedShareFetchTest {
delayedShareFetch.lock().unlock();
}
+ @Test
+ public void
testRemoteStorageFetchCompletionPostRegisteringCallbackByPendingFetchesCompletion()
{
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0));
+ SharePartition sp0 = mock(SharePartition.class);
+
+ when(sp0.canAcquireRecords()).thenReturn(true);
+ when(sp0.nextFetchOffset()).thenReturn(10L);
+
+ LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new
LinkedHashMap<>();
+ sharePartitions.put(tp0, sp0);
+
+ CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+ ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp",
Uuid.randomUuid().toString(),
+ future, List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
+ BROKER_TOPIC_STATS);
+
+ PendingRemoteFetches pendingRemoteFetches =
mock(PendingRemoteFetches.class);
+ Uuid fetchId = Uuid.randomUuid();
+ DelayedShareFetch delayedShareFetch =
spy(DelayedShareFetchBuilder.builder()
+ .withShareFetchData(shareFetch)
+ .withReplicaManager(replicaManager)
+ .withSharePartitions(sharePartitions)
+
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
+ .withPendingRemoteFetches(pendingRemoteFetches)
+ .withFetchId(fetchId)
+ .build());
+
+ LinkedHashMap<TopicIdPartition, Long> partitionsAcquired = new
LinkedHashMap<>();
+ partitionsAcquired.put(tp0, 10L);
+
+ // Manually update acquired partitions.
+ delayedShareFetch.updatePartitionsAcquired(partitionsAcquired);
+
+ // Mock remote fetch result.
+ RemoteFetch remoteFetch = mock(RemoteFetch.class);
+ when(remoteFetch.topicIdPartition()).thenReturn(tp0);
+
when(remoteFetch.remoteFetchResult()).thenReturn(CompletableFuture.completedFuture(
+ new RemoteLogReadResult(Optional.of(REMOTE_FETCH_INFO),
Optional.empty()))
+ );
+ when(remoteFetch.logReadResult()).thenReturn(new LogReadResult(
+ REMOTE_FETCH_INFO,
+ Option.empty(),
+ -1L,
+ -1L,
+ -1L,
+ -1L,
+ -1L,
+ Option.empty(),
+ Option.empty(),
+ Option.empty()
+ ));
+
when(pendingRemoteFetches.remoteFetches()).thenReturn(List.of(remoteFetch));
+ when(pendingRemoteFetches.isDone()).thenReturn(false);
+
+ // Make sure that the callback is called to complete remote storage
share fetch result.
+ doAnswer(invocationOnMock -> {
+ BiConsumer<Void, Throwable> callback =
invocationOnMock.getArgument(0);
+ callback.accept(mock(Void.class), null);
+ return null;
+ }).when(pendingRemoteFetches).invokeCallbackOnCompletion(any());
+
+ when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
+ createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
+
+ assertFalse(delayedShareFetch.isCompleted());
+ delayedShareFetch.forceComplete();
+ assertTrue(delayedShareFetch.isCompleted());
+ // the future of shareFetch completes.
+ assertTrue(shareFetch.isCompleted());
+ assertEquals(Set.of(tp0), future.join().keySet());
+ // Verify the locks are released for tp0.
+ Mockito.verify(delayedShareFetch,
times(1)).releasePartitionLocks(Set.of(tp0));
+ assertTrue(delayedShareFetch.outsidePurgatoryCallbackLock());
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
+ }
+
+ @Test
+ public void
testRemoteStorageFetchCompletionPostRegisteringCallbackByTimerTaskCompletion() {
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0));
+ SharePartition sp0 = mock(SharePartition.class);
+
+ when(sp0.canAcquireRecords()).thenReturn(true);
+ when(sp0.nextFetchOffset()).thenReturn(10L);
+
+ LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new
LinkedHashMap<>();
+ sharePartitions.put(tp0, sp0);
+
+ CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+ ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp",
Uuid.randomUuid().toString(),
+ future, List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
+ BROKER_TOPIC_STATS);
+
+ PendingRemoteFetches pendingRemoteFetches =
mock(PendingRemoteFetches.class);
+ Uuid fetchId = Uuid.randomUuid();
+ DelayedShareFetch delayedShareFetch =
spy(DelayedShareFetchBuilder.builder()
+ .withShareFetchData(shareFetch)
+ .withReplicaManager(replicaManager)
+ .withSharePartitions(sharePartitions)
+
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
+ .withPendingRemoteFetches(pendingRemoteFetches)
+ .withFetchId(fetchId)
+ .build());
+
+ LinkedHashMap<TopicIdPartition, Long> partitionsAcquired = new
LinkedHashMap<>();
+ partitionsAcquired.put(tp0, 10L);
+
+ // Manually update acquired partitions.
+ delayedShareFetch.updatePartitionsAcquired(partitionsAcquired);
+
+ // Mock remote fetch result.
+ RemoteFetch remoteFetch = mock(RemoteFetch.class);
+ when(remoteFetch.topicIdPartition()).thenReturn(tp0);
+
when(remoteFetch.remoteFetchResult()).thenReturn(CompletableFuture.completedFuture(
+ new RemoteLogReadResult(Optional.of(REMOTE_FETCH_INFO),
Optional.empty()))
+ );
+ when(remoteFetch.logReadResult()).thenReturn(new LogReadResult(
+ REMOTE_FETCH_INFO,
+ Option.empty(),
+ -1L,
+ -1L,
+ -1L,
+ -1L,
+ -1L,
+ Option.empty(),
+ Option.empty(),
+ Option.empty()
+ ));
+
when(pendingRemoteFetches.remoteFetches()).thenReturn(List.of(remoteFetch));
+ when(pendingRemoteFetches.isDone()).thenReturn(false);
+
+ // Make sure that the callback to complete remote storage share fetch
result is not called.
+ doAnswer(invocationOnMock ->
null).when(pendingRemoteFetches).invokeCallbackOnCompletion(any());
+
+ // Mock the behaviour of replica manager such that remote storage
fetch completion timer task completes on adding it to the watch queue.
+ doAnswer(invocationOnMock -> {
+ TimerTask timerTask = invocationOnMock.getArgument(0);
+ timerTask.run();
+ return null;
+ }).when(replicaManager).addShareFetchTimerRequest(any());
+
+ when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
+ createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
+
+ assertFalse(delayedShareFetch.isCompleted());
+ delayedShareFetch.forceComplete();
+ assertTrue(delayedShareFetch.isCompleted());
+ // the future of shareFetch completes.
+ assertTrue(shareFetch.isCompleted());
+ assertEquals(Set.of(tp0), future.join().keySet());
+ // Verify the locks are released for tp0.
+ Mockito.verify(delayedShareFetch,
times(1)).releasePartitionLocks(Set.of(tp0));
+ assertTrue(delayedShareFetch.outsidePurgatoryCallbackLock());
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
+ }
+
static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager
replicaManager, TopicIdPartition topicIdPartition, int minBytes) {
LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1,
minBytes);
LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1,
mock(LogOffsetMetadata.class),
@@ -1847,7 +2015,7 @@ public class DelayedShareFetchTest {
private LinkedHashMap<TopicIdPartition, SharePartition>
sharePartitions = mock(LinkedHashMap.class);
private PartitionMaxBytesStrategy partitionMaxBytesStrategy =
mock(PartitionMaxBytesStrategy.class);
private Time time = new MockTime();
- private final Optional<PendingRemoteFetches> pendingRemoteFetches =
Optional.empty();
+ private Optional<PendingRemoteFetches> pendingRemoteFetches =
Optional.empty();
private ShareGroupMetrics shareGroupMetrics =
mock(ShareGroupMetrics.class);
private Uuid fetchId = Uuid.randomUuid();
@@ -1886,6 +2054,11 @@ public class DelayedShareFetchTest {
return this;
}
+ private DelayedShareFetchBuilder
withPendingRemoteFetches(PendingRemoteFetches pendingRemoteFetches) {
+ this.pendingRemoteFetches = Optional.of(pendingRemoteFetches);
+ return this;
+ }
+
private DelayedShareFetchBuilder withFetchId(Uuid fetchId) {
this.fetchId = fetchId;
return this;
@@ -1905,7 +2078,8 @@ public class DelayedShareFetchTest {
shareGroupMetrics,
time,
pendingRemoteFetches,
- fetchId);
+ fetchId,
+ REMOTE_FETCH_MAX_WAIT_MS);
}
}
}
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 0f821d4423a..d4c68fe555b 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -158,6 +158,7 @@ public class SharePartitionManagerTest {
private static final String CONNECTION_ID = "id-1";
static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000;
+ static final long REMOTE_FETCH_MAX_WAIT_MS = 6000L;
private MockTime time;
private ReplicaManager mockReplicaManager;
@@ -3242,6 +3243,7 @@ public class SharePartitionManagerTest {
timer,
MAX_DELIVERY_COUNT,
MAX_IN_FLIGHT_MESSAGES,
+ REMOTE_FETCH_MAX_WAIT_MS,
persister,
mock(GroupConfigManager.class),
shareGroupMetrics,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index d33a68a8348..f9782e713ee 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -6123,7 +6123,8 @@ class ReplicaManagerTest {
mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
sharePartitions,
mock(classOf[ShareGroupMetrics]),
- time))
+ time,
+ 500))
val delayedShareFetchWatchKeys : util.List[DelayedShareFetchKey] = new
util.ArrayList[DelayedShareFetchKey]
topicPartitions.forEach((topicIdPartition: TopicIdPartition) =>
delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId,
topicIdPartition.topicId, topicIdPartition.partition)))