This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 617c96cea49 KAFKA-15931: Cancel RemoteLogReader gracefully (#19331)
617c96cea49 is described below

commit 617c96cea49ca9dda52ccc75ab2b6df56b24294c
Author: Jorge Esteban Quilcate Otoya <[email protected]>
AuthorDate: Wed Apr 2 02:22:53 2025 +0300

    KAFKA-15931: Cancel RemoteLogReader gracefully (#19331)
    
    Backports f24945b519005c0bc7a28db2db7aae6cec158927 to 4.0
    
    Instead of reopening the transaction index, it cancels the RemoteFetchTask 
without interrupting it--avoiding to close the TransactionIndex channel.
    
    This will lead to complete the execution of the remote fetch but ignoring 
the results. Given that this is considered a rare case, we could live with 
this. If it becomes a performance issue, it could be optimized.
    
    Reviewers: Jun Rao <[email protected]>
---
 core/src/main/scala/kafka/server/DelayedRemoteFetch.scala            | 5 +++--
 .../test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala | 2 +-
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala 
b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
index 45bfe69844a..e6bdce63e68 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -87,8 +87,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
   }
 
   override def onExpiration(): Unit = {
-    // cancel the remote storage read task, if it has not been executed yet
-    val cancelled = remoteFetchTask.cancel(true)
+    // cancel the remote storage read task, if it has not been executed yet and
+    // avoid interrupting the task if it is already running as it may force 
closing opened/cached resources as transaction index.
+    val cancelled = remoteFetchTask.cancel(false)
     if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: 
$remoteFetchInfo could not be cancelled and its isDone value is 
${remoteFetchTask.isDone}")
 
     DelayedRemoteFetchMetrics.expiredRequestMeter.mark()
diff --git 
a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala 
b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
index 264f5310c2d..b3f032e3dba 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
@@ -200,7 +200,7 @@ class DelayedRemoteFetchTest {
     delayedRemoteFetch.run()
 
     // Check that the task was cancelled and force-completed
-    verify(remoteFetchTask).cancel(true)
+    verify(remoteFetchTask).cancel(false)
     assertTrue(delayedRemoteFetch.isCompleted)
 
     // Check that the ExpiresPerSec metric was incremented

Reply via email to