junrao commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r625422142
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -425,16 +438,27 @@ class IncrementalFetchContext(private val time: Time, val topicPart = element.getKey val respData = element.getValue val cachedPart = session.partitionMap.find(new CachedPartition(topicPart)) - val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected) - if (mustRespond) { + + // If we have an situation where there is a valid ID on the partition, but it does not match Review comment: an situation => a situation ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -314,22 +321,24 @@ class SessionErrorContext(val error: Errors, override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = {} override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = { - FetchResponse.sizeOf(versionId, (new FetchSession.RESP_MAP).entrySet.iterator) + FetchResponse.sizeOf(versionId, (new FetchSession.RESP_MAP).entrySet.iterator, Collections.emptyMap()) Review comment: Hmm, it seems that we can't pass in an empty topicIds since partition iterator is not empty? ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ########## @@ -226,4 +284,4 @@ private static FetchResponseData toMessage(Errors error, .setSessionId(sessionId) .setResponses(topicResponseList); } -} \ No newline at end of file +} Review comment: no need for extra new line. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -319,12 +355,25 @@ public int maxBytes() { return data.maxBytes(); } - public Map<TopicPartition, PartitionData> fetchData() { + // For versions 13+, throws UnknownTopicIdException if the topic ID was unknown to the server. 
+ public Map<TopicPartition, PartitionData> fetchData(Map<Uuid, String> topicNames) throws UnknownTopicIdException { Review comment: Since toPartitionDataMap() handles all versions, could we simply call toPartitionDataMap()? Then, I am not sure if we need to call toPartitionDataMap() in the constructor. ########## File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala ########## @@ -276,7 +284,12 @@ class ReplicaAlterLogDirsThread(name: String, } else { // Set maxWait and minBytes to 0 because the response should return immediately if // the future log has caught up with the current log of the partition - val requestBuilder = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap).setMaxBytes(maxBytes) + val version: Short = if (ApiKeys.FETCH.latestVersion >= 13 && topics.size() != topicIdsInRequest.size()) + 12 + else + ApiKeys.FETCH.latestVersion Review comment: The calculation of version is duplicated between here and ReplicaFetcherThread. Could we share it somehow?
+ // Can be replaced when we remove toMessage and change sizeOf as a part of KAFKA-12410. + // Used when we can guarantee responseData is populated with all possible partitions + // This occurs when we have a response version < 13 or we built the FetchResponse with + // responseDataMap as a parameter and we have the same topic IDs available. + public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> resolvedResponseData() { if (responseData == null) { synchronized (this) { if (responseData == null) { responseData = new LinkedHashMap<>(); data.responses().forEach(topicResponse -> - topicResponse.partitions().forEach(partition -> - responseData.put(new TopicPartition(topicResponse.topic(), partition.partitionIndex()), partition)) + topicResponse.partitions().forEach(partition -> + responseData.put(new TopicPartition(topicResponse.topic(), partition.partitionIndex()), partition)) Review comment: Do we need this method? It seems it's the same as toResponseDataMap(). ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -319,12 +355,25 @@ public int maxBytes() { return data.maxBytes(); } - public Map<TopicPartition, PartitionData> fetchData() { - return fetchData; + // For versions 13+, throws UnknownTopicIdException if the topic ID was unknown to the server. + public Map<TopicPartition, PartitionData> fetchData(Map<Uuid, String> topicNames) throws UnknownTopicIdException { + if (version() < 13) + return fetchData; + return toPartitionDataMap(data.topics(), topicNames); } - public List<TopicPartition> toForget() { - return toForget; + // For versions 13+, throws UnknownTopicIdException if the topic ID was unknown to the server. 
+ public List<FetchRequestData.ForgottenTopic> forgottenTopics(Map<Uuid, String> topicNames) throws UnknownTopicIdException { + if (version() >= 13) { + data.forgottenTopicsData().forEach(forgottenTopic -> { + String name = topicNames.get(forgottenTopic.topicId()); + if (name == null) { + throw new UnknownTopicIdException(String.format("Topic Id %s in FetchRequest was unknown to the server", forgottenTopic.topicId())); + } + forgottenTopic.setTopic(topicNames.getOrDefault(forgottenTopic.topicId(), "")); Review comment: It's a bit weird for `forgottenTopics()` to have a side effect that changes the internal data structure since requests are typically immutable. It's also not consistent with toPartitionDataMap(). If we do want to modify the internal data structure, we probably want to name the method more appropriately. Also, why do we return a list of ForgottenTopic instead of list of TopicPartition? The latter is easier to understand and it doesn't seem that we need topicId in ForgottenTopic. ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -425,16 +438,27 @@ class IncrementalFetchContext(private val time: Time, val topicPart = element.getKey val respData = element.getValue val cachedPart = session.partitionMap.find(new CachedPartition(topicPart)) - val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected) - if (mustRespond) { + + // If we have an situation where there is a valid ID on the partition, but it does not match + // the ID in topic IDs (likely due to topic deletion and re-creation) or there is no valid topic + // ID on the broker (topic deleted or broker received a metadataResponse without IDs), + // remove the cached partition from partitionMap and from the response. Review comment: Hmm, if the topicId has changed, it seems that we should send an error (e.g. InvalidTopicId) back to indicate the topicId is no longer valid so that the client could refresh the metadata?
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -820,20 +838,30 @@ class KafkaApis(val requestChannel: RequestChannel, def createResponse(throttleTimeMs: Int): FetchResponse = { // Down-convert messages for each partition if required val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] - unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) => - val error = Errors.forCode(unconvertedPartitionData.errorCode) - if (error != Errors.NONE) - debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " + - s"on partition $tp failed due to ${error.exceptionName}") - convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData)) + unconvertedFetchResponse.data().responses().forEach { topicResponse => + if (topicResponse.topic() != "") { Review comment: When will topicResponse.topic() be ""? ########## File path: core/src/main/scala/kafka/server/MetadataCache.scala ########## @@ -466,6 +487,8 @@ class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging { topicIds: Map[String, Uuid], controllerId: Option[Int], aliveBrokers: mutable.LongMap[Broker], - aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]]) + aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]]) { + val topicNames = topicIds.map { case (topicName, topicId) => (topicId, topicName) } Review comment: Could we explicitly define the type of topicNames ? ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -237,14 +239,16 @@ class FetchSession(val id: Int, type TL = util.ArrayList[TopicPartition] // Update the cached partition data based on the request. - def update(fetchData: FetchSession.REQ_MAP, - toForget: util.List[TopicPartition], - reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized { + def update(version: Short, Review comment: version seems unused? 
########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -281,10 +296,16 @@ public String toString() { public FetchRequest(FetchRequestData fetchRequestData, short version) { super(ApiKeys.FETCH, version); - this.data = fetchRequestData; - this.fetchData = toPartitionDataMap(fetchRequestData.topics()); - this.toForget = toForgottenTopicList(fetchRequestData.forgottenTopicsData()); - this.metadata = new FetchMetadata(fetchRequestData.sessionId(), fetchRequestData.sessionEpoch()); + if (version < 13) { + this.data = fetchRequestData; Review comment: It seems that we could share the code to populate this.data and this.metadata. We only use `this` in this method. Should we just remove it? ########## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ########## @@ -315,23 +397,32 @@ private String partitionsToLogString(Collection<TopicPartition> partitions) { /** * Verify that a full fetch response contains all the partitions in the fetch session. * - * @param response The response. - * @return True if the full fetch response partitions are valid. + * @param topicPartitions The topicPartitions from the FetchResponse. + * @param ids The topic IDs from the FetchResponse. + * @param version The version of the FetchResponse. + * @return True if the full fetch response partitions are valid. */ - String verifyFullFetchResponsePartitions(FetchResponse response) { + String verifyFullFetchResponsePartitions(Set<TopicPartition> topicPartitions, Set<Uuid> ids, short version) { StringBuilder bld = new StringBuilder(); Set<TopicPartition> extra = - findMissing(response.responseData().keySet(), sessionPartitions.keySet()); + findMissing(topicPartitions, sessionPartitions.keySet()); Review comment: Perhaps extra and omitted should be extraPartition and omittedPartitions to make it clear? 
########## File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala ########## @@ -256,6 +258,9 @@ class ReplicaAlterLogDirsThread(name: String, private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[ReplicaFetch]] = { val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] val partitionsWithError = mutable.Set[TopicPartition]() + val topics = new util.HashSet[String]() Review comment: Since there is only a single tp, does topics need to be a set? ########## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ########## @@ -212,10 +273,19 @@ public FetchRequestData build() { nextMetadata, node, partitionsToLogString(next.keySet())); } sessionPartitions = next; + sessionTopicIds = topicIds; + topicIds.forEach((name, id) -> sessionTopicNames.put(id, name)); next = null; + topicIds = null; + requestUsedTopicIds = sessionTopicIds.keySet().containsAll(sessionPartitions.keySet().stream().map( + tp -> tp.topic()).collect(Collectors.toSet())); Map<TopicPartition, PartitionData> toSend = - Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)); - return new FetchRequestData(toSend, Collections.emptyList(), toSend, nextMetadata); + Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)); + Map<String, Uuid> toSendTopicIds = + Collections.unmodifiableMap(new HashMap<>(sessionTopicIds)); + Map<Uuid, String> toSendTopicNames = Review comment: Could we build the full toSendTopicIds and toSendTopicNames once and reuse in both full and incremental? ########## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ########## @@ -125,10 +167,22 @@ public FetchSessionHandler(LogContext logContext, int node) { return sessionPartitions; } + public Map<String, Uuid> topicIds() { + return topicIds; Review comment: Do the new fields need to be included in toString()? 
########## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ########## @@ -297,16 +380,15 @@ private String partitionsToLogString(Collection<TopicPartition> partitions) { /** * Return some partitions which are expected to be in a particular set, but which are not. * - * @param toFind The partitions to look for. - * @param toSearch The set of partitions to search. - * @return null if all partitions were found; some of the missing ones - * in string form, if not. + * @param toFind The items to look for. Review comment: The above comment needs to be changed accordingly. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ########## @@ -127,10 +177,12 @@ public static FetchResponse parse(ByteBuffer buffer, short version) { * @return The response size in bytes. */ public static int sizeOf(short version, - Iterator<Map.Entry<TopicPartition, FetchResponseData.PartitionData>> partIterator) { + Iterator<Map.Entry<TopicPartition, + FetchResponseData.PartitionData>> partIterator, + Map<String, Uuid> topicIds) { Review comment: Could we add the new param to javadoc? ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -296,11 +317,26 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { // may not be any partitions at all in the response. For this reason, the top-level error code // is essential for them. 
Errors error = Errors.forException(e); - LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData = new LinkedHashMap<>(); - for (Map.Entry<TopicPartition, PartitionData> entry : fetchData.entrySet()) { - responseData.put(entry.getKey(), FetchResponse.partitionResponse(entry.getKey().partition(), error)); + if (version() < 13) { + LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData = new LinkedHashMap<>(); + for (Map.Entry<TopicPartition, PartitionData> entry : fetchData.entrySet()) { + responseData.put(entry.getKey(), FetchResponse.partitionResponse(entry.getKey().partition(), error)); + } + return FetchResponse.of(error, throttleTimeMs, data.sessionId(), responseData, Collections.emptyMap()); } - return FetchResponse.of(error, throttleTimeMs, data.sessionId(), responseData); + List<FetchResponseData.FetchableTopicResponse> topicResponseList = new ArrayList<>(); Review comment: It seems it will be clearer if we put this in an else clause. ########## File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java ########## @@ -130,13 +149,30 @@ MetadataCache mergeWith(String newClusterId, Set<String> addInvalidTopics, Set<String> addInternalTopics, Node newController, + Map<String, Uuid> topicIds, Review comment: Could we add the javadoc for the new param? ########## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ########## @@ -73,6 +77,22 @@ public FetchSessionHandler(LogContext logContext, int node) { private LinkedHashMap<TopicPartition, PartitionData> sessionPartitions = new LinkedHashMap<>(0); + /** + * All of the topic ids mapped to topic names for topics which exist in the fetch request session. + */ + private Map<String, Uuid> sessionTopicIds = new HashMap<>(0); + + /** + * All of the topic names mapped to topic ids for topics which exist in the fetch request session. 
+ */ + private Map<Uuid, String> sessionTopicNames = new HashMap<>(0); + + public Map<Uuid, String> sessionTopicNames() { + return sessionTopicNames; + } + + public boolean requestUsedTopicIds = false; Review comment: It seems that this could just be a local variable instead of an instance field? ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -71,6 +75,7 @@ object FetchSession { * localLogStartOffset is the log start offset of the partition on this broker. */ class CachedPartition(val topic: String, + var topicId: Uuid, Review comment: Does topicId need to be a var? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org