This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 08a93fe12ab KAFKA-14523: Move DelayedRemoteListOffsets to the storage module (#19285)
08a93fe12ab is described below
commit 08a93fe12ab1f29010341d47e421170b980f904c
Author: Mickael Maison <[email protected]>
AuthorDate: Sat Apr 5 13:51:13 2025 +0200
KAFKA-14523: Move DelayedRemoteListOffsets to the storage module (#19285)
Decouple RemoteLogManager and ReplicaManager.
Reviewers: Chia-Ping Tsai <[email protected]>
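
Editor's note: the delayed operation previously reached back into ReplicaManager directly; after this change it only receives a partition-lookup callback and a response callback, which is what lets the class move out of core into the storage module. A minimal caller-side sketch of the new wiring (the empty status map and no-op callback bodies below are illustrative placeholders, not part of this patch):

    import java.util.Collection;
    import java.util.Map;
    import java.util.function.Consumer;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
    import org.apache.kafka.server.purgatory.DelayedRemoteListOffsets;
    import org.apache.kafka.server.purgatory.ListOffsetsPartitionStatus;

    public class DelayedRemoteListOffsetsWiringSketch {
        public static void main(String[] args) {
            // Per-partition status the operation tracks; empty here for brevity.
            Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of();
            // Replaces the old ReplicaManager dependency: expected to throw an
            // ApiException when the partition is unavailable (the broker passes
            // tp -> getPartitionOrException(tp), as the ReplicaManager diff shows).
            Consumer<TopicPartition> partitionOrException = tp -> { };
            // Invoked exactly once, from onComplete(), with the merged per-topic responses.
            Consumer<Collection<ListOffsetsTopicResponse>> responseCallback =
                topics -> topics.forEach(topic -> System.out.println(topic.name()));
            // delayMs = 10 and ListOffsets version 5 mirror the values used in the tests below.
            DelayedRemoteListOffsets op = new DelayedRemoteListOffsets(
                10L, 5, statusByPartition, partitionOrException, responseCallback);
            op.tryComplete();
        }
    }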
---
checkstyle/import-control-storage.xml | 4 +
.../java/kafka/log/remote/RemoteLogManager.java | 2 +-
.../kafka/server/DelayedRemoteListOffsets.scala | 168 -------------
core/src/main/scala/kafka/server/KafkaApis.scala | 12 +-
.../main/scala/kafka/server/ReplicaManager.scala | 17 +-
.../server/DelayedRemoteListOffsetsTest.scala | 258 --------------------
.../AbstractCoordinatorConcurrencyTest.scala | 2 +-
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 4 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 21 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 2 +-
.../server/purgatory/DelayedRemoteListOffsets.java | 194 +++++++++++++++
.../purgatory}/ListOffsetsPartitionStatus.java | 2 +-
.../purgatory/DelayedRemoteListOffsetsTest.java | 261 +++++++++++++++++++++
13 files changed, 492 insertions(+), 455 deletions(-)
diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml
index ce52e81ed0c..d96311d2897 100644
--- a/checkstyle/import-control-storage.xml
+++ b/checkstyle/import-control-storage.xml
@@ -49,7 +49,11 @@
<subpackage name="server">
+ <allow pkg="com.yammer.metrics.core" />
<allow pkg="org.apache.kafka.common" />
+ <allow pkg="org.apache.kafka.server.metrics" />
+ <allow pkg="org.apache.kafka.server.util.timer" />
+ <allow pkg="org.apache.kafka.storage.internals.log" />
<subpackage name="log">
<allow pkg="com.fasterxml.jackson" />
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index cb220baa2cd..c0da34b4d2e 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -17,7 +17,6 @@
package kafka.log.remote;
import kafka.cluster.Partition;
-import kafka.server.DelayedRemoteListOffsets;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException;
@@ -66,6 +65,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
+import org.apache.kafka.server.purgatory.DelayedRemoteListOffsets;
import org.apache.kafka.server.purgatory.TopicPartitionOperationKey;
import org.apache.kafka.server.quota.QuotaType;
import org.apache.kafka.server.storage.log.FetchIsolation;
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala b/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
deleted file mode 100644
index a84b78ff25c..00000000000
--- a/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import com.yammer.metrics.core.Meter
-import kafka.utils.{Logging, Pool}
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.ApiException
-import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.ListOffsetsResponse
-import org.apache.kafka.server.ListOffsetsPartitionStatus
-import org.apache.kafka.server.metrics.KafkaMetricsGroup
-import org.apache.kafka.server.purgatory.DelayedOperation
-import org.apache.kafka.storage.internals.log.OffsetResultHolder.FileRecordsOrError
-
-import java.util.Optional
-import java.util.concurrent.TimeUnit
-import scala.collection.{Map, mutable}
-import scala.jdk.CollectionConverters._
-
-class DelayedRemoteListOffsets(delayMs: Long,
-                               version: Int,
-                               statusByPartition: mutable.Map[TopicPartition, ListOffsetsPartitionStatus],
-                               replicaManager: ReplicaManager,
-                               responseCallback: List[ListOffsetsTopicResponse] => Unit)
-  extends DelayedOperation(delayMs) with Logging {
-  // Mark the status as completed, if there is no async task to track.
-  // If there is a task to track, then build the response as REQUEST_TIMED_OUT by default.
-  statusByPartition.foreachEntry { (topicPartition, status) =>
-    status.completed(status.futureHolderOpt.isEmpty)
-    if (status.futureHolderOpt.isPresent) {
-      status.responseOpt(Optional.of(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition())))
-    }
-    trace(s"Initial partition status for $topicPartition is $status")
-  }
-
-  /**
-   * Call-back to execute when a delayed operation gets expired and hence forced to complete.
-   */
-  override def onExpiration(): Unit = {
-    statusByPartition.foreachEntry { (topicPartition, status) =>
-      if (!status.completed) {
-        debug(s"Expiring list offset request for partition $topicPartition with status $status")
-        status.futureHolderOpt.ifPresent(futureHolder => futureHolder.jobFuture.cancel(true))
-        DelayedRemoteListOffsetsMetrics.recordExpiration(topicPartition)
-      }
-    }
-  }
-
-  /**
-   * Process for completing an operation; This function needs to be defined
-   * in subclasses and will be called exactly once in forceComplete()
-   */
-  override def onComplete(): Unit = {
-    val responseTopics = statusByPartition.groupBy(e => e._1.topic()).map {
-      case (topic, status) =>
-        new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => Some(s.responseOpt.get())).toList.asJava)
-    }.toList
-    responseCallback(responseTopics)
-  }
-
-  /**
-   * Try to complete the delayed operation by first checking if the operation
-   * can be completed by now. If yes execute the completion logic by calling
-   * forceComplete() and return true iff forceComplete returns true; otherwise return false
-   *
-   * This function needs to be defined in subclasses
-   */
-  override def tryComplete(): Boolean = {
-    var completable = true
-    statusByPartition.foreachEntry { (partition, status) =>
-      if (!status.completed) {
-        try {
-          replicaManager.getPartitionOrException(partition)
-        } catch {
-          case e: ApiException =>
-            status.futureHolderOpt.ifPresent { futureHolder =>
-              futureHolder.jobFuture.cancel(false)
-              futureHolder.taskFuture.complete(new FileRecordsOrError(Optional.of(e), Optional.empty()))
-            }
-        }
-
-        status.futureHolderOpt.ifPresent { futureHolder =>
-          if (futureHolder.taskFuture.isDone) {
-            val taskFuture = futureHolder.taskFuture.get()
-            val response = {
-              if (taskFuture.hasException) {
-                buildErrorResponse(Errors.forException(taskFuture.exception().get()), partition.partition())
-              } else if (!taskFuture.hasTimestampAndOffset) {
-                val error = status.maybeOffsetsError
-                  .map(e => if (version >= 5) Errors.forException(e) else Errors.LEADER_NOT_AVAILABLE)
-                  .orElse(Errors.NONE)
-                buildErrorResponse(error, partition.partition())
-              } else {
-                var partitionResponse = buildErrorResponse(Errors.NONE, partition.partition())
-                val found = taskFuture.timestampAndOffset().get()
-                if (status.lastFetchableOffset.isPresent && found.offset >= status.lastFetchableOffset.get) {
-                  if (status.maybeOffsetsError.isPresent) {
-                    val error = if (version >= 5) Errors.forException(status.maybeOffsetsError.get) else Errors.LEADER_NOT_AVAILABLE
-                    partitionResponse.setErrorCode(error.code())
-                  }
-                } else {
-                  partitionResponse = new ListOffsetsPartitionResponse()
-                    .setPartitionIndex(partition.partition())
-                    .setErrorCode(Errors.NONE.code())
-                    .setTimestamp(found.timestamp)
-                    .setOffset(found.offset)
-
-                  if (found.leaderEpoch.isPresent && version >= 4) {
-                    partitionResponse.setLeaderEpoch(found.leaderEpoch.get)
-                  }
-                }
-                partitionResponse
-              }
-            }
-            status.responseOpt(Optional.of(response))
-            status.completed(true)
-          }
-          completable = completable && futureHolder.taskFuture.isDone
-        }
-      }
-    }
-    if (completable) {
-      forceComplete()
-    } else {
-      false
-    }
-  }
-
-  private def buildErrorResponse(e: Errors, partitionIndex: Int): ListOffsetsPartitionResponse = {
-    new ListOffsetsPartitionResponse()
-      .setPartitionIndex(partitionIndex)
-      .setErrorCode(e.code)
-      .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
-      .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
-  }
-}
-
-object DelayedRemoteListOffsetsMetrics {
-  private val metricsGroup = new KafkaMetricsGroup(DelayedRemoteListOffsetsMetrics.getClass)
-  private[server] val aggregateExpirationMeter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
-  private val partitionExpirationMeterFactory = (key: TopicPartition) => metricsGroup.newMeter("ExpiresPerSec",
-    "requests",
-    TimeUnit.SECONDS,
-    Map("topic" -> key.topic, "partition" -> key.partition.toString).asJava)
-  private[server] val partitionExpirationMeters = new Pool[TopicPartition, Meter](valueFactory = Some(partitionExpirationMeterFactory))
-
-  def recordExpiration(partition: TopicPartition): Unit = {
-    aggregateExpirationMeter.mark()
-    partitionExpirationMeters.getAndMaybePut(partition).mark()
-  }
-}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 966607f11e1..8356bc4e732 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -769,18 +769,20 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setName(topic.name)
           .setPartitions(topic.partitions.asScala.map(partition => buildErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED, partition)).asJava)
-    )
+    ).asJava
 
-    def sendResponseCallback(response: Seq[ListOffsetsTopicResponse]): Unit = {
-      val mergedResponses = response ++ unauthorizedResponseStatus
+    def sendResponseCallback(response: util.Collection[ListOffsetsTopicResponse]): Void = {
+      val mergedResponses = new util.ArrayList(response)
+      mergedResponses.addAll(unauthorizedResponseStatus)
       requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
         new ListOffsetsResponse(new ListOffsetsResponseData()
           .setThrottleTimeMs(requestThrottleMs)
-          .setTopics(mergedResponses.asJava)))
+          .setTopics(mergedResponses)))
+      null
     }
 
     if (authorizedRequestInfo.isEmpty) {
-      sendResponseCallback(Seq.empty)
+      sendResponseCallback(util.List.of)
     } else {
       replicaManager.fetchOffset(authorizedRequestInfo, offsetRequest.duplicatePartitions().asScala,
         offsetRequest.isolationLevel(), offsetRequest.replicaId(), clientId, correlationId, version,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6ff88869d93..9b5c01a85c9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -53,13 +53,13 @@ import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DeleteRecordsPartitionStatus, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
-import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, ListOffsetsPartitionStatus, common}
+import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
 import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
-import org.apache.kafka.storage.internals.log._
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
 import java.io.File
@@ -70,6 +70,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.Lock
 import java.util.concurrent.{CompletableFuture, Future, RejectedExecutionException, TimeUnit}
 import java.util.{Collections, Optional, OptionalInt, OptionalLong}
+import java.util.function.Consumer
 import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.{RichOption, RichOptional}
@@ -841,7 +842,7 @@ class ReplicaManager(val config: KafkaConfig,
     )
     val retryTimeoutMs = Math.min(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMaxMs(), config.requestTimeoutMs)
-    val addPartitionsRetryBackoffMs = config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs
+    val addPartitionsRetryBackoffMs = config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs()
     val startVerificationTimeMs = time.milliseconds
     def maybeRetryOnConcurrentTransactions(results: (Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])): Unit = {
       if (time.milliseconds() - startVerificationTimeMs >= retryTimeoutMs) {
@@ -1470,7 +1471,7 @@ class ReplicaManager(val config: KafkaConfig,
                   correlationId: Int,
                   version: Short,
                   buildErrorResponse: (Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse,
-                  responseCallback: List[ListOffsetsTopicResponse] => Unit,
+                  responseCallback: Consumer[util.Collection[ListOffsetsTopicResponse]],
                   timeoutMs: Int = 0): Unit = {
     val statusByPartition = mutable.Map[TopicPartition, ListOffsetsPartitionStatus]()
     topics.foreach { topic =>
@@ -1569,7 +1570,7 @@ class ReplicaManager(val config: KafkaConfig,
     if (delayedRemoteListOffsetsRequired(statusByPartition)) {
       val delayMs: Long = if (timeoutMs > 0) timeoutMs else config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs()
       // create delayed remote list offsets operation
-      val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version, statusByPartition, this, responseCallback)
+      val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version, statusByPartition.asJava, tp => getPartitionOrException(tp), responseCallback)
       // create a list of (topic, partition) pairs to use as keys for this delayed remote list offsets operation
       val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList
       // try to complete the request immediately, otherwise put it into the purgatory
@@ -1580,7 +1581,7 @@ class ReplicaManager(val config: KafkaConfig,
         case (topic, status) =>
           new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => Some(s.responseOpt.get())).toList.asJava)
       }.toList
-      responseCallback(responseTopics)
+      responseCallback.accept(responseTopics.asJava)
     }
   }
@@ -1899,7 +1900,7 @@ class ReplicaManager(val config: KafkaConfig,
         createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset,
           new OffsetMovedToTieredStorageException("Given offset" + offset + " is moved to tiered storage"))
       } else {
-        val throttleTimeMs = remoteLogManager.get.getFetchThrottleTimeMs()
+        val throttleTimeMs = remoteLogManager.get.getFetchThrottleTimeMs
         val fetchDataInfo = if (throttleTimeMs > 0) {
           // Record the throttle time for the remote log fetches
           remoteLogManager.get.fetchThrottleTimeSensor().record(throttleTimeMs, time.milliseconds())
diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
deleted file mode 100644
index f40338d3264..00000000000
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.NotLeaderOrFollowerException
-import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
-import org.apache.kafka.common.requests.ListOffsetsResponse
-import org.apache.kafka.server.ListOffsetsPartitionStatus
-import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
-import org.apache.kafka.server.util.timer.MockTimer
-import org.apache.kafka.storage.internals.log.{AsyncOffsetReadFutureHolder, OffsetResultHolder}
-import org.junit.jupiter.api.{AfterEach, Test}
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.mockito.ArgumentMatchers.anyBoolean
-import org.mockito.Mockito.{mock, when}
-
-import java.util.Optional
-import java.util.concurrent.CompletableFuture
-import scala.collection.mutable
-import scala.concurrent.TimeoutException
-import scala.jdk.CollectionConverters._
-
-class DelayedRemoteListOffsetsTest {
-
-  val delayMs = 10
-  val timer = new MockTimer()
-  val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
-  type T = OffsetResultHolder.FileRecordsOrError
-  val purgatory =
-    new DelayedOperationPurgatory[DelayedRemoteListOffsets]("test-purgatory", timer, 0, 10, true, true)
-
-  @AfterEach
-  def afterEach(): Unit = {
-    purgatory.shutdown()
-  }
-
-  @Test
-  def testResponseOnRequestExpiration(): Unit = {
-    var numResponse = 0
-    val responseCallback = (response: List[ListOffsetsTopicResponse]) => {
-      response.foreach { topic =>
-        topic.partitions().forEach { partition =>
-          assertEquals(Errors.REQUEST_TIMED_OUT.code(), partition.errorCode())
-          assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp())
-          assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset())
-          assertEquals(-1, partition.leaderEpoch())
-          numResponse += 1
-        }
-      }
-    }
-
-    var cancelledCount = 0
-    val jobFuture = mock(classOf[CompletableFuture[Void]])
-    val holder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
-    when(holder.taskFuture).thenAnswer(_ => new CompletableFuture[T]())
-    when(holder.jobFuture).thenReturn(jobFuture)
-    when(jobFuture.cancel(anyBoolean())).thenAnswer(_ => {
-      cancelledCount += 1
-      true
-    })
-
-    val statusByPartition = mutable.Map(
-      new TopicPartition("test", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-      new TopicPartition("test", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-      new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-    )
-
-    val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
-    val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList.asJava
-    assertEquals(0, DelayedRemoteListOffsetsMetrics.aggregateExpirationMeter.count())
-    assertEquals(0, DelayedRemoteListOffsetsMetrics.partitionExpirationMeters.size)
-    purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
-
-    Thread.sleep(100)
-    assertEquals(3, listOffsetsRequestKeys.size)
-    assertEquals(listOffsetsRequestKeys.size, cancelledCount)
-    assertEquals(listOffsetsRequestKeys.size, numResponse)
-    assertEquals(listOffsetsRequestKeys.size, DelayedRemoteListOffsetsMetrics.aggregateExpirationMeter.count())
-    listOffsetsRequestKeys.forEach(key => {
-      val tp = new TopicPartition(key.topic, key.partition)
-      assertEquals(1, DelayedRemoteListOffsetsMetrics.partitionExpirationMeters.get(tp).count())
-    })
-  }
-
-  @Test
-  def testResponseOnSuccess(): Unit = {
-    var numResponse = 0
-    val responseCallback = (response: List[ListOffsetsTopicResponse]) => {
-      response.foreach { topic =>
-        topic.partitions().forEach { partition =>
-          assertEquals(Errors.NONE.code(), partition.errorCode())
-          assertEquals(100L, partition.timestamp())
-          assertEquals(100L, partition.offset())
-          assertEquals(50, partition.leaderEpoch())
-          numResponse += 1
-        }
-      }
-    }
-
-    val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50))
-    val taskFuture = new CompletableFuture[T]()
-    taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset)))
-
-    var cancelledCount = 0
-    val jobFuture = mock(classOf[CompletableFuture[Void]])
-    val holder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
-    when(holder.taskFuture).thenAnswer(_ => taskFuture)
-    when(holder.jobFuture).thenReturn(jobFuture)
-    when(jobFuture.cancel(anyBoolean())).thenAnswer(_ => {
-      cancelledCount += 1
-      true
-    })
-
-    val statusByPartition = mutable.Map(
-      new TopicPartition("test", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-      new TopicPartition("test", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-      new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
-    )
-
-    val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
-    val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList.asJava
-    purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
-
-    assertEquals(0, cancelledCount)
-    assertEquals(listOffsetsRequestKeys.size, numResponse)
-  }
-
-  @Test
-  def testResponseOnPartialError(): Unit = {
-    var numResponse = 0
-    val responseCallback = (response: List[ListOffsetsTopicResponse]) => {
-      response.foreach { topic =>
-        topic.partitions().forEach { partition =>
-          if (topic.name().equals("test1")) {
-            assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(), partition.errorCode())
-            assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp())
-            assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset())
-            assertEquals(-1, partition.leaderEpoch())
-          } else {
-            assertEquals(Errors.NONE.code(), partition.errorCode())
-            assertEquals(100L, partition.timestamp())
-            assertEquals(100L, partition.offset())
-            assertEquals(50, partition.leaderEpoch())
-          }
-          numResponse += 1
-        }
-      }
-    }
-
-    val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50))
-    val taskFuture = new CompletableFuture[T]()
-    taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset)))
-
-    var cancelledCount = 0
-    val jobFuture = mock(classOf[CompletableFuture[Void]])
-    val holder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
-    when(holder.taskFuture).thenAnswer(_ => taskFuture)
-    when(holder.jobFuture).thenReturn(jobFuture)
-    when(jobFuture.cancel(anyBoolean())).thenAnswer(_ => {
-      cancelledCount += 1
-      true
-    })
-
-    val errorFutureHolder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
-    val errorTaskFuture = new CompletableFuture[T]()
-    errorTaskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.of(new TimeoutException("Timed out!")), Optional.empty()))
-    when(errorFutureHolder.taskFuture).thenAnswer(_ => errorTaskFuture)
-    when(errorFutureHolder.jobFuture).thenReturn(jobFuture)
-
-    val statusByPartition = mutable.Map(
-      new TopicPartition("test", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-      new TopicPartition("test", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-      new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build()
-    )
-
-    val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
-    val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList.asJava
-    purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
-
-    assertEquals(0, cancelledCount)
-    assertEquals(listOffsetsRequestKeys.size, numResponse)
-  }
-
-  @Test
-  def testPartialResponseWhenNotLeaderOrFollowerExceptionOnOnePartition(): Unit = {
-    var numResponse = 0
-    val responseCallback = (response: List[ListOffsetsTopicResponse]) => {
-      response.foreach { topic =>
-        topic.partitions().forEach { partition =>
-          if (topic.name().equals("test1") && partition.partitionIndex() == 0) {
-            assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code(), partition.errorCode())
-            assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp())
-            assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset())
-            assertEquals(-1, partition.leaderEpoch())
-          } else {
-            assertEquals(Errors.NONE.code(), partition.errorCode())
-            assertEquals(100L, partition.timestamp())
-            assertEquals(100L, partition.offset())
-            assertEquals(50, partition.leaderEpoch())
-          }
-          numResponse += 1
-        }
-      }
-    }
-
-    val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50))
-    val taskFuture = new CompletableFuture[T]()
-    taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset)))
-
-    var cancelledCount = 0
-    val jobFuture = mock(classOf[CompletableFuture[Void]])
-    val holder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
-    when(holder.taskFuture).thenAnswer(_ => taskFuture)
-    when(holder.jobFuture).thenReturn(jobFuture)
-    when(jobFuture.cancel(anyBoolean())).thenAnswer(_ => {
-      cancelledCount += 1
-      true
-    })
-
-    when(replicaManager.getPartitionOrException(new TopicPartition("test1", 0)))
-      .thenThrow(new NotLeaderOrFollowerException("Not leader or follower!"))
-    val errorFutureHolder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
-    val errorTaskFuture = new CompletableFuture[T]()
-    when(errorFutureHolder.taskFuture).thenAnswer(_ => errorTaskFuture)
-    when(errorFutureHolder.jobFuture).thenReturn(jobFuture)
-
-    val statusByPartition = mutable.Map(
-      new TopicPartition("test", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-      new TopicPartition("test", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-      new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build(),
-      new TopicPartition("test1", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
-    )
-
-    val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
-    val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList.asJava
-    purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
-
-    assertEquals(1, cancelledCount)
-    assertEquals(listOffsetsRequestKeys.size, numResponse)
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index a3f9354fd8b..9149fec2eda 100644
--- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordValidat
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.server.common.RequestLocal
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, TopicPartitionOperationKey}
 import org.apache.kafka.server.util.timer.{MockTimer, Timer}
 import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, UnifiedLog, VerificationGuard}
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index a94de9a14d8..257f777885b 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -18,7 +18,7 @@
package kafka.log
import kafka.log.remote.RemoteLogManager
-import kafka.server.{DelayedRemoteListOffsets, KafkaConfig}
+import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
@@ -38,7 +38,7 @@ import org.apache.kafka.server.config.KRaftConfigs
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.purgatory.DelayedOperationPurgatory
+import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, DelayedRemoteListOffsets}
 import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffsetException}
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index cf400517c25..e490be540e2 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -114,6 +114,7 @@ import java.time.Duration
import java.util
import java.util.Arrays.asList
import java.util.concurrent.{CompletableFuture, TimeUnit}
+import java.util.function.Consumer
import java.util.{Collections, Comparator, Optional, OptionalInt, OptionalLong, Properties}
import scala.collection.{Map, Seq, mutable}
import scala.jdk.CollectionConverters._
@@ -3396,16 +3397,16 @@ class KafkaApisTest extends Logging {
       ArgumentMatchers.anyInt(), // correlationId
       ArgumentMatchers.anyShort(), // version
       ArgumentMatchers.any[(Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse](),
-      ArgumentMatchers.any[List[ListOffsetsTopicResponse] => Unit](),
+      ArgumentMatchers.any[Consumer[util.Collection[ListOffsetsTopicResponse]]],
       ArgumentMatchers.anyInt() // timeoutMs
     )).thenAnswer(ans => {
-      val callback = ans.getArgument[List[ListOffsetsTopicResponse] => Unit](8)
+      val callback = ans.getArgument[Consumer[util.List[ListOffsetsTopicResponse]]](8)
       val partitionResponse = new ListOffsetsPartitionResponse()
         .setErrorCode(error.code())
         .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
         .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
         .setPartitionIndex(tp.partition())
-      callback(List(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava)))
+      callback.accept(util.List.of(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava)))
     })
 
     val targetTimes = List(new ListOffsetsTopic()
@@ -3503,7 +3504,7 @@ class KafkaApisTest extends Logging {
     // 2 topics returned for authorization in during handle
     val topicsReturnedFromMetadataCacheForAuthorization = util.Set.of("remaining-topic", "later-deleted-topic")
-    when(metadataCache.getAllTopics()).thenReturn(topicsReturnedFromMetadataCacheForAuthorization)
+    when(metadataCache.getAllTopics).thenReturn(topicsReturnedFromMetadataCacheForAuthorization)
     // 1 topic is deleted from metadata right at the time between authorization and the next getTopicMetadata() call
     when(metadataCache.getTopicMetadata(
       ArgumentMatchers.eq(topicsReturnedFromMetadataCacheForAuthorization),
@@ -8857,11 +8858,11 @@ class KafkaApisTest extends Logging {
       ArgumentMatchers.anyInt(), // correlationId
       ArgumentMatchers.anyShort(), // version
       ArgumentMatchers.any[(Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse](),
-      ArgumentMatchers.any[List[ListOffsetsTopicResponse] => Unit](),
+      ArgumentMatchers.any[Consumer[util.Collection[ListOffsetsTopicResponse]]],
       ArgumentMatchers.anyInt() // timeoutMs
     )).thenAnswer(ans => {
       val version = ans.getArgument[Short](6)
-      val callback = ans.getArgument[List[ListOffsetsTopicResponse] => Unit](8)
+      val callback = ans.getArgument[Consumer[util.List[ListOffsetsTopicResponse]]](8)
       val errorCode = if (ReplicaManager.isListOffsetsTimestampUnsupported(timestamp, version))
         Errors.UNSUPPORTED_VERSION.code()
       else
@@ -8871,7 +8872,7 @@ class KafkaApisTest extends Logging {
         .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
         .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
         .setPartitionIndex(tp.partition())
-      callback(List(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava)))
+      callback.accept(util.List.of(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava)))
     })
 
     val data = new ListOffsetsRequestData().setTopics(targetTimes).setReplicaId(ListOffsetsRequest.CONSUMER_REPLICA_ID)
@@ -8909,16 +8910,16 @@ class KafkaApisTest extends Logging {
       ArgumentMatchers.anyInt(), // correlationId
       ArgumentMatchers.anyShort(), // version
       ArgumentMatchers.any[(Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse](),
-      ArgumentMatchers.any[List[ListOffsetsTopicResponse] => Unit](),
+      ArgumentMatchers.any[Consumer[util.Collection[ListOffsetsTopicResponse]]],
       ArgumentMatchers.anyInt() // timeoutMs
     )).thenAnswer(ans => {
-      val callback = ans.getArgument[List[ListOffsetsTopicResponse] => Unit](8)
+      val callback = ans.getArgument[Consumer[util.List[ListOffsetsTopicResponse]]](8)
       val partitionResponse = new ListOffsetsPartitionResponse()
         .setErrorCode(Errors.NONE.code())
         .setOffset(latestOffset)
         .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
         .setPartitionIndex(tp.partition())
-      callback(List(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava)))
+      callback.accept(util.List.of(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava)))
     })
 
     val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index e014f255d40..5ae1262df40 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -63,7 +63,7 @@ import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerL
 import org.apache.kafka.server.log.remote.storage._
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets}
 import org.apache.kafka.server.share.SharePartitionKey
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey, DelayedShareFetchKey, ShareFetch}
 import org.apache.kafka.server.share.metrics.ShareGroupMetrics
diff --git a/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
new file mode 100644
index 00000000000..200fb2262ac
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.log.OffsetResultHolder;
+
+import com.yammer.metrics.core.Meter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class DelayedRemoteListOffsets extends DelayedOperation {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DelayedRemoteListOffsets.class);
+
+    // For compatibility, metrics are defined to be under `kafka.server.DelayedRemoteListOffsetsMetrics` class
+    private static final KafkaMetricsGroup METRICS_GROUP = new KafkaMetricsGroup("kafka.server", "DelayedRemoteListOffsetsMetrics");
+    static final Meter AGGREGATE_EXPIRATION_METER = METRICS_GROUP.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS);
+    static final Map<TopicPartition, Meter> PARTITION_EXPIRATION_METERS = new ConcurrentHashMap<>();
+
+    private final int version;
+    private final Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition;
+    private final Consumer<TopicPartition> partitionOrException;
+    private final Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback;
+
+    public DelayedRemoteListOffsets(long delayMs,
+                                    int version,
+                                    Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition,
+                                    Consumer<TopicPartition> partitionOrException,
+                                    Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback) {
+        super(delayMs);
+        this.version = version;
+        this.statusByPartition = statusByPartition;
+        this.partitionOrException = partitionOrException;
+        this.responseCallback = responseCallback;
+        // Mark the status as completed, if there is no async task to track.
+        // If there is a task to track, then build the response as REQUEST_TIMED_OUT by default.
+        statusByPartition.forEach((topicPartition, status) -> {
+            status.completed(status.futureHolderOpt().isEmpty());
+            if (status.futureHolderOpt().isPresent()) {
+                status.responseOpt(Optional.of(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition())));
+            }
+            LOG.trace("Initial partition status for {} is {}", topicPartition, status);
+        });
+    }
+
+    /**
+     * Call-back to execute when a delayed operation gets expired and hence forced to complete.
+     */
+    @Override
+    public void onExpiration() {
+        statusByPartition.forEach((topicPartition, status) -> {
+            if (!status.completed()) {
+                LOG.debug("Expiring list offset request for partition {} with status {}", topicPartition, status);
+                status.futureHolderOpt().ifPresent(futureHolder -> futureHolder.jobFuture().cancel(true));
+                recordExpiration(topicPartition);
+            }
+        });
+    }
+
+    /**
+     * Process for completing an operation; This function needs to be defined
+     * in subclasses and will be called exactly once in forceComplete()
+     */
+    @Override
+    public void onComplete() {
+        Map<String, ListOffsetsResponseData.ListOffsetsTopicResponse> groupedByTopic = new HashMap<>();
+        statusByPartition.forEach((tp, status) -> {
+            ListOffsetsResponseData.ListOffsetsTopicResponse response = groupedByTopic.computeIfAbsent(tp.topic(), k ->
+                new ListOffsetsResponseData.ListOffsetsTopicResponse().setName(tp.topic()));
+            status.responseOpt().ifPresent(res -> response.partitions().add(res));
+        });
+        responseCallback.accept(groupedByTopic.values());
+    }
+
+    /**
+     * Try to complete the delayed operation by first checking if the operation
+     * can be completed by now. If yes execute the completion logic by calling
+     * forceComplete() and return true iff forceComplete returns true; otherwise return false
+     */
+    @Override
+    public boolean tryComplete() {
+        AtomicBoolean completable = new AtomicBoolean(true);
+        statusByPartition.forEach((partition, status) -> {
+            if (!status.completed()) {
+                try {
+                    partitionOrException.accept(partition);
+                } catch (ApiException e) {
+                    status.futureHolderOpt().ifPresent(futureHolder -> {
+                        futureHolder.jobFuture().cancel(false);
+                        futureHolder.taskFuture().complete(new OffsetResultHolder.FileRecordsOrError(Optional.of(e), Optional.empty()));
+                    });
+                }
+
+                status.futureHolderOpt().ifPresent(futureHolder -> {
+                    if (futureHolder.taskFuture().isDone()) {
+                        ListOffsetsResponseData.ListOffsetsPartitionResponse response;
+                        try {
+                            OffsetResultHolder.FileRecordsOrError taskFuture = futureHolder.taskFuture().get();
+                            if (taskFuture.hasException()) {
+                                response = buildErrorResponse(Errors.forException(taskFuture.exception().get()), partition.partition());
+                            } else if (!taskFuture.hasTimestampAndOffset()) {
+                                Errors error = status.maybeOffsetsError()
+                                    .map(e -> version >= 5 ? Errors.forException(e) : Errors.LEADER_NOT_AVAILABLE)
+                                    .orElse(Errors.NONE);
+                                response = buildErrorResponse(error, partition.partition());
+                            } else {
+                                ListOffsetsResponseData.ListOffsetsPartitionResponse partitionResponse = buildErrorResponse(Errors.NONE, partition.partition());
+                                FileRecords.TimestampAndOffset found = taskFuture.timestampAndOffset().get();
+                                if (status.lastFetchableOffset().isPresent() && found.offset >= status.lastFetchableOffset().get()) {
+                                    if (status.maybeOffsetsError().isPresent()) {
+                                        Errors error = version >= 5 ? Errors.forException(status.maybeOffsetsError().get()) : Errors.LEADER_NOT_AVAILABLE;
+                                        partitionResponse.setErrorCode(error.code());
+                                    }
+                                } else {
+                                    partitionResponse = new ListOffsetsResponseData.ListOffsetsPartitionResponse()
+                                        .setPartitionIndex(partition.partition())
+                                        .setErrorCode(Errors.NONE.code())
+                                        .setTimestamp(found.timestamp)
+                                        .setOffset(found.offset);
+
+                                    if (found.leaderEpoch.isPresent() && version >= 4) {
+                                        partitionResponse.setLeaderEpoch(found.leaderEpoch.get());
+                                    }
+                                }
+                                response = partitionResponse;
+                            }
+                        } catch (InterruptedException | ExecutionException e) {
+                            response = buildErrorResponse(Errors.forException(e), partition.partition());
+                        }
+                        status.responseOpt(Optional.of(response));
+                        status.completed(true);
+                    }
+                    completable.set(completable.get() && futureHolder.taskFuture().isDone());
+                });
+            }
+        });
+        if (completable.get()) {
+            return forceComplete();
+        } else {
+            return false;
+        }
+    }
+
+    private ListOffsetsResponseData.ListOffsetsPartitionResponse buildErrorResponse(Errors e, int partitionIndex) {
+        return new ListOffsetsResponseData.ListOffsetsPartitionResponse()
+            .setPartitionIndex(partitionIndex)
+            .setErrorCode(e.code())
+            .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
+            .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET);
+    }
+
+    private static void recordExpiration(TopicPartition partition) {
+        AGGREGATE_EXPIRATION_METER.mark();
+        PARTITION_EXPIRATION_METERS.computeIfAbsent(partition, tp -> METRICS_GROUP.newMeter("ExpiresPerSec",
+            "requests",
+            TimeUnit.SECONDS,
+            mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition()))))).mark();
+    }
+}
diff --git a/server/src/main/java/org/apache/kafka/server/ListOffsetsPartitionStatus.java b/storage/src/main/java/org/apache/kafka/server/purgatory/ListOffsetsPartitionStatus.java
similarity index 99%
rename from server/src/main/java/org/apache/kafka/server/ListOffsetsPartitionStatus.java
rename to storage/src/main/java/org/apache/kafka/server/purgatory/ListOffsetsPartitionStatus.java
index b489b820ef0..74623ee8026 100644
--- a/server/src/main/java/org/apache/kafka/server/ListOffsetsPartitionStatus.java
+++ b/storage/src/main/java/org/apache/kafka/server/purgatory/ListOffsetsPartitionStatus.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.server;
+package org.apache.kafka.server.purgatory;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
diff --git a/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
new file mode 100644
index 00000000000..81b9073377a
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.server.util.timer.MockTimer;
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
+import org.apache.kafka.storage.internals.log.OffsetResultHolder;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings("unchecked")
+public class DelayedRemoteListOffsetsTest {
+
+ private final int delayMs = 10;
+ private final MockTimer timer = new MockTimer();
+    private final Consumer<TopicPartition> partitionOrException = mock(Consumer.class);
+    private final DelayedOperationPurgatory<DelayedRemoteListOffsets> purgatory =
+        new DelayedOperationPurgatory<>("test-purgatory", timer, 0, 10, true, true);
+
+    @AfterEach
+    public void afterEach() throws Exception {
+        purgatory.shutdown();
+    }
+
+    @Test
+    public void testResponseOnRequestExpiration() throws InterruptedException {
+        AtomicInteger numResponse = new AtomicInteger(0);
+        Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback = response ->
+            response.forEach(topic ->
+                topic.partitions().forEach(partition -> {
+                    assertEquals(Errors.REQUEST_TIMED_OUT.code(), partition.errorCode());
+                    assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp());
+                    assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset());
+                    assertEquals(-1, partition.leaderEpoch());
+                    numResponse.incrementAndGet();
+                })
+            );
+
+        AtomicInteger cancelledCount = new AtomicInteger(0);
+        CompletableFuture<Void> jobFuture = mock(CompletableFuture.class);
+        AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> holder = mock(AsyncOffsetReadFutureHolder.class);
+        when(holder.taskFuture()).thenAnswer(f -> new CompletableFuture<>());
+        when(holder.jobFuture()).thenReturn(jobFuture);
+        when(jobFuture.cancel(anyBoolean())).thenAnswer(f -> {
+            cancelledCount.incrementAndGet();
+            return true;
+        });
+
+        Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
+            new TopicPartition("test", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicPartition("test", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicPartition("test1", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
+        );
+
+        DelayedRemoteListOffsets delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, 5, statusByPartition, partitionOrException, responseCallback);
+        List<TopicPartitionOperationKey> listOffsetsRequestKeys = statusByPartition.keySet().stream().map(TopicPartitionOperationKey::new).toList();
+        assertEquals(0, DelayedRemoteListOffsets.AGGREGATE_EXPIRATION_METER.count());
+        assertEquals(0, DelayedRemoteListOffsets.PARTITION_EXPIRATION_METERS.size());
+        purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys);
+
+        Thread.sleep(100);
+        assertEquals(3, listOffsetsRequestKeys.size());
+        assertEquals(cancelledCount.get(), listOffsetsRequestKeys.size());
+        assertEquals(numResponse.get(), listOffsetsRequestKeys.size());
+        assertEquals(listOffsetsRequestKeys.size(), DelayedRemoteListOffsets.AGGREGATE_EXPIRATION_METER.count());
+        listOffsetsRequestKeys.forEach(key -> {
+            TopicPartition tp = new TopicPartition(key.topic, key.partition);
+            assertEquals(1, DelayedRemoteListOffsets.PARTITION_EXPIRATION_METERS.get(tp).count());
+        });
+    }
+
+    @Test
+    public void testResponseOnSuccess() {
+        AtomicInteger numResponse = new AtomicInteger(0);
+        Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback = response ->
+            response.forEach(topic ->
+                topic.partitions().forEach(partition -> {
+                    assertEquals(Errors.NONE.code(), partition.errorCode());
+                    assertEquals(100L, partition.timestamp());
+                    assertEquals(100L, partition.offset());
+                    assertEquals(50, partition.leaderEpoch());
+                    numResponse.incrementAndGet();
+                })
+            );
+
+        FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(100L, 100L, Optional.of(50));
+        CompletableFuture<OffsetResultHolder.FileRecordsOrError> taskFuture = new CompletableFuture<>();
+        taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset)));
+
+        AtomicInteger cancelledCount = new AtomicInteger(0);
+        CompletableFuture<Void> jobFuture = mock(CompletableFuture.class);
+        AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> holder = mock(AsyncOffsetReadFutureHolder.class);
+        when(holder.taskFuture()).thenAnswer(f -> taskFuture);
+        when(holder.jobFuture()).thenReturn(jobFuture);
+        when(jobFuture.cancel(anyBoolean())).thenAnswer(f -> {
+            cancelledCount.incrementAndGet();
+            return true;
+        });
+
+        Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
+            new TopicPartition("test", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicPartition("test", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicPartition("test1", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
+        );
+
+        DelayedRemoteListOffsets delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, 5, statusByPartition, partitionOrException, responseCallback);
+        List<TopicPartitionOperationKey> listOffsetsRequestKeys = statusByPartition.keySet().stream().map(TopicPartitionOperationKey::new).toList();
+        purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys);
+
+        assertEquals(0, cancelledCount.get());
+        assertEquals(numResponse.get(), listOffsetsRequestKeys.size());
+    }
+
+    @Test
+    public void testResponseOnPartialError() {
+        AtomicInteger numResponse = new AtomicInteger(0);
+        Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback = response ->
+            response.forEach(topic ->
+                topic.partitions().forEach(partition -> {
+                    if (topic.name().equals("test1")) {
+                        assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(), partition.errorCode());
+                        assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp());
+                        assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset());
+                        assertEquals(-1, partition.leaderEpoch());
+                    } else {
+                        assertEquals(Errors.NONE.code(), partition.errorCode());
+                        assertEquals(100L, partition.timestamp());
+                        assertEquals(100L, partition.offset());
+                        assertEquals(50, partition.leaderEpoch());
+                    }
+                    numResponse.incrementAndGet();
+                })
+            );
+
+        FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(100L, 100L, Optional.of(50));
+        CompletableFuture<OffsetResultHolder.FileRecordsOrError> taskFuture = new CompletableFuture<>();
+        taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset)));
+
+        AtomicInteger cancelledCount = new AtomicInteger(0);
+        CompletableFuture<Void> jobFuture = mock(CompletableFuture.class);
+        AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> holder = mock(AsyncOffsetReadFutureHolder.class);
+        when(holder.taskFuture()).thenAnswer(f -> taskFuture);
+        when(holder.jobFuture()).thenReturn(jobFuture);
+        when(jobFuture.cancel(anyBoolean())).thenAnswer(f -> {
+            cancelledCount.incrementAndGet();
+            return true;
+        });
+
+        AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> errorFutureHolder = mock(AsyncOffsetReadFutureHolder.class);
+        CompletableFuture<OffsetResultHolder.FileRecordsOrError> errorTaskFuture = new CompletableFuture<>();
+        errorTaskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.of(new TimeoutException("Timed out!")), Optional.empty()));
+        when(errorFutureHolder.taskFuture()).thenAnswer(f -> errorTaskFuture);
+        when(errorFutureHolder.jobFuture()).thenReturn(jobFuture);
+
+        Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
+            new TopicPartition("test", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicPartition("test", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicPartition("test1", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build()
+        );
+
+        DelayedRemoteListOffsets delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, 5, statusByPartition, partitionOrException, responseCallback);
+        List<TopicPartitionOperationKey> listOffsetsRequestKeys = statusByPartition.keySet().stream().map(TopicPartitionOperationKey::new).toList();
+        purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys);
+
+        assertEquals(0, cancelledCount.get());
+        assertEquals(numResponse.get(), listOffsetsRequestKeys.size());
+    }
+
+    @Test
+    public void testPartialResponseWhenNotLeaderOrFollowerExceptionOnOnePartition() {
+        AtomicInteger numResponse = new AtomicInteger(0);
+        Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback = response ->
+            response.forEach(topic ->
+                topic.partitions().forEach(partition -> {
+                    if (topic.name().equals("test1") && partition.partitionIndex() == 0) {
+                        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code(), partition.errorCode());
+                        assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp());
+                        assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset());
+                        assertEquals(-1, partition.leaderEpoch());
+                    } else {
+                        assertEquals(Errors.NONE.code(), partition.errorCode());
+                        assertEquals(100L, partition.timestamp());
+                        assertEquals(100L, partition.offset());
+                        assertEquals(50, partition.leaderEpoch());
+                    }
+                    numResponse.incrementAndGet();
+                })
+            );
+
+        FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(100L, 100L, Optional.of(50));
+        CompletableFuture<OffsetResultHolder.FileRecordsOrError> taskFuture = new CompletableFuture<>();
+        taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset)));
+
+        AtomicInteger cancelledCount = new AtomicInteger(0);
+        CompletableFuture<Void> jobFuture = mock(CompletableFuture.class);
+        AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> holder = mock(AsyncOffsetReadFutureHolder.class);
+        when(holder.taskFuture()).thenAnswer(f -> taskFuture);
+        when(holder.jobFuture()).thenReturn(jobFuture);
+        when(jobFuture.cancel(anyBoolean())).thenAnswer(f -> {
+            cancelledCount.incrementAndGet();
+            return true;
+        });
+
+        doThrow(new NotLeaderOrFollowerException("Not leader or follower!"))
+            .when(partitionOrException).accept(new TopicPartition("test1", 0));
+        AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> errorFutureHolder = mock(AsyncOffsetReadFutureHolder.class);
+        CompletableFuture<OffsetResultHolder.FileRecordsOrError> errorTaskFuture = new CompletableFuture<>();
+        when(errorFutureHolder.taskFuture()).thenAnswer(f -> errorTaskFuture);
+        when(errorFutureHolder.jobFuture()).thenReturn(jobFuture);
+
+        Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
+            new TopicPartition("test", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicPartition("test", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicPartition("test1", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build(),
+            new TopicPartition("test1", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
+        );
+
+        DelayedRemoteListOffsets delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, 5, statusByPartition, partitionOrException, responseCallback);
+        List<TopicPartitionOperationKey> listOffsetsRequestKeys = statusByPartition.keySet().stream().map(TopicPartitionOperationKey::new).toList();
+        purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys);
+
+        assertEquals(1, cancelledCount.get());
+        assertEquals(numResponse.get(), listOffsetsRequestKeys.size());
+    }
+}