junrao commented on code in PR #19197:
URL: https://github.com/apache/kafka/pull/19197#discussion_r1994095620
##########
core/src/main/scala/kafka/server/DelayedRemoteFetch.scala:
##########
@@ -88,8 +90,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
override def onExpiration(): Unit = {
// cancel the remote storage read task, if it has not been executed yet
- val cancelled = remoteFetchTask.cancel(true)
- if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}")
+ val cancelled = remoteFetchTask.cancel(false)
Review Comment:
Hmm, the original logic doesn't look quite right. When a delayed task
expires due to the timeout, we first call `onComplete()` and then
`onExpiration()`. So cancelling the task in `onExpiration()` is too late since
we call `remoteFetchResult.get` in `onComplete()`.
To address this issue, we could call `remoteFetchTask.cancel()` in
`onComplete()` if `remoteFetchResult.isDone` is false.
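A rough sketch of that ordering, in Java for brevity. `DelayedOpSketch` and its two fields are hypothetical stand-ins for `DelayedRemoteFetch`, `remoteFetchTask` and `remoteFetchResult`; the return values are placeholders, and none of this is the actual Kafka code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in for DelayedRemoteFetch; names are illustrative only.
final class DelayedOpSketch {
    private final Future<?> task;                    // plays the role of remoteFetchTask
    private final CompletableFuture<String> result;  // plays the role of remoteFetchResult

    DelayedOpSketch(Future<?> task, CompletableFuture<String> result) {
        this.task = task;
        this.result = result;
    }

    // Runs for both normal completion and timeout, and always before onExpiration().
    String onComplete() {
        if (!result.isDone()) {
            // Timeout path: cancel here, because by the time onExpiration()
            // runs the response has already been built.
            task.cancel(false);
            return "timed out";
        }
        // Normal path: the result is ready, so join() does not block.
        return result.join();
    }
}
```

With this shape, `onExpiration()` would only need to record metrics or log, since the cancellation has already happened.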
##########
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##########
@@ -57,8 +59,18 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
this.remoteReadTimer = remoteReadTimer;
}
+ public void cancel() {
+ LOGGER.debug("Cancelling remote log reader for topic partition {}", fetchInfo.topicPartition);
+ callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.of(new InterruptedException("Cancelled remote log reader"))));
Review Comment:
1. Hmm, doing this here would mean that we could call the callback twice?
2. Another problem is that the caller is
`DelayedRemoteFetch.onComplete()`. The callback calls
`delayedRemoteFetchPurgatory.checkAndComplete(key)`. Ideally, we don't want to
call into the purgatory while we are already inside it, to avoid potential
deadlocks and unbounded recursion.
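For point 1, one possible guard is a wrapper that lets only the first invocation through. This is a sketch under assumptions: `OnceCallback` is a hypothetical helper, not something in the PR or in Kafka, and it does nothing for the re-entrancy concern in point 2:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical wrapper: forwards the first value it receives and drops the rest.
final class OnceCallback<T> implements Consumer<T> {
    private final AtomicBoolean fired = new AtomicBoolean(false);
    private final Consumer<T> delegate;

    OnceCallback(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void accept(T value) {
        // compareAndSet succeeds for exactly one caller, even when cancel()
        // and call() race on different threads.
        if (fired.compareAndSet(false, true)) {
            delegate.accept(value);
        }
    }
}
```

Whichever of `cancel()` or `call()` reaches the callback first wins; the later result is silently discarded.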
##########
core/src/main/scala/kafka/server/DelayedRemoteFetch.scala:
##########
@@ -36,6 +37,7 @@ import scala.collection._
* in the remote fetch operation purgatory
*/
class DelayedRemoteFetch(remoteFetchTask: Future[Void],
+ remoteLogReader: RemoteLogReader,
Review Comment:
This is an existing issue. It would be better if we could consolidate
remoteFetchTask and remoteFetchResult into a single future.
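One way the consolidation might look, assuming the read can be expressed as a `Supplier`. `SingleFutureSketch` and `submitRead` are hypothetical names used only for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical helper: a single CompletableFuture is both the cancellation
// handle (cancel) and the result carrier (isDone / join).
final class SingleFutureSketch {
    static CompletableFuture<String> submitRead(Supplier<String> read, Executor executor) {
        return CompletableFuture.supplyAsync(read, executor);
    }
}
```

Note that `CompletableFuture.cancel` never interrupts a supplier that is already running, so it behaves like `cancel(false)`; a read in progress would still need its own way to stop early.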
##########
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##########
@@ -57,8 +59,18 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
this.remoteReadTimer = remoteReadTimer;
}
+ public void cancel() {
+ LOGGER.debug("Cancelling remote log reader for topic partition {}", fetchInfo.topicPartition);
+ callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.of(new InterruptedException("Cancelled remote log reader"))));
+ this.cancelled = true;
+ }
+
@Override
public Void call() {
+ if (cancelled) {
Review Comment:
We probably want to pass `cancelled` to `rlm.read()` so that we could
complete the task early if `cancelled` is true.
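A sketch of what passing the flag down could look like. The loop below is a hypothetical stand-in for the work inside `rlm.read()`, not its real signature, and it assumes the flag is visible across threads (for example a `volatile` field or an `AtomicBoolean`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical read loop that polls a cancellation flag between units of work.
final class CancellableReadSketch {
    // Returns the indices of the segments read before cancellation was observed.
    static List<Integer> read(int segments, BooleanSupplier cancelled) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < segments; i++) {
            if (cancelled.getAsBoolean()) {
                break; // stop early instead of finishing a read nobody is waiting for
            }
            out.add(i);
        }
        return out;
    }
}
```

The caller would pass something like `() -> cancelled`, so the check in `call()` before starting and the checks during the read share one source of truth.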