This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new c37ac21cc9a KAFKA-15931: Cancel RemoteLogReader gracefully (#19150)
c37ac21cc9a is described below
commit c37ac21cc9ad11e0bec301ae9c84a35edd5392c3
Author: Jorge Esteban Quilcate Otoya <[email protected]>
AuthorDate: Wed Apr 2 00:10:12 2025 +0300
KAFKA-15931: Cancel RemoteLogReader gracefully (#19150)
Backports f24945b519005c0bc7a28db2db7aae6cec158927 to 3.9
Instead of reopening the transaction index, it cancels the RemoteFetchTask
without interrupting it--avoiding to close the TransactionIndex channel.
This will lead to complete the execution of the remote fetch but ignoring
the results. Given that this is considered a rare case, we could live with
this. If it becomes a performance issue, it could be optimized.
Reviewers: Jun Rao <[email protected]>
---
core/src/main/scala/kafka/server/DelayedRemoteFetch.scala | 5 +++--
.../test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala | 2 +-
2 files changed, 4 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
index 58a866aa4a6..f9776fb287d 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -84,8 +84,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
}
override def onExpiration(): Unit = {
- // cancel the remote storage read task, if it has not been executed yet
- val cancelled = remoteFetchTask.cancel(true)
+ // cancel the remote storage read task, if it has not been executed yet and
+ // avoid interrupting the task if it is already running as it may force
closing opened/cached resources as transaction index.
+ val cancelled = remoteFetchTask.cancel(false)
if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo:
$remoteFetchInfo could not be cancelled and its isDone value is
${remoteFetchTask.isDone}")
DelayedRemoteFetchMetrics.expiredRequestMeter.mark()
diff --git
a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
index ea1ffaf0b11..ce758992fea 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
@@ -199,7 +199,7 @@ class DelayedRemoteFetchTest {
delayedRemoteFetch.run()
// Check that the task was cancelled and force-completed
- verify(remoteFetchTask).cancel(true)
+ verify(remoteFetchTask).cancel(false)
assertTrue(delayedRemoteFetch.isCompleted)
// Check that the ExpiresPerSec metric was incremented