dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r741789472
##########
File path:
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -285,52 +268,57 @@ public FetchRequestData build() {
if (nextMetadata.isFull()) {
if (log.isDebugEnabled()) {
log.debug("Built full fetch {} for node {} with {}.",
- nextMetadata, node,
partitionsToLogString(next.keySet()));
+ nextMetadata, node,
topicPartitionsToLogString(next.keySet()));
}
sessionPartitions = next;
next = null;
+ Map<TopicPartition, PartitionData> toSend =
+ Collections.unmodifiableMap(new
LinkedHashMap<>(sessionPartitions));
// Only add topic IDs to the session if we are using topic IDs.
if (canUseTopicIds) {
- sessionTopicIds = topicIds;
- sessionTopicNames = new HashMap<>(topicIds.size());
- topicIds.forEach((name, id) -> sessionTopicNames.put(id,
name));
+ Map<Uuid, Set<String>> newTopicNames =
sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry
-> entry.getValue().topicId,
+ Collectors.mapping(entry ->
entry.getKey().topic(), Collectors.toSet())));
Review comment:
Could we iterate over `sessionPartitions` and directly populate
`sessionTopicNames` by using `putIfAbsent` or even `put`? The grouping seems
unnecessary to me here unless I am missing something.
##########
File path:
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -285,52 +268,57 @@ public FetchRequestData build() {
if (nextMetadata.isFull()) {
if (log.isDebugEnabled()) {
log.debug("Built full fetch {} for node {} with {}.",
- nextMetadata, node,
partitionsToLogString(next.keySet()));
+ nextMetadata, node,
topicPartitionsToLogString(next.keySet()));
}
sessionPartitions = next;
next = null;
+ Map<TopicPartition, PartitionData> toSend =
+ Collections.unmodifiableMap(new
LinkedHashMap<>(sessionPartitions));
Review comment:
As `toSend` is not used before L288, how about putting this line over
there?
##########
File path:
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -346,38 +334,36 @@ public FetchRequestData build() {
break;
}
sessionPartitions.put(topicPartition, nextData);
- added.add(topicPartition);
+ added.add(new TopicIdPartition(nextData.topicId,
topicPartition));
}
// Add topic IDs to session if we can use them. If an ID is
inconsistent, we will handle in the receiving broker.
// If we switched from using topic IDs to not using them (or vice
versa), that error will also be handled in the receiving broker.
if (canUseTopicIds) {
- for (Map.Entry<String, Uuid> topic : topicIds.entrySet()) {
- String topicName = topic.getKey();
- Uuid addedId = topic.getValue();
- sessionTopicIds.put(topicName, addedId);
- sessionTopicNames.put(addedId, topicName);
- }
+ Map<Uuid, Set<String>> newTopicNames =
added.stream().collect(Collectors.groupingByConcurrent(TopicIdPartition::topicId,
+ Collectors.mapping(topicIdPartition ->
topicIdPartition.topicPartition().topic(), Collectors.toSet())));
+
+ // There should only be one topic name per topic ID.
+ newTopicNames.forEach((topicId, topicNamesSet) ->
topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
}
if (log.isDebugEnabled()) {
- log.debug("Built incremental fetch {} for node {}. Added {},
altered {}, removed {} " +
- "out of {}", nextMetadata, node,
partitionsToLogString(added),
- partitionsToLogString(altered),
partitionsToLogString(removed),
- partitionsToLogString(sessionPartitions.keySet()));
+ log.debug("Built incremental fetch {} for node {}. Added {},
altered {}, removed {}, " +
+ "replaced {} out of {}", nextMetadata, node,
topicIdPartitionsToLogString(added),
Review comment:
nit: Could we align like it was before?
##########
File path:
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -346,38 +334,36 @@ public FetchRequestData build() {
break;
}
sessionPartitions.put(topicPartition, nextData);
- added.add(topicPartition);
+ added.add(new TopicIdPartition(nextData.topicId,
topicPartition));
}
// Add topic IDs to session if we can use them. If an ID is
inconsistent, we will handle in the receiving broker.
// If we switched from using topic IDs to not using them (or vice
versa), that error will also be handled in the receiving broker.
if (canUseTopicIds) {
- for (Map.Entry<String, Uuid> topic : topicIds.entrySet()) {
- String topicName = topic.getKey();
- Uuid addedId = topic.getValue();
- sessionTopicIds.put(topicName, addedId);
- sessionTopicNames.put(addedId, topicName);
- }
+ Map<Uuid, Set<String>> newTopicNames =
added.stream().collect(Collectors.groupingByConcurrent(TopicIdPartition::topicId,
Review comment:
Same comment as before.
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/FetchMetadata.java
##########
@@ -120,6 +120,13 @@ public FetchMetadata nextCloseExisting() {
return new FetchMetadata(sessionId, INITIAL_EPOCH);
}
+ /**
+ * Return the metadata for the next closed session response.
+ */
+ public FetchMetadata closeExisting() {
Review comment:
It seems that this method is not used anymore. Could we remove it?
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -163,18 +173,35 @@ class CachedPartition(val topic: String,
mustRespond
}
- override def hashCode: Int = Objects.hash(new TopicPartition(topic,
partition), topicId)
+ /**
+ * We have different equality checks depending on whether topic IDs are used.
+ * This means we need a different hash function as well. We use name to
calculate the hash if the ID is zero and unused.
+ * Otherwise, we use the topic ID in the hash calculation.
+ *
+ * @return the hash code for the CachedPartition depending on what request
version we are using.
+ */
+ override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition)
+ topicId.hashCode else
+ (31 * partition) + topic.hashCode
def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition]
+ /**
+ * We have different equality checks depending on whether topic IDs are used.
+ *
+ * This is because when we use topic IDs, a partition with a given ID and an
unknown name is the same as a partition with that
+ * ID and a known name. This means we can only use topic ID and partition
when determining equality.
+ *
+ * On the other hand, if we are using topic names, all IDs are zero. This
means we can only use topic name and partition
+ * when determining equality.
+ */
override def equals(that: Any): Boolean =
that match {
case that: CachedPartition =>
this.eq(that) ||
(that.canEqual(this) &&
Review comment:
`that.canEqual(this)` seems weird to me. It seems that we could just
remove it.
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -238,47 +263,40 @@ class FetchSession(val id: Int,
def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
- def getFetchOffset(topicPartition: TopicPartition): Option[Long] =
synchronized {
- Option(partitionMap.find(new CachedPartition(topicPartition,
- sessionTopicIds.getOrDefault(topicPartition.topic(),
Uuid.ZERO_UUID)))).map(_.fetchOffset)
+ def getFetchOffset(topicIdPartition: TopicIdPartition): Option[Long] =
synchronized {
+ Option(partitionMap.find(new
CachedPartition(topicIdPartition.topicPartition,
topicIdPartition.topicId))).map(_.fetchOffset)
Review comment:
nit: We could add another constructor which takes a `TopicIdPartition`.
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
None
}
- val erroneous = mutable.ArrayBuffer[(TopicPartition,
FetchResponseData.PartitionData)]()
- val interesting = mutable.ArrayBuffer[(TopicPartition,
FetchRequest.PartitionData)]()
- val sessionTopicIds = mutable.Map[String, Uuid]()
+ val erroneous = mutable.ArrayBuffer[(TopicIdPartition,
FetchResponseData.PartitionData)]()
+ val interesting = mutable.ArrayBuffer[(TopicIdPartition,
FetchRequest.PartitionData)]()
if (fetchRequest.isFromFollower) {
// The follower must have ClusterAction on ClusterResource in order to
fetch partition data.
if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER,
CLUSTER_NAME)) {
- fetchContext.foreachPartition { (topicPartition, topicId, data) =>
- sessionTopicIds.put(topicPartition.topic(), topicId)
- if (!metadataCache.contains(topicPartition))
- erroneous += topicPartition ->
FetchResponse.partitionResponse(topicPartition.partition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ fetchContext.foreachPartition { (topicIdPartition, data) =>
+ if (topicIdPartition.topicPartition.topic == null )
+ erroneous += topicIdPartition ->
FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
Review comment:
nit: There is an extra space after `== null`
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -870,12 +864,15 @@ class KafkaApis(val requestChannel: RequestChannel,
// Prepare fetch response from converted data
val response =
- FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs,
unconvertedFetchResponse.sessionId, convertedData, sessionTopicIds.asJava)
+ FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs,
unconvertedFetchResponse.sessionId, convertedData)
// record the bytes out metrics only when the response is being sent
- response.data().responses().forEach { topicResponse =>
- topicResponse.partitions().forEach { data =>
- val tp = new TopicPartition(topicResponse.topic(),
data.partitionIndex())
- brokerTopicStats.updateBytesOut(tp.topic,
fetchRequest.isFromFollower, reassigningPartitions.contains(tp),
FetchResponse.recordsSize(data))
+ response.data.responses.forEach { topicResponse =>
+ topicResponse.partitions.forEach { data =>
+ // If the topic name was not known, we will have no bytes out.
+ if (topicResponse.topic != null) {
+ val tp = new TopicIdPartition(topicResponse.topicId, new
TopicPartition(topicResponse.topic, data.partitionIndex()))
Review comment:
nit: The parentheses after `partitionIndex` could be omitted.
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3497,14 +3493,13 @@ object KafkaApis {
// TODO: remove resolvedResponseData method when sizeOf can take a data
object.
private[server] def sizeOfThrottledPartitions(versionId: Short,
unconvertedResponse:
FetchResponse,
- quota: ReplicationQuotaManager,
- topicIds: util.Map[String,
Uuid]): Int = {
- val responseData = new util.LinkedHashMap[TopicPartition,
FetchResponseData.PartitionData]
+ quota:
ReplicationQuotaManager): Int = {
+ val responseData = new util.LinkedHashMap[TopicIdPartition,
FetchResponseData.PartitionData]
unconvertedResponse.data.responses().forEach(topicResponse =>
topicResponse.partitions().forEach(partition =>
- responseData.put(new TopicPartition(topicResponse.topic(),
partition.partitionIndex()), partition)))
+ responseData.put(new TopicIdPartition(topicResponse.topicId, new
TopicPartition(topicResponse.topic(), partition.partitionIndex())), partition)))
Review comment:
nit: The parentheses after `partitionIndex` could be omitted.
##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1186,7 +1174,7 @@ class ReplicaManager(val config: KafkaConfig,
lastStableOffset = None,
exception = Some(e))
case e: Throwable =>
- brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
+
brokerTopicStats.topicStats(tp.topicPartition.topic).failedFetchRequestRate.mark()
Review comment:
nit: `tp.topic`
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
None
}
- val erroneous = mutable.ArrayBuffer[(TopicPartition,
FetchResponseData.PartitionData)]()
- val interesting = mutable.ArrayBuffer[(TopicPartition,
FetchRequest.PartitionData)]()
- val sessionTopicIds = mutable.Map[String, Uuid]()
+ val erroneous = mutable.ArrayBuffer[(TopicIdPartition,
FetchResponseData.PartitionData)]()
+ val interesting = mutable.ArrayBuffer[(TopicIdPartition,
FetchRequest.PartitionData)]()
if (fetchRequest.isFromFollower) {
// The follower must have ClusterAction on ClusterResource in order to
fetch partition data.
if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER,
CLUSTER_NAME)) {
- fetchContext.foreachPartition { (topicPartition, topicId, data) =>
- sessionTopicIds.put(topicPartition.topic(), topicId)
- if (!metadataCache.contains(topicPartition))
- erroneous += topicPartition ->
FetchResponse.partitionResponse(topicPartition.partition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ fetchContext.foreachPartition { (topicIdPartition, data) =>
+ if (topicIdPartition.topicPartition.topic == null )
+ erroneous += topicIdPartition ->
FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+ else if (!metadataCache.contains(topicIdPartition.topicPartition))
+ erroneous += topicIdPartition ->
FetchResponse.partitionResponse(topicIdPartition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
- interesting += (topicPartition -> data)
+ interesting += (topicIdPartition -> data)
}
} else {
- fetchContext.foreachPartition { (part, topicId, _) =>
- sessionTopicIds.put(part.topic(), topicId)
- erroneous += part -> FetchResponse.partitionResponse(part.partition,
Errors.TOPIC_AUTHORIZATION_FAILED)
+ fetchContext.foreachPartition { (topicIdPartition, _) =>
+ erroneous += topicIdPartition ->
FetchResponse.partitionResponse(topicIdPartition,
Errors.TOPIC_AUTHORIZATION_FAILED)
}
}
} else {
// Regular Kafka consumers need READ permission on each partition they
are fetching.
- val partitionDatas = new mutable.ArrayBuffer[(TopicPartition,
FetchRequest.PartitionData)]
- fetchContext.foreachPartition { (topicPartition, topicId, partitionData)
=>
- partitionDatas += topicPartition -> partitionData
- sessionTopicIds.put(topicPartition.topic(), topicId)
- }
- val authorizedTopics = authHelper.filterByAuthorized(request.context,
READ, TOPIC, partitionDatas)(_._1.topic)
- partitionDatas.foreach { case (topicPartition, data) =>
- if (!authorizedTopics.contains(topicPartition.topic))
- erroneous += topicPartition ->
FetchResponse.partitionResponse(topicPartition.partition,
Errors.TOPIC_AUTHORIZATION_FAILED)
- else if (!metadataCache.contains(topicPartition))
- erroneous += topicPartition ->
FetchResponse.partitionResponse(topicPartition.partition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ val partitionDatas = new mutable.ArrayBuffer[(TopicIdPartition,
FetchRequest.PartitionData)]
+ fetchContext.foreachPartition { (topicIdPartition, partitionData) =>
+ if (topicIdPartition.topicPartition.topic == null)
+ erroneous += topicIdPartition ->
FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+ else
+ partitionDatas += topicIdPartition -> partitionData
+ }
+ val authorizedTopics = authHelper.filterByAuthorized(request.context,
READ, TOPIC, partitionDatas)(_._1.topicPartition.topic)
+ partitionDatas.foreach { case (topicIdPartition, data) =>
+ if (!authorizedTopics.contains(topicIdPartition.topicPartition.topic))
+ erroneous += topicIdPartition ->
FetchResponse.partitionResponse(topicIdPartition,
Errors.TOPIC_AUTHORIZATION_FAILED)
+ else if (!metadataCache.contains(topicIdPartition.topicPartition))
+ erroneous += topicIdPartition ->
FetchResponse.partitionResponse(topicIdPartition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
- interesting += (topicPartition -> data)
+ interesting += (topicIdPartition -> data)
Review comment:
nit: We can remove the parentheses here.
##########
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##########
@@ -92,7 +92,7 @@ class DelayedFetch(delayMs: Long,
val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
try {
if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
- val partition =
replicaManager.getPartitionOrException(topicPartition)
+ val partition =
replicaManager.getPartitionOrException(topicPartition.topicPartition)
Review comment:
Yeah, that would be great. `topicPartition.topicPartition` looks really
weird while reading.
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -163,18 +173,35 @@ class CachedPartition(val topic: String,
mustRespond
}
- override def hashCode: Int = Objects.hash(new TopicPartition(topic,
partition), topicId)
+ /**
+ * We have different equality checks depending on whether topic IDs are used.
+ * This means we need a different hash function as well. We use name to
calculate the hash if the ID is zero and unused.
+ * Otherwise, we use the topic ID in the hash calculation.
+ *
+ * @return the hash code for the CachedPartition depending on what request
version we are using.
+ */
+ override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition)
+ topicId.hashCode else
+ (31 * partition) + topic.hashCode
Review comment:
nit: Should we format the code as follows?
```
override def hashCode: Int = {
if (topicId != Uuid.ZERO_UUID)
(31 * partition) + topicId.hashCode
else
(31 * partition) + topic.hashCode
}
```
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -824,6 +823,14 @@ static boolean
hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersi
return
OffsetsForLeaderEpochRequest.supportsTopicPermission(apiVersion.maxVersion());
}
+ static boolean hasUsableTopicIdFetchRequestVersion(NodeApiVersions
nodeApiVersions) {
Review comment:
Is this method still used? I can't find any usages of it.
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -364,10 +405,7 @@ public int maxBytes() {
} else {
name = topicNames.get(forgottenTopic.topicId());
}
- if (name == null) {
- throw new
UnknownTopicIdException(String.format("Topic Id %s in FetchRequest was unknown
to the server", forgottenTopic.topicId()));
- }
- forgottenTopic.partitions().forEach(partitionId ->
toForget.add(new TopicPartition(name, partitionId)));
+ forgottenTopic.partitions().forEach(partitionId ->
toForget.add(new TopicIdPartition(forgottenTopic.topicId(), new
TopicPartition(name, partitionId))));
Review comment:
I would also add a small comment here.
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -328,31 +374,26 @@ public int maxBytes() {
} else {
name = topicNames.get(fetchTopic.topicId());
}
- if (name != null) {
- // If topic name is resolved, simply add to
fetchData map
- fetchTopic.partitions().forEach(fetchPartition ->
- fetchData.put(new TopicPartition(name,
fetchPartition.partition()),
- new PartitionData(
-
fetchPartition.fetchOffset(),
-
fetchPartition.logStartOffset(),
-
fetchPartition.partitionMaxBytes(),
-
optionalEpoch(fetchPartition.currentLeaderEpoch()),
-
optionalEpoch(fetchPartition.lastFetchedEpoch())
- )
- )
- );
- } else {
- throw new
UnknownTopicIdException(String.format("Topic Id %s in FetchRequest was unknown
to the server", fetchTopic.topicId()));
- }
+ fetchTopic.partitions().forEach(fetchPartition ->
+ fetchData.put(new
TopicIdPartition(fetchTopic.topicId(), new TopicPartition(name,
fetchPartition.partition())),
Review comment:
Should we add a comment here which explains that the topic name might be
null in `TopicIdPartition` if we were unable to resolve it?
##########
File path:
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -432,9 +425,9 @@ private String
partitionsToLogString(Collection<TopicPartition> partitions) {
String verifyFullFetchResponsePartitions(Set<TopicPartition>
topicPartitions, Set<Uuid> ids, short version) {
StringBuilder bld = new StringBuilder();
Set<TopicPartition> extra =
- findMissing(topicPartitions, sessionPartitions.keySet());
+ findMissing(topicPartitions, sessionPartitions.keySet());
Review comment:
nit: This change and the following ones do not seem necessary. I would
revert them back.
##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String,
}
}
} catch {
- case ime@( _: CorruptRecordException | _:
InvalidRecordException) =>
+ case ime@(_: CorruptRecordException | _:
InvalidRecordException) =>
Review comment:
Putting this here but it is not related to this line.
It seems that we have an opportunity in `processFetchRequest` to better
handle the `FETCH_SESSION_TOPIC_ID_ERROR` error. At the moment, it delays all
the partitions. It seems to me that we could retry directly, no? If you agree,
we could file a Jira and address this in a subsequent PR.
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -199,26 +222,49 @@ public FetchRequest build(short version) {
fetchRequestData.setMaxBytes(maxBytes);
fetchRequestData.setIsolationLevel(isolationLevel.id());
fetchRequestData.setForgottenTopicsData(new ArrayList<>());
- toForget.stream()
- .collect(Collectors.groupingBy(TopicPartition::topic,
LinkedHashMap::new, Collectors.toList()))
- .forEach((topic, partitions) ->
- fetchRequestData.forgottenTopicsData().add(new
FetchRequestData.ForgottenTopic()
- .setTopic(topic)
- .setTopicId(topicIds.getOrDefault(topic,
Uuid.ZERO_UUID))
-
.setPartitions(partitions.stream().map(TopicPartition::partition).collect(Collectors.toList())))
- );
- fetchRequestData.setTopics(new ArrayList<>());
+
+ Map<String, FetchRequestData.ForgottenTopic> forgottenTopicMap =
new LinkedHashMap<>();
+ removed.forEach(topicIdPartition -> {
+ FetchRequestData.ForgottenTopic forgottenTopic =
forgottenTopicMap.get(topicIdPartition.topic());
+ if (forgottenTopic == null) {
+ forgottenTopic = new ForgottenTopic()
+ .setTopic(topicIdPartition.topic())
+ .setTopicId(topicIdPartition.topicId());
+ forgottenTopicMap.put(topicIdPartition.topic(),
forgottenTopic);
+ }
+ forgottenTopic.partitions().add(topicIdPartition.partition());
+ });
+
+ // If a version older than v13 is used, topic-partition which were
replaced
+ // by a topic-partition with the same name but a different topic
ID are not
+ // sent out in the "forget" set in order to not remove the newly
added
+ // partition in the "fetch" set.
+ if (version >= 13) {
+ replaced.forEach(topicIdPartition -> {
+ FetchRequestData.ForgottenTopic forgottenTopic =
forgottenTopicMap.get(topicIdPartition.topic());
+ if (forgottenTopic == null) {
+ forgottenTopic = new ForgottenTopic()
+ .setTopic(topicIdPartition.topic())
+ .setTopicId(topicIdPartition.topicId());
+ forgottenTopicMap.put(topicIdPartition.topic(),
forgottenTopic);
+ }
+
forgottenTopic.partitions().add(topicIdPartition.partition());
+ });
Review comment:
This block is identical to the previous one. Should we pull it into a
helper method? (yeah, I know, I wrote this...)
##########
File path:
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -285,52 +268,57 @@ public FetchRequestData build() {
if (nextMetadata.isFull()) {
if (log.isDebugEnabled()) {
log.debug("Built full fetch {} for node {} with {}.",
- nextMetadata, node,
partitionsToLogString(next.keySet()));
+ nextMetadata, node,
topicPartitionsToLogString(next.keySet()));
}
sessionPartitions = next;
next = null;
+ Map<TopicPartition, PartitionData> toSend =
+ Collections.unmodifiableMap(new
LinkedHashMap<>(sessionPartitions));
// Only add topic IDs to the session if we are using topic IDs.
if (canUseTopicIds) {
- sessionTopicIds = topicIds;
- sessionTopicNames = new HashMap<>(topicIds.size());
- topicIds.forEach((name, id) -> sessionTopicNames.put(id,
name));
+ Map<Uuid, Set<String>> newTopicNames =
sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry
-> entry.getValue().topicId,
+ Collectors.mapping(entry ->
entry.getKey().topic(), Collectors.toSet())));
+
+ sessionTopicNames = new HashMap<>(newTopicNames.size());
+ // There should only be one topic name per topic ID.
+ newTopicNames.forEach((topicId, topicNamesSet) ->
topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
} else {
- sessionTopicIds = new HashMap<>();
sessionTopicNames = new HashMap<>();
Review comment:
Not related to this PR, but could we use `Collections.emptyMap` here?
That would avoid allocating a `HashMap` every time.
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
None
}
- val erroneous = mutable.ArrayBuffer[(TopicPartition,
FetchResponseData.PartitionData)]()
- val interesting = mutable.ArrayBuffer[(TopicPartition,
FetchRequest.PartitionData)]()
- val sessionTopicIds = mutable.Map[String, Uuid]()
+ val erroneous = mutable.ArrayBuffer[(TopicIdPartition,
FetchResponseData.PartitionData)]()
+ val interesting = mutable.ArrayBuffer[(TopicIdPartition,
FetchRequest.PartitionData)]()
if (fetchRequest.isFromFollower) {
// The follower must have ClusterAction on ClusterResource in order to
fetch partition data.
if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER,
CLUSTER_NAME)) {
- fetchContext.foreachPartition { (topicPartition, topicId, data) =>
- sessionTopicIds.put(topicPartition.topic(), topicId)
- if (!metadataCache.contains(topicPartition))
- erroneous += topicPartition ->
FetchResponse.partitionResponse(topicPartition.partition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ fetchContext.foreachPartition { (topicIdPartition, data) =>
+ if (topicIdPartition.topicPartition.topic == null )
+ erroneous += topicIdPartition ->
FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+ else if (!metadataCache.contains(topicIdPartition.topicPartition))
+ erroneous += topicIdPartition ->
FetchResponse.partitionResponse(topicIdPartition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
- interesting += (topicPartition -> data)
+ interesting += (topicIdPartition -> data)
Review comment:
nit: We can remove the parentheses here.
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
None
}
- val erroneous = mutable.ArrayBuffer[(TopicPartition,
FetchResponseData.PartitionData)]()
- val interesting = mutable.ArrayBuffer[(TopicPartition,
FetchRequest.PartitionData)]()
- val sessionTopicIds = mutable.Map[String, Uuid]()
+ val erroneous = mutable.ArrayBuffer[(TopicIdPartition,
FetchResponseData.PartitionData)]()
+ val interesting = mutable.ArrayBuffer[(TopicIdPartition,
FetchRequest.PartitionData)]()
if (fetchRequest.isFromFollower) {
// The follower must have ClusterAction on ClusterResource in order to
fetch partition data.
if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER,
CLUSTER_NAME)) {
- fetchContext.foreachPartition { (topicPartition, topicId, data) =>
- sessionTopicIds.put(topicPartition.topic(), topicId)
- if (!metadataCache.contains(topicPartition))
- erroneous += topicPartition ->
FetchResponse.partitionResponse(topicPartition.partition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ fetchContext.foreachPartition { (topicIdPartition, data) =>
+ if (topicIdPartition.topicPartition.topic == null )
+ erroneous += topicIdPartition ->
FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+ else if (!metadataCache.contains(topicIdPartition.topicPartition))
+ erroneous += topicIdPartition ->
FetchResponse.partitionResponse(topicIdPartition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
- interesting += (topicPartition -> data)
+ interesting += (topicIdPartition -> data)
}
} else {
- fetchContext.foreachPartition { (part, topicId, _) =>
- sessionTopicIds.put(part.topic(), topicId)
- erroneous += part -> FetchResponse.partitionResponse(part.partition,
Errors.TOPIC_AUTHORIZATION_FAILED)
+ fetchContext.foreachPartition { (topicIdPartition, _) =>
+ erroneous += topicIdPartition ->
FetchResponse.partitionResponse(topicIdPartition,
Errors.TOPIC_AUTHORIZATION_FAILED)
Review comment:
I wonder if we should reply with `UNKNOWN_TOPIC_ID` for the topics that
are not resolved.
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -163,18 +173,35 @@ class CachedPartition(val topic: String,
mustRespond
}
- override def hashCode: Int = Objects.hash(new TopicPartition(topic,
partition), topicId)
+ /**
+ * We have different equality checks depending on whether topic IDs are used.
+ * This means we need a different hash function as well. We use name to
calculate the hash if the ID is zero and unused.
+ * Otherwise, we use the topic ID in the hash calculation.
+ *
+ * @return the hash code for the CachedPartition depending on what request
version we are using.
+ */
+ override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition)
+ topicId.hashCode else
+ (31 * partition) + topic.hashCode
def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition]
+ /**
+ * We have different equality checks depending on whether topic IDs are used.
+ *
+ * This is because when we use topic IDs, a partition with a given ID and an
unknown name is the same as a partition with that
+ * ID and a known name. This means we can only use topic ID and partition
when determining equality.
+ *
+ * On the other hand, if we are using topic names, all IDs are zero. This
means we can only use topic name and partition
+ * when determining equality.
+ */
override def equals(that: Any): Boolean =
that match {
case that: CachedPartition =>
this.eq(that) ||
(that.canEqual(this) &&
this.partition.equals(that.partition) &&
- this.topic.equals(that.topic) &&
- this.topicId.equals(that.topicId))
+ (if (this.topicId != Uuid.ZERO_UUID)
this.topicId.equals(that.topicId)
Review comment:
nit: The if/else inline reads a bit weird. Should we extract the if/else?
```
this.eq(that) || if (this.topicId != Uuid.ZERO_UUID)
this.partition.equals(that.partition) && this.topicId.equals(that.topicId)
else
this.partition.equals(that.partition) && this.topic.equals(that.topic)
```
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -238,47 +263,40 @@ class FetchSession(val id: Int,
def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
- def getFetchOffset(topicPartition: TopicPartition): Option[Long] =
synchronized {
- Option(partitionMap.find(new CachedPartition(topicPartition,
- sessionTopicIds.getOrDefault(topicPartition.topic(),
Uuid.ZERO_UUID)))).map(_.fetchOffset)
+ def getFetchOffset(topicIdPartition: TopicIdPartition): Option[Long] =
synchronized {
+ Option(partitionMap.find(new
CachedPartition(topicIdPartition.topicPartition,
topicIdPartition.topicId))).map(_.fetchOffset)
}
- type TL = util.ArrayList[TopicPartition]
+ type TL = util.ArrayList[TopicIdPartition]
// Update the cached partition data based on the request.
def update(fetchData: FetchSession.REQ_MAP,
- toForget: util.List[TopicPartition],
+ toForget: util.List[TopicIdPartition],
reqMetadata: JFetchMetadata,
- topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) =
synchronized {
+ usesTopicIds: Boolean): (TL, TL, TL) = synchronized {
val added = new TL
val updated = new TL
val removed = new TL
- val inconsistentTopicIds = new TL
fetchData.forEach { (topicPart, reqData) =>
- // Get the topic ID on the broker, if it is valid and the topic is new
to the session, add its ID.
- // If the topic already existed, check that its ID is consistent.
- val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)
- val newCachedPart = new CachedPartition(topicPart, id, reqData)
- if (id != Uuid.ZERO_UUID) {
- val prevSessionTopicId = sessionTopicIds.putIfAbsent(topicPart.topic,
id)
- if (prevSessionTopicId != null && prevSessionTopicId != id)
- inconsistentTopicIds.add(topicPart)
- }
+ val newCachedPart = new CachedPartition(topicPart.topicPartition,
topicPart.topicId, reqData)
val cachedPart = partitionMap.find(newCachedPart)
if (cachedPart == null) {
partitionMap.mustAdd(newCachedPart)
added.add(topicPart)
} else {
cachedPart.updateRequestParams(reqData)
+ if (cachedPart.topic == null)
Review comment:
nit: It might be better to encapsulate this in `CachedPartition`. We
could add a method called `maybeSetTopicName` or piggyback on
`updateRequestParams`.
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -801,23 +795,23 @@ class KafkaApis(val requestChannel: RequestChannel,
// down-conversion always guarantees that at least one batch
of messages is down-converted and sent out to the
// client.
new FetchResponseData.PartitionData()
- .setPartitionIndex(tp.partition)
+ .setPartitionIndex(tp.topicPartition.partition)
Review comment:
nit: We can use `tp.partition` here and in a few other places.
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -238,47 +263,40 @@ class FetchSession(val id: Int,
def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
- def getFetchOffset(topicPartition: TopicPartition): Option[Long] =
synchronized {
- Option(partitionMap.find(new CachedPartition(topicPartition,
- sessionTopicIds.getOrDefault(topicPartition.topic(),
Uuid.ZERO_UUID)))).map(_.fetchOffset)
+ def getFetchOffset(topicIdPartition: TopicIdPartition): Option[Long] =
synchronized {
+ Option(partitionMap.find(new
CachedPartition(topicIdPartition.topicPartition,
topicIdPartition.topicId))).map(_.fetchOffset)
}
- type TL = util.ArrayList[TopicPartition]
+ type TL = util.ArrayList[TopicIdPartition]
// Update the cached partition data based on the request.
def update(fetchData: FetchSession.REQ_MAP,
- toForget: util.List[TopicPartition],
+ toForget: util.List[TopicIdPartition],
reqMetadata: JFetchMetadata,
- topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) =
synchronized {
+ usesTopicIds: Boolean): (TL, TL, TL) = synchronized {
val added = new TL
val updated = new TL
val removed = new TL
- val inconsistentTopicIds = new TL
fetchData.forEach { (topicPart, reqData) =>
- // Get the topic ID on the broker, if it is valid and the topic is new
to the session, add its ID.
- // If the topic already existed, check that its ID is consistent.
- val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)
- val newCachedPart = new CachedPartition(topicPart, id, reqData)
- if (id != Uuid.ZERO_UUID) {
- val prevSessionTopicId = sessionTopicIds.putIfAbsent(topicPart.topic,
id)
- if (prevSessionTopicId != null && prevSessionTopicId != id)
- inconsistentTopicIds.add(topicPart)
- }
+ val newCachedPart = new CachedPartition(topicPart.topicPartition,
topicPart.topicId, reqData)
Review comment:
nit: How about naming it `cachedPartitionKey`? We could also benefit
from passing `TopicIdPartition` to the constructor directly.
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -238,47 +263,40 @@ class FetchSession(val id: Int,
def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
- def getFetchOffset(topicPartition: TopicPartition): Option[Long] =
synchronized {
- Option(partitionMap.find(new CachedPartition(topicPartition,
- sessionTopicIds.getOrDefault(topicPartition.topic(),
Uuid.ZERO_UUID)))).map(_.fetchOffset)
+ def getFetchOffset(topicIdPartition: TopicIdPartition): Option[Long] =
synchronized {
+ Option(partitionMap.find(new
CachedPartition(topicIdPartition.topicPartition,
topicIdPartition.topicId))).map(_.fetchOffset)
}
- type TL = util.ArrayList[TopicPartition]
+ type TL = util.ArrayList[TopicIdPartition]
// Update the cached partition data based on the request.
def update(fetchData: FetchSession.REQ_MAP,
- toForget: util.List[TopicPartition],
+ toForget: util.List[TopicIdPartition],
reqMetadata: JFetchMetadata,
- topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) =
synchronized {
+ usesTopicIds: Boolean): (TL, TL, TL) = synchronized {
Review comment:
Is `usesTopicIds` used anywhere in this method?
##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -224,10 +224,8 @@ class ReplicaFetcherThread(name: String,
}
val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
if (!fetchSessionHandler.handleResponse(fetchResponse,
clientResponse.requestHeader().apiVersion())) {
- // If we had a topic ID related error, throw it, otherwise return an
empty fetch data map.
- if (fetchResponse.error == Errors.UNKNOWN_TOPIC_ID ||
- fetchResponse.error == Errors.FETCH_SESSION_TOPIC_ID_ERROR ||
- fetchResponse.error == Errors.INCONSISTENT_TOPIC_ID) {
+ // If we had a session topic ID related error, throw it, otherwise
return an empty fetch data map.
+ if (fetchResponse.error == Errors.FETCH_SESSION_TOPIC_ID_ERROR) {
Review comment:
I already mentioned this before but it seems that we could retry
immediately in this case when the session was upgraded/downgraded. That would
avoid having to wait for the backoff.
##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1021,17 +1018,17 @@ class ReplicaManager(val config: KafkaConfig,
var bytesReadable: Long = 0
var errorReadingData = false
var hasDivergingEpoch = false
- val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
- logReadResults.foreach { case (topicPartition, logReadResult) =>
-
brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark()
+ val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]
+ logReadResults.foreach { case (topicIdPartition, logReadResult) =>
+
brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark()
Review comment:
nit: `topicIdPartition.topic` should work.
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -66,16 +69,28 @@ public PartitionData(
int maxBytes,
Optional<Integer> currentLeaderEpoch
) {
- this(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch,
Optional.empty());
+ this(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes,
currentLeaderEpoch, Optional.empty());
Review comment:
Do we still use this constructor?
##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1041,26 +1038,26 @@ class ReplicaManager(val config: KafkaConfig,
// 5) we found a diverging epoch
if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes
|| errorReadingData || hasDivergingEpoch) {
val fetchPartitionData = logReadResults.map { case (tp, result) =>
- val isReassignmentFetch = isFromFollower && isAddingReplica(tp,
replicaId)
+ val isReassignmentFetch = isFromFollower &&
isAddingReplica(tp.topicPartition, replicaId)
tp -> result.toFetchPartitionData(isReassignmentFetch)
}
responseCallback(fetchPartitionData)
} else {
// construct the fetch results from the read results
- val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition,
FetchPartitionStatus)]
- fetchInfos.foreach { case (topicPartition, partitionData) =>
- logReadResultMap.get(topicPartition).foreach(logReadResult => {
+ val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition,
FetchPartitionStatus)]
+ fetchInfos.foreach { case (topicIdPartition, partitionData) =>
+ logReadResultMap.get(topicIdPartition).foreach(logReadResult => {
val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
- fetchPartitionStatus += (topicPartition ->
FetchPartitionStatus(logOffsetMetadata, partitionData))
+ fetchPartitionStatus += (topicIdPartition ->
FetchPartitionStatus(logOffsetMetadata, partitionData))
})
}
val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes,
fetchMaxBytes, hardMaxBytesLimit,
- fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId,
topicIds, fetchPartitionStatus)
+ fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId,
fetchPartitionStatus)
val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota,
clientMetadata,
responseCallback)
// create a list of (topic, partition) pairs to use as keys for this
delayed fetch operation
- val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) =>
TopicPartitionOperationKey(tp) }
+ val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) =>
TopicPartitionOperationKey(tp.topicPartition) }
Review comment:
nit: We could add another `apply` method to `TopicPartitionOperationKey`
that accepts a `TopicIdPartition`. That would be convenient.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]