dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r715684131
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -672,29 +672,22 @@ class KafkaApis(val requestChannel: RequestChannel,
val versionId = request.header.apiVersion
val clientId = request.header.clientId
val fetchRequest = request.body[FetchRequest]
- val (topicIds, topicNames) =
+ val topicNames =
if (fetchRequest.version() >= 13)
- metadataCache.topicIdInfo()
+ metadataCache.topicIdsToNames()
else
- (Collections.emptyMap[String, Uuid](), Collections.emptyMap[Uuid,
String]())
+ Collections.emptyMap[Uuid, String]()
- // If fetchData or forgottenTopics contain an unknown topic ID, return a
top level error.
- var fetchData: util.Map[TopicPartition, FetchRequest.PartitionData] = null
- var forgottenTopics: util.List[TopicPartition] = null
- try {
- fetchData = fetchRequest.fetchData(topicNames)
- forgottenTopics = fetchRequest.forgottenTopics(topicNames)
- } catch {
- case e: UnknownTopicIdException => throw e
- }
+ val fetchData = fetchRequest.fetchData(topicNames)
+ val forgottenTopics = fetchRequest.forgottenTopics(topicNames)
Review comment:
When a session is used, resolving the topic ids is not really necessary
here because we should already have the names in the session or we would
resolve them later anyway. I wonder if it would be better to do this entirely
in the `fetchManager.newContext` based on the context type. Have you considered
something like this?
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -708,40 +701,45 @@ class KafkaApis(val requestChannel: RequestChannel,
None
}
- val erroneous = mutable.ArrayBuffer[(TopicPartition,
FetchResponseData.PartitionData)]()
- val interesting = mutable.ArrayBuffer[(TopicPartition,
FetchRequest.PartitionData)]()
+ val erroneous = mutable.ArrayBuffer[(TopicIdPartition,
FetchResponseData.PartitionData)]()
+ val interesting = mutable.ArrayBuffer[(TopicIdPartition,
FetchRequest.PartitionData)]()
val sessionTopicIds = mutable.Map[String, Uuid]()
if (fetchRequest.isFromFollower) {
// The follower must have ClusterAction on ClusterResource in order to
fetch partition data.
if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER,
CLUSTER_NAME)) {
- fetchContext.foreachPartition { (topicPartition, topicId, data) =>
- sessionTopicIds.put(topicPartition.topic(), topicId)
- if (!metadataCache.contains(topicPartition))
- erroneous += topicPartition ->
FetchResponse.partitionResponse(topicPartition.partition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ fetchContext.foreachPartition { (topicIdPartition, data) =>
+ sessionTopicIds.put(topicIdPartition.topicPartition.topic,
topicIdPartition.topicId)
Review comment:
Do we still need this `sessionTopicIds` mapping if we have the topic id
in the `topicIdPartition`?
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -708,40 +701,45 @@ class KafkaApis(val requestChannel: RequestChannel,
None
}
- val erroneous = mutable.ArrayBuffer[(TopicPartition,
FetchResponseData.PartitionData)]()
- val interesting = mutable.ArrayBuffer[(TopicPartition,
FetchRequest.PartitionData)]()
+ val erroneous = mutable.ArrayBuffer[(TopicIdPartition,
FetchResponseData.PartitionData)]()
+ val interesting = mutable.ArrayBuffer[(TopicIdPartition,
FetchRequest.PartitionData)]()
val sessionTopicIds = mutable.Map[String, Uuid]()
if (fetchRequest.isFromFollower) {
// The follower must have ClusterAction on ClusterResource in order to
fetch partition data.
if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER,
CLUSTER_NAME)) {
- fetchContext.foreachPartition { (topicPartition, topicId, data) =>
- sessionTopicIds.put(topicPartition.topic(), topicId)
- if (!metadataCache.contains(topicPartition))
- erroneous += topicPartition ->
FetchResponse.partitionResponse(topicPartition.partition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ fetchContext.foreachPartition { (topicIdPartition, data) =>
+ sessionTopicIds.put(topicIdPartition.topicPartition.topic,
topicIdPartition.topicId)
+ if (topicIdPartition.topicPartition.topic == null )
+ erroneous += topicIdPartition ->
FetchResponse.partitionResponse(topicIdPartition.topicPartition.partition,
Errors.UNKNOWN_TOPIC_ID)
+ else if (!metadataCache.contains(topicIdPartition.topicPartition))
+ erroneous += topicIdPartition ->
FetchResponse.partitionResponse(topicIdPartition.topicPartition.partition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
- interesting += (topicPartition -> data)
+ interesting += (topicIdPartition -> data)
}
} else {
- fetchContext.foreachPartition { (part, topicId, _) =>
- sessionTopicIds.put(part.topic(), topicId)
- erroneous += part -> FetchResponse.partitionResponse(part.partition,
Errors.TOPIC_AUTHORIZATION_FAILED)
+ fetchContext.foreachPartition { (topicIdPartition, _) =>
+ sessionTopicIds.put(topicIdPartition.topicPartition.topic,
topicIdPartition.topicId)
+ erroneous += topicIdPartition ->
FetchResponse.partitionResponse(topicIdPartition.topicPartition.partition,
Errors.TOPIC_AUTHORIZATION_FAILED)
}
}
} else {
// Regular Kafka consumers need READ permission on each partition they
are fetching.
- val partitionDatas = new mutable.ArrayBuffer[(TopicPartition,
FetchRequest.PartitionData)]
- fetchContext.foreachPartition { (topicPartition, topicId, partitionData)
=>
- partitionDatas += topicPartition -> partitionData
- sessionTopicIds.put(topicPartition.topic(), topicId)
- }
- val authorizedTopics = authHelper.filterByAuthorized(request.context,
READ, TOPIC, partitionDatas)(_._1.topic)
- partitionDatas.foreach { case (topicPartition, data) =>
- if (!authorizedTopics.contains(topicPartition.topic))
- erroneous += topicPartition ->
FetchResponse.partitionResponse(topicPartition.partition,
Errors.TOPIC_AUTHORIZATION_FAILED)
- else if (!metadataCache.contains(topicPartition))
- erroneous += topicPartition ->
FetchResponse.partitionResponse(topicPartition.partition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ val partitionDatas = new mutable.ArrayBuffer[(TopicIdPartition,
FetchRequest.PartitionData)]
+ fetchContext.foreachPartition { (topicIdPartition, partitionData) =>
+ partitionDatas += topicIdPartition -> partitionData
+ sessionTopicIds.put(topicIdPartition.topicPartition.topic(),
topicIdPartition.topicId)
Review comment:
Could we directly check if the topic name is null here and put the
unresolved ones into `erroneous`? This would avoid the filter on the next line.
##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -233,7 +233,8 @@ class ReplicaFetcherThread(name: String,
Map.empty
}
} else {
- fetchResponse.responseData(fetchSessionHandler.sessionTopicNames,
clientResponse.requestHeader().apiVersion()).asScala
+ fetchResponse.responseData(fetchSessionHandler.sessionTopicNames,
clientResponse.requestHeader().apiVersion()).asScala.map {
Review comment:
Not related to this line. Don't we need to update the fetcher to handle
the topic id errors at the partition level?
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -163,7 +173,8 @@ class CachedPartition(val topic: String,
mustRespond
}
- override def hashCode: Int = Objects.hash(new TopicPartition(topic,
partition), topicId)
+ override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition)
+ topicId.hashCode else
Review comment:
Could we add a scaladoc for this method which explains what we do and
why?
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -378,53 +378,47 @@ class SessionlessFetchContext(val fetchData:
util.Map[TopicPartition, FetchReque
* @param reqMetadata The request metadata.
* @param fetchData The partition data from the fetch request.
* @param usesTopicIds True if this session should use topic IDs.
- * @param topicIds The map from topic names to topic IDs.
* @param isFromFollower True if this fetch request came from a follower.
*/
class FullFetchContext(private val time: Time,
private val cache: FetchSessionCache,
private val reqMetadata: JFetchMetadata,
- private val fetchData: util.Map[TopicPartition,
FetchRequest.PartitionData],
+ private val fetchData: util.Map[TopicIdPartition,
FetchRequest.PartitionData],
private val usesTopicIds: Boolean,
- private val topicIds: util.Map[String, Uuid],
private val isFromFollower: Boolean) extends
FetchContext {
- override def getFetchOffset(part: TopicPartition): Option[Long] =
+ override def getFetchOffset(part: TopicIdPartition): Option[Long] =
Option(fetchData.get(part)).map(_.fetchOffset)
- override def foreachPartition(fun: (TopicPartition, Uuid,
FetchRequest.PartitionData) => Unit): Unit = {
- fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data))
+ override def foreachPartition(fun: (TopicIdPartition,
FetchRequest.PartitionData) => Unit): Unit = {
+ fetchData.forEach((tp, data) => fun(tp, data))
}
override def getResponseSize(updates: FetchSession.RESP_MAP, versionId:
Short): Int = {
- FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds)
+ FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
}
override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP):
FetchResponse = {
var hasInconsistentTopicIds = false
- def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP)
= {
+ def createNewSession: FetchSession.CACHE_MAP = {
val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
- val sessionTopicIds = new util.HashMap[String, Uuid](updates.size)
updates.forEach { (part, respData) =>
if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) {
info(s"Session encountered an inconsistent topic ID for
topicPartition $part.")
hasInconsistentTopicIds = true
}
val reqData = fetchData.get(part)
- val id = topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID)
- cachedPartitions.mustAdd(new CachedPartition(part, id, reqData,
respData))
- if (id != Uuid.ZERO_UUID)
- sessionTopicIds.put(part.topic, id)
+ cachedPartitions.mustAdd(new CachedPartition(part.topicPartition,
part.topicId, reqData, respData))
}
- (cachedPartitions, sessionTopicIds)
+ cachedPartitions
}
val responseSessionId = cache.maybeCreateSession(time.milliseconds(),
isFromFollower,
updates.size, usesTopicIds, () => createNewSession)
if (hasInconsistentTopicIds) {
- FetchResponse.of(Errors.INCONSISTENT_TOPIC_ID, 0, responseSessionId, new
FetchSession.RESP_MAP, Collections.emptyMap())
+ FetchResponse.of(Errors.INCONSISTENT_TOPIC_ID, 0, responseSessionId, new
FetchSession.RESP_MAP)
Review comment:
Do we still need to return `INCONSISTENT_TOPIC_ID` as a top level error?
Fetcher prior to this change would need it, for sure. With this PR, we actually
don't want the fetcher to treat it as a top level error but rather as a
partition error. We need to think/discuss this a little more, I think.
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -238,47 +247,40 @@ class FetchSession(val id: Int,
def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
- def getFetchOffset(topicPartition: TopicPartition): Option[Long] =
synchronized {
- Option(partitionMap.find(new CachedPartition(topicPartition,
- sessionTopicIds.getOrDefault(topicPartition.topic(),
Uuid.ZERO_UUID)))).map(_.fetchOffset)
+ def getFetchOffset(topicIdPartition: TopicIdPartition): Option[Long] =
synchronized {
+ Option(partitionMap.find(new
CachedPartition(topicIdPartition.topicPartition,
topicIdPartition.topicId))).map(_.fetchOffset)
}
- type TL = util.ArrayList[TopicPartition]
+ type TL = util.ArrayList[TopicIdPartition]
// Update the cached partition data based on the request.
def update(fetchData: FetchSession.REQ_MAP,
- toForget: util.List[TopicPartition],
+ toForget: util.List[TopicIdPartition],
reqMetadata: JFetchMetadata,
- topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) =
synchronized {
+ usesTopicIds: Boolean): (TL, TL, TL) = synchronized {
val added = new TL
val updated = new TL
val removed = new TL
- val inconsistentTopicIds = new TL
fetchData.forEach { (topicPart, reqData) =>
- // Get the topic ID on the broker, if it is valid and the topic is new
to the session, add its ID.
- // If the topic already existed, check that its ID is consistent.
- val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)
- val newCachedPart = new CachedPartition(topicPart, id, reqData)
- if (id != Uuid.ZERO_UUID) {
- val prevSessionTopicId = sessionTopicIds.putIfAbsent(topicPart.topic,
id)
- if (prevSessionTopicId != null && prevSessionTopicId != id)
- inconsistentTopicIds.add(topicPart)
- }
+ val newCachedPart = new CachedPartition(topicPart.topicPartition,
topicPart.topicId, reqData)
val cachedPart = partitionMap.find(newCachedPart)
if (cachedPart == null) {
partitionMap.mustAdd(newCachedPart)
added.add(topicPart)
} else {
cachedPart.updateRequestParams(reqData)
+ if (cachedPart.topic == null)
+ // Update the topic name in place
+ cachedPart.resolveUnknownName(topicPart.topicPartition.topic)
Review comment:
This might not be necessary if we won't resolve topic ids in the request
in all cases (see my previous comment).
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -708,40 +701,45 @@ class KafkaApis(val requestChannel: RequestChannel,
None
}
- val erroneous = mutable.ArrayBuffer[(TopicPartition,
FetchResponseData.PartitionData)]()
- val interesting = mutable.ArrayBuffer[(TopicPartition,
FetchRequest.PartitionData)]()
+ val erroneous = mutable.ArrayBuffer[(TopicIdPartition,
FetchResponseData.PartitionData)]()
+ val interesting = mutable.ArrayBuffer[(TopicIdPartition,
FetchRequest.PartitionData)]()
val sessionTopicIds = mutable.Map[String, Uuid]()
if (fetchRequest.isFromFollower) {
// The follower must have ClusterAction on ClusterResource in order to
fetch partition data.
if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER,
CLUSTER_NAME)) {
- fetchContext.foreachPartition { (topicPartition, topicId, data) =>
- sessionTopicIds.put(topicPartition.topic(), topicId)
- if (!metadataCache.contains(topicPartition))
- erroneous += topicPartition ->
FetchResponse.partitionResponse(topicPartition.partition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ fetchContext.foreachPartition { (topicIdPartition, data) =>
+ sessionTopicIds.put(topicIdPartition.topicPartition.topic,
topicIdPartition.topicId)
+ if (topicIdPartition.topicPartition.topic == null )
+ erroneous += topicIdPartition ->
FetchResponse.partitionResponse(topicIdPartition.topicPartition.partition,
Errors.UNKNOWN_TOPIC_ID)
Review comment:
nit: Could we add an overload to `partitionResponse` which takes a
`TopicIdPartition`? This would reduce the boilerplate code a bit here.
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -173,8 +184,8 @@ class CachedPartition(val topic: String,
this.eq(that) ||
(that.canEqual(this) &&
this.partition.equals(that.partition) &&
- this.topic.equals(that.topic) &&
- this.topicId.equals(that.topicId))
+ (if (this.topicId != Uuid.ZERO_UUID)
this.topicId.equals(that.topicId)
+ else this.topic.equals(that.topic)))
Review comment:
Side note here: I think that we should implement `override def
elementKeysAreEqual(that: Any): Boolean` from the
`ImplicitLinkedHashCollection.Element` interface to make it clear that we do
this for comparing elements in the collections.
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -870,12 +868,14 @@ class KafkaApis(val requestChannel: RequestChannel,
// Prepare fetch response from converted data
val response =
- FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs,
unconvertedFetchResponse.sessionId, convertedData, sessionTopicIds.asJava)
+ FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs,
unconvertedFetchResponse.sessionId, convertedData)
// record the bytes out metrics only when the response is being sent
response.data().responses().forEach { topicResponse =>
topicResponse.partitions().forEach { data =>
- val tp = new TopicPartition(topicResponse.topic(),
data.partitionIndex())
- brokerTopicStats.updateBytesOut(tp.topic,
fetchRequest.isFromFollower, reassigningPartitions.contains(tp),
FetchResponse.recordsSize(data))
+ val tp = new TopicIdPartition(topicResponse.topicId, new
TopicPartition(topicResponse.topic(), data.partitionIndex()))
+ // If the topic name was not known, we will have no bytes out.
+ if (tp.topicPartition.topic != null)
Review comment:
Should we create `tp` after this check? We could also create a
`TopicPartition` as we don't really use `TopicIdPartition` for the metric.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]