junrao commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r625422142
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -425,16 +438,27 @@ class IncrementalFetchContext(private val time: Time,
val topicPart = element.getKey
val respData = element.getValue
val cachedPart = session.partitionMap.find(new
CachedPartition(topicPart))
- val mustRespond = cachedPart.maybeUpdateResponseData(respData,
updateFetchContextAndRemoveUnselected)
- if (mustRespond) {
+
+ // If we have an situation where there is a valid ID on the partition,
but it does not match
Review comment:
an situation => a situation
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -314,22 +321,24 @@ class SessionErrorContext(val error: Errors,
override def foreachPartition(fun: (TopicPartition,
FetchRequest.PartitionData) => Unit): Unit = {}
override def getResponseSize(updates: FetchSession.RESP_MAP, versionId:
Short): Int = {
- FetchResponse.sizeOf(versionId, (new
FetchSession.RESP_MAP).entrySet.iterator)
+ FetchResponse.sizeOf(versionId, (new
FetchSession.RESP_MAP).entrySet.iterator, Collections.emptyMap())
Review comment:
Hmm, it seems that we can't pass in an empty topicIds map since the partition
iterator is not empty?
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -226,4 +284,4 @@ private static FetchResponseData toMessage(Errors error,
.setSessionId(sessionId)
.setResponses(topicResponseList);
}
-}
\ No newline at end of file
+}
Review comment:
no need for extra new line.
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -319,12 +355,25 @@ public int maxBytes() {
return data.maxBytes();
}
- public Map<TopicPartition, PartitionData> fetchData() {
+ // For versions 13+, throws UnknownTopicIdException if the topic ID was
unknown to the server.
+ public Map<TopicPartition, PartitionData> fetchData(Map<Uuid, String>
topicNames) throws UnknownTopicIdException {
Review comment:
Since toPartitionDataMap() handles all versions, could we simply
call toPartitionDataMap()? Then, I am not sure if we need to call
toPartitionDataMap() in the constructor.
##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -276,7 +284,12 @@ class ReplicaAlterLogDirsThread(name: String,
} else {
// Set maxWait and minBytes to 0 because the response should return
immediately if
// the future log has caught up with the current log of the partition
- val requestBuilder =
FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0,
requestMap).setMaxBytes(maxBytes)
+ val version: Short = if (ApiKeys.FETCH.latestVersion >= 13 &&
topics.size() != topicIdsInRequest.size())
+ 12
+ else
+ ApiKeys.FETCH.latestVersion
Review comment:
The calculation of version is duplicated between here and
ReplicaFetcherThread. Could we share them somehow?
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -80,14 +89,26 @@ public Errors error() {
return Errors.forCode(data.errorCode());
}
- public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData>
responseData() {
+ public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData>
responseData(Map<Uuid, String> topicNames, short version) {
+ return toResponseDataMap(topicNames, version);
+
+ }
+
+ // TODO: Should be replaced or cleaned up. The idea is that in KafkaApis
we need to reconstruct responseData even though we could have just passed in
and out a map.
+ // With topic IDs, recreating the map takes a little more time since we
have to get the topic name from the topic ID to name map.
+ // The refactor somewhat helps in KafkaApis where we already have the
topic names, but we have to recompute the map using topic IDs instead of just
returning what we have.
+ // Can be replaced when we remove toMessage and change sizeOf as a part
of KAFKA-12410.
+ // Used when we can guarantee responseData is populated with all possible
partitions
+ // This occurs when we have a response version < 13 or we built the
FetchResponse with
+ // responseDataMap as a parameter and we have the same topic IDs available.
+ public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData>
resolvedResponseData() {
if (responseData == null) {
synchronized (this) {
if (responseData == null) {
responseData = new LinkedHashMap<>();
data.responses().forEach(topicResponse ->
- topicResponse.partitions().forEach(partition ->
- responseData.put(new
TopicPartition(topicResponse.topic(), partition.partitionIndex()), partition))
+ topicResponse.partitions().forEach(partition ->
+ responseData.put(new
TopicPartition(topicResponse.topic(), partition.partitionIndex()), partition))
Review comment:
Do we need this method? It seems it's the same as toResponseDataMap().
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -319,12 +355,25 @@ public int maxBytes() {
return data.maxBytes();
}
- public Map<TopicPartition, PartitionData> fetchData() {
- return fetchData;
+ // For versions 13+, throws UnknownTopicIdException if the topic ID was
unknown to the server.
+ public Map<TopicPartition, PartitionData> fetchData(Map<Uuid, String>
topicNames) throws UnknownTopicIdException {
+ if (version() < 13)
+ return fetchData;
+ return toPartitionDataMap(data.topics(), topicNames);
}
- public List<TopicPartition> toForget() {
- return toForget;
+ // For versions 13+, throws UnknownTopicIdException if the topic ID was
unknown to the server.
+ public List<FetchRequestData.ForgottenTopic> forgottenTopics(Map<Uuid,
String> topicNames) throws UnknownTopicIdException {
+ if (version() >= 13) {
+ data.forgottenTopicsData().forEach(forgottenTopic -> {
+ String name = topicNames.get(forgottenTopic.topicId());
+ if (name == null) {
+ throw new UnknownTopicIdException(String.format("Topic Id
%s in FetchRequest was unknown to the server", forgottenTopic.topicId()));
+ }
+
forgottenTopic.setTopic(topicNames.getOrDefault(forgottenTopic.topicId(), ""));
Review comment:
It's a bit weird for `forgottenTopics()` to have a side effect that
changes the internal data structure since requests are typically immutable.
It's also not consistent with toPartitionDataMap(). If we do want to modify the
internal data structure, we probably want to name the method more appropriately.
Also, why do we return a list of ForgottenTopic instead of a list of
TopicPartition? The latter is easier to understand and it doesn't seem that we
need topicId in ForgottenTopic.
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -425,16 +438,27 @@ class IncrementalFetchContext(private val time: Time,
val topicPart = element.getKey
val respData = element.getValue
val cachedPart = session.partitionMap.find(new
CachedPartition(topicPart))
- val mustRespond = cachedPart.maybeUpdateResponseData(respData,
updateFetchContextAndRemoveUnselected)
- if (mustRespond) {
+
+ // If we have an situation where there is a valid ID on the partition,
but it does not match
+ // the ID in topic IDs (likely due to topic deletion and re-creation)
or there is no valid topic
+ // ID on the broker (topic deleted or broker received a
metadataResponse without IDs),
+ // remove the cached partition from partitionMap and from the response.
Review comment:
Hmm, if the topicId has changed, it seems that we should send an error
(e.g. InvalidTopicId) back to indicate the topicId is no longer valid so that
the client could refresh the metadata?
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -820,20 +838,30 @@ class KafkaApis(val requestChannel: RequestChannel,
def createResponse(throttleTimeMs: Int): FetchResponse = {
// Down-convert messages for each partition if required
val convertedData = new util.LinkedHashMap[TopicPartition,
FetchResponseData.PartitionData]
- unconvertedFetchResponse.responseData.forEach { (tp,
unconvertedPartitionData) =>
- val error = Errors.forCode(unconvertedPartitionData.errorCode)
- if (error != Errors.NONE)
- debug(s"Fetch request with correlation id
${request.header.correlationId} from client $clientId " +
- s"on partition $tp failed due to ${error.exceptionName}")
- convertedData.put(tp, maybeConvertFetchedData(tp,
unconvertedPartitionData))
+ unconvertedFetchResponse.data().responses().forEach { topicResponse =>
+ if (topicResponse.topic() != "") {
Review comment:
When will topicResponse.topic() be ""?
##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -466,6 +487,8 @@ class ZkMetadataCache(brokerId: Int) extends MetadataCache
with Logging {
topicIds: Map[String, Uuid],
controllerId: Option[Int],
aliveBrokers: mutable.LongMap[Broker],
- aliveNodes:
mutable.LongMap[collection.Map[ListenerName, Node]])
+ aliveNodes:
mutable.LongMap[collection.Map[ListenerName, Node]]) {
+ val topicNames = topicIds.map { case (topicName, topicId) => (topicId,
topicName) }
Review comment:
Could we explicitly define the type of topicNames ?
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -237,14 +239,16 @@ class FetchSession(val id: Int,
type TL = util.ArrayList[TopicPartition]
// Update the cached partition data based on the request.
- def update(fetchData: FetchSession.REQ_MAP,
- toForget: util.List[TopicPartition],
- reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+ def update(version: Short,
Review comment:
version seems unused?
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -281,10 +296,16 @@ public String toString() {
public FetchRequest(FetchRequestData fetchRequestData, short version) {
super(ApiKeys.FETCH, version);
- this.data = fetchRequestData;
- this.fetchData = toPartitionDataMap(fetchRequestData.topics());
- this.toForget =
toForgottenTopicList(fetchRequestData.forgottenTopicsData());
- this.metadata = new FetchMetadata(fetchRequestData.sessionId(),
fetchRequestData.sessionEpoch());
+ if (version < 13) {
+ this.data = fetchRequestData;
Review comment:
It seems that we could share the code to populate this.data and
this.metadata. We only use `this` in this method. Should we just remove it?
##########
File path:
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -315,23 +397,32 @@ private String
partitionsToLogString(Collection<TopicPartition> partitions) {
/**
* Verify that a full fetch response contains all the partitions in the
fetch session.
*
- * @param response The response.
- * @return True if the full fetch response partitions are valid.
+ * @param topicPartitions The topicPartitions from the FetchResponse.
+ * @param ids The topic IDs from the FetchResponse.
+ * @param version The version of the FetchResponse.
+ * @return True if the full fetch response partitions are
valid.
*/
- String verifyFullFetchResponsePartitions(FetchResponse response) {
+ String verifyFullFetchResponsePartitions(Set<TopicPartition>
topicPartitions, Set<Uuid> ids, short version) {
StringBuilder bld = new StringBuilder();
Set<TopicPartition> extra =
- findMissing(response.responseData().keySet(),
sessionPartitions.keySet());
+ findMissing(topicPartitions, sessionPartitions.keySet());
Review comment:
Perhaps extra and omitted should be extraPartitions and omittedPartitions
to make it clear?
##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -256,6 +258,9 @@ class ReplicaAlterLogDirsThread(name: String,
private def buildFetchForPartition(tp: TopicPartition, fetchState:
PartitionFetchState): ResultWithPartitions[Option[ReplicaFetch]] = {
val requestMap = new util.LinkedHashMap[TopicPartition,
FetchRequest.PartitionData]
val partitionsWithError = mutable.Set[TopicPartition]()
+ val topics = new util.HashSet[String]()
Review comment:
Since there is only a single tp, does topics need to be a set?
##########
File path:
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -212,10 +273,19 @@ public FetchRequestData build() {
nextMetadata, node,
partitionsToLogString(next.keySet()));
}
sessionPartitions = next;
+ sessionTopicIds = topicIds;
+ topicIds.forEach((name, id) -> sessionTopicNames.put(id,
name));
next = null;
+ topicIds = null;
+ requestUsedTopicIds =
sessionTopicIds.keySet().containsAll(sessionPartitions.keySet().stream().map(
+ tp -> tp.topic()).collect(Collectors.toSet()));
Map<TopicPartition, PartitionData> toSend =
- Collections.unmodifiableMap(new
LinkedHashMap<>(sessionPartitions));
- return new FetchRequestData(toSend, Collections.emptyList(),
toSend, nextMetadata);
+ Collections.unmodifiableMap(new
LinkedHashMap<>(sessionPartitions));
+ Map<String, Uuid> toSendTopicIds =
+ Collections.unmodifiableMap(new
HashMap<>(sessionTopicIds));
+ Map<Uuid, String> toSendTopicNames =
Review comment:
Could we build the full toSendTopicIds and toSendTopicNames once and
reuse in both full and incremental?
##########
File path:
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -125,10 +167,22 @@ public FetchSessionHandler(LogContext logContext, int
node) {
return sessionPartitions;
}
+ public Map<String, Uuid> topicIds() {
+ return topicIds;
Review comment:
Do the new fields need to be included in toString()?
##########
File path:
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -297,16 +380,15 @@ private String
partitionsToLogString(Collection<TopicPartition> partitions) {
/**
* Return some partitions which are expected to be in a particular set,
but which are not.
*
- * @param toFind The partitions to look for.
- * @param toSearch The set of partitions to search.
- * @return null if all partitions were found; some of the missing
ones
- * in string form, if not.
+ * @param toFind The items to look for.
Review comment:
The above comment needs to be changed accordingly.
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -127,10 +177,12 @@ public static FetchResponse parse(ByteBuffer buffer,
short version) {
* @return The response size in bytes.
*/
public static int sizeOf(short version,
- Iterator<Map.Entry<TopicPartition,
FetchResponseData.PartitionData>> partIterator) {
+ Iterator<Map.Entry<TopicPartition,
+ FetchResponseData.PartitionData>> partIterator,
+ Map<String, Uuid> topicIds) {
Review comment:
Could we add the new param to javadoc?
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -296,11 +317,26 @@ public AbstractResponse getErrorResponse(int
throttleTimeMs, Throwable e) {
// may not be any partitions at all in the response. For this reason,
the top-level error code
// is essential for them.
Errors error = Errors.forException(e);
- LinkedHashMap<TopicPartition, FetchResponseData.PartitionData>
responseData = new LinkedHashMap<>();
- for (Map.Entry<TopicPartition, PartitionData> entry :
fetchData.entrySet()) {
- responseData.put(entry.getKey(),
FetchResponse.partitionResponse(entry.getKey().partition(), error));
+ if (version() < 13) {
+ LinkedHashMap<TopicPartition, FetchResponseData.PartitionData>
responseData = new LinkedHashMap<>();
+ for (Map.Entry<TopicPartition, PartitionData> entry :
fetchData.entrySet()) {
+ responseData.put(entry.getKey(),
FetchResponse.partitionResponse(entry.getKey().partition(), error));
+ }
+ return FetchResponse.of(error, throttleTimeMs, data.sessionId(),
responseData, Collections.emptyMap());
}
- return FetchResponse.of(error, throttleTimeMs, data.sessionId(),
responseData);
+ List<FetchResponseData.FetchableTopicResponse> topicResponseList = new
ArrayList<>();
Review comment:
It seems it will be clearer if we put this in an else clause.
##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +149,30 @@ MetadataCache mergeWith(String newClusterId,
Set<String> addInvalidTopics,
Set<String> addInternalTopics,
Node newController,
+ Map<String, Uuid> topicIds,
Review comment:
Could we add the javadoc for the new param?
##########
File path:
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -73,6 +77,22 @@ public FetchSessionHandler(LogContext logContext, int node) {
private LinkedHashMap<TopicPartition, PartitionData> sessionPartitions =
new LinkedHashMap<>(0);
+ /**
+ * All of the topic ids mapped to topic names for topics which exist in
the fetch request session.
+ */
+ private Map<String, Uuid> sessionTopicIds = new HashMap<>(0);
+
+ /**
+ * All of the topic names mapped to topic ids for topics which exist in
the fetch request session.
+ */
+ private Map<Uuid, String> sessionTopicNames = new HashMap<>(0);
+
+ public Map<Uuid, String> sessionTopicNames() {
+ return sessionTopicNames;
+ }
+
+ public boolean requestUsedTopicIds = false;
Review comment:
It seems that this can just be a local val instead of an instance val?
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -71,6 +75,7 @@ object FetchSession {
* localLogStartOffset is the log start offset of the partition on this
broker.
*/
class CachedPartition(val topic: String,
+ var topicId: Uuid,
Review comment:
Does topicId need to be a var?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]