junrao commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r625422142



##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -425,16 +438,27 @@ class IncrementalFetchContext(private val time: Time,
         val topicPart = element.getKey
         val respData = element.getValue
         val cachedPart = session.partitionMap.find(new 
CachedPartition(topicPart))
-        val mustRespond = cachedPart.maybeUpdateResponseData(respData, 
updateFetchContextAndRemoveUnselected)
-        if (mustRespond) {
+
+        // If we have an situation where there is a valid ID on the partition, 
but it does not match

Review comment:
       an situation => a situation

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -314,22 +321,24 @@ class SessionErrorContext(val error: Errors,
   override def foreachPartition(fun: (TopicPartition, 
FetchRequest.PartitionData) => Unit): Unit = {}
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: 
Short): Int = {
-    FetchResponse.sizeOf(versionId, (new 
FetchSession.RESP_MAP).entrySet.iterator)
+    FetchResponse.sizeOf(versionId, (new 
FetchSession.RESP_MAP).entrySet.iterator, Collections.emptyMap())

Review comment:
       Hmm, it seems that we can't pass in an empty topicIds map since the partition iterator is not empty?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -226,4 +284,4 @@ private static FetchResponseData toMessage(Errors error,
             .setSessionId(sessionId)
             .setResponses(topicResponseList);
     }
-}
\ No newline at end of file
+}

Review comment:
       No need for the extra newline.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -319,12 +355,25 @@ public int maxBytes() {
         return data.maxBytes();
     }
 
-    public Map<TopicPartition, PartitionData> fetchData() {
+    // For versions 13+, throws UnknownTopicIdException if the topic ID was 
unknown to the server.
+    public Map<TopicPartition, PartitionData> fetchData(Map<Uuid, String> 
topicNames) throws UnknownTopicIdException {

Review comment:
       Since toPartitionDataMap() handles all versions, could we simply call toPartitionDataMap() here? In that case, I am not sure we need to call toPartitionDataMap() in the constructor.
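
   Something along these lines, as a rough sketch (assuming toPartitionDataMap(topics, topicNames) really does handle the pre-13 case where the names come from the request itself):

       // Sketch only: resolve the map lazily here for every version, so the
       // constructor no longer needs to pre-compute fetchData eagerly.
       public Map<TopicPartition, PartitionData> fetchData(Map<Uuid, String> topicNames) throws UnknownTopicIdException {
           return toPartitionDataMap(data.topics(), topicNames);
       }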

##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -276,7 +284,12 @@ class ReplicaAlterLogDirsThread(name: String,
     } else {
       // Set maxWait and minBytes to 0 because the response should return 
immediately if
       // the future log has caught up with the current log of the partition
-      val requestBuilder = 
FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, 
requestMap).setMaxBytes(maxBytes)
+      val version: Short = if (ApiKeys.FETCH.latestVersion >= 13 && 
topics.size() != topicIdsInRequest.size())
+        12
+      else
+        ApiKeys.FETCH.latestVersion

Review comment:
       The calculation of version is duplicated between here and ReplicaFetcherThread. Could we share it somehow?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -80,14 +89,26 @@ public Errors error() {
         return Errors.forCode(data.errorCode());
     }
 
-    public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> 
responseData() {
+    public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> 
responseData(Map<Uuid, String> topicNames, short version) {
+        return toResponseDataMap(topicNames, version);
+
+    }
+
+    // TODO: Should be replaced or cleaned up. The idea is that in KafkaApis 
we need to reconstruct responseData even though we could have just passed in 
and out a map.
+    //  With topic IDs, recreating the map takes a little more time since we 
have to get the topic name from the topic ID to name map.
+    //  The refactor somewhat helps in KafkaApis where we already have the 
topic names, but we have to recompute the map using topic IDs instead of just 
returning what we have.
+    //  Can be replaced when we remove toMessage and change sizeOf as a part 
of KAFKA-12410.
+    // Used when we can guarantee responseData is populated with all possible 
partitions
+    // This occurs when we have a response version < 13 or we built the 
FetchResponse with
+    // responseDataMap as a parameter and we have the same topic IDs available.
+    public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> 
resolvedResponseData() {
         if (responseData == null) {
             synchronized (this) {
                 if (responseData == null) {
                     responseData = new LinkedHashMap<>();
                     data.responses().forEach(topicResponse ->
-                            topicResponse.partitions().forEach(partition ->
-                                    responseData.put(new 
TopicPartition(topicResponse.topic(), partition.partitionIndex()), partition))
+                        topicResponse.partitions().forEach(partition ->
+                                responseData.put(new 
TopicPartition(topicResponse.topic(), partition.partitionIndex()), partition))

Review comment:
       Do we need this method? It seems it's the same as toResponseDataMap().

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -319,12 +355,25 @@ public int maxBytes() {
         return data.maxBytes();
     }
 
-    public Map<TopicPartition, PartitionData> fetchData() {
-        return fetchData;
+    // For versions 13+, throws UnknownTopicIdException if the topic ID was 
unknown to the server.
+    public Map<TopicPartition, PartitionData> fetchData(Map<Uuid, String> 
topicNames) throws UnknownTopicIdException {
+        if (version() < 13)
+            return fetchData;
+        return toPartitionDataMap(data.topics(), topicNames);
     }
 
-    public List<TopicPartition> toForget() {
-        return toForget;
+    // For versions 13+, throws UnknownTopicIdException if the topic ID was 
unknown to the server.
+    public List<FetchRequestData.ForgottenTopic> forgottenTopics(Map<Uuid, 
String> topicNames) throws UnknownTopicIdException {
+        if (version() >= 13) {
+            data.forgottenTopicsData().forEach(forgottenTopic -> {
+                String name = topicNames.get(forgottenTopic.topicId());
+                if (name == null) {
+                    throw new UnknownTopicIdException(String.format("Topic Id 
%s in FetchRequest was unknown to the server", forgottenTopic.topicId()));
+                }
+                
forgottenTopic.setTopic(topicNames.getOrDefault(forgottenTopic.topicId(), ""));

Review comment:
       It's a bit weird for `forgottenTopics()` to have a side effect that changes the internal data structure, since requests are typically immutable. It's also not consistent with toPartitionDataMap(). If we do want to modify the internal data structure, we probably want to give the method a more appropriate name.
   
   Also, why do we return a list of ForgottenTopic instead of a list of TopicPartition? The latter is easier to understand, and it doesn't seem that we need topicId in ForgottenTopic.
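
   For example, a rough sketch of a side-effect-free version that returns TopicPartitions (assuming ForgottenTopic exposes its partition list via partitions()):

       // Sketch: resolve forgotten topics to TopicPartitions without mutating the request data.
       public List<TopicPartition> forgottenTopics(Map<Uuid, String> topicNames) {
           List<TopicPartition> forgotten = new ArrayList<>();
           data.forgottenTopicsData().forEach(forgottenTopic -> {
               // Pre-13 requests carry the topic name directly; 13+ carry only the topic ID.
               String name = version() < 13 ? forgottenTopic.topic() : topicNames.get(forgottenTopic.topicId());
               if (name == null)
                   throw new UnknownTopicIdException(String.format(
                       "Topic Id %s in FetchRequest was unknown to the server", forgottenTopic.topicId()));
               forgottenTopic.partitions().forEach(partition ->
                   forgotten.add(new TopicPartition(name, partition)));
           });
           return forgotten;
       }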

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -425,16 +438,27 @@ class IncrementalFetchContext(private val time: Time,
         val topicPart = element.getKey
         val respData = element.getValue
         val cachedPart = session.partitionMap.find(new 
CachedPartition(topicPart))
-        val mustRespond = cachedPart.maybeUpdateResponseData(respData, 
updateFetchContextAndRemoveUnselected)
-        if (mustRespond) {
+
+        // If we have an situation where there is a valid ID on the partition, 
but it does not match
+        // the ID in topic IDs (likely due to topic deletion and re-creation) 
or there is no valid topic
+        // ID on the broker (topic deleted or broker received a 
metadataResponse without IDs),
+        // remove the cached partition from partitionMap and from the response.

Review comment:
       Hmm, if the topicId has changed, it seems that we should send an error 
(e.g. InvalidTopicId) back to indicate the topicId is no longer valid so that 
the client could refresh the metadata?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -820,20 +838,30 @@ class KafkaApis(val requestChannel: RequestChannel,
       def createResponse(throttleTimeMs: Int): FetchResponse = {
         // Down-convert messages for each partition if required
         val convertedData = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
-        unconvertedFetchResponse.responseData.forEach { (tp, 
unconvertedPartitionData) =>
-          val error = Errors.forCode(unconvertedPartitionData.errorCode)
-          if (error != Errors.NONE)
-            debug(s"Fetch request with correlation id 
${request.header.correlationId} from client $clientId " +
-              s"on partition $tp failed due to ${error.exceptionName}")
-          convertedData.put(tp, maybeConvertFetchedData(tp, 
unconvertedPartitionData))
+        unconvertedFetchResponse.data().responses().forEach { topicResponse =>
+          if (topicResponse.topic() != "") {

Review comment:
       When will topicResponse.topic() be ""?

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -466,6 +487,8 @@ class ZkMetadataCache(brokerId: Int) extends MetadataCache 
with Logging {
                               topicIds: Map[String, Uuid],
                               controllerId: Option[Int],
                               aliveBrokers: mutable.LongMap[Broker],
-                              aliveNodes: 
mutable.LongMap[collection.Map[ListenerName, Node]])
+                              aliveNodes: 
mutable.LongMap[collection.Map[ListenerName, Node]]) {
+    val topicNames = topicIds.map { case (topicName, topicId) => (topicId, 
topicName) }

Review comment:
       Could we explicitly define the type of topicNames?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -237,14 +239,16 @@ class FetchSession(val id: Int,
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
-  def update(fetchData: FetchSession.REQ_MAP,
-             toForget: util.List[TopicPartition],
-             reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+  def update(version: Short,

Review comment:
       version seems unused?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -281,10 +296,16 @@ public String toString() {
 
     public FetchRequest(FetchRequestData fetchRequestData, short version) {
         super(ApiKeys.FETCH, version);
-        this.data = fetchRequestData;
-        this.fetchData = toPartitionDataMap(fetchRequestData.topics());
-        this.toForget = 
toForgottenTopicList(fetchRequestData.forgottenTopicsData());
-        this.metadata = new FetchMetadata(fetchRequestData.sessionId(), 
fetchRequestData.sessionEpoch());
+        if (version < 13) {
+            this.data = fetchRequestData;

Review comment:
       It seems that we could share the code that populates this.data and this.metadata across the version branches. Also, the `this.` qualifier is only used in this method; should we just remove it?
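
   Roughly something like this, as a sketch (it assumes the 13+ branch only differs in not pre-computing fetchData/toForget):

       public FetchRequest(FetchRequestData fetchRequestData, short version) {
           super(ApiKeys.FETCH, version);
           // shared for all versions
           data = fetchRequestData;
           metadata = new FetchMetadata(fetchRequestData.sessionId(), fetchRequestData.sessionEpoch());
           if (version < 13) {
               // only the eagerly-resolved maps are version specific
               fetchData = toPartitionDataMap(fetchRequestData.topics());
               toForget = toForgottenTopicList(fetchRequestData.forgottenTopicsData());
           }
       }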

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -315,23 +397,32 @@ private String 
partitionsToLogString(Collection<TopicPartition> partitions) {
     /**
      * Verify that a full fetch response contains all the partitions in the 
fetch session.
      *
-     * @param response  The response.
-     * @return          True if the full fetch response partitions are valid.
+     * @param topicPartitions  The topicPartitions from the FetchResponse.
+     * @param ids              The topic IDs from the FetchResponse.
+     * @param version          The version of the FetchResponse.
+     * @return                 True if the full fetch response partitions are 
valid.
      */
-    String verifyFullFetchResponsePartitions(FetchResponse response) {
+    String verifyFullFetchResponsePartitions(Set<TopicPartition> 
topicPartitions, Set<Uuid> ids, short version) {
         StringBuilder bld = new StringBuilder();
         Set<TopicPartition> extra =
-            findMissing(response.responseData().keySet(), 
sessionPartitions.keySet());
+            findMissing(topicPartitions, sessionPartitions.keySet());

Review comment:
       Perhaps extra and omitted should be extraPartitions and omittedPartitions to make it clearer?

##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -256,6 +258,9 @@ class ReplicaAlterLogDirsThread(name: String,
   private def buildFetchForPartition(tp: TopicPartition, fetchState: 
PartitionFetchState): ResultWithPartitions[Option[ReplicaFetch]] = {
     val requestMap = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
     val partitionsWithError = mutable.Set[TopicPartition]()
+    val topics = new util.HashSet[String]()

Review comment:
       Since there is only a single tp, does topics need to be a set?

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -212,10 +273,19 @@ public FetchRequestData build() {
                               nextMetadata, node, 
partitionsToLogString(next.keySet()));
                 }
                 sessionPartitions = next;
+                sessionTopicIds = topicIds;
+                topicIds.forEach((name, id) -> sessionTopicNames.put(id, 
name));
                 next = null;
+                topicIds = null;
+                requestUsedTopicIds = 
sessionTopicIds.keySet().containsAll(sessionPartitions.keySet().stream().map(
+                    tp -> tp.topic()).collect(Collectors.toSet()));
                 Map<TopicPartition, PartitionData> toSend =
-                    Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
-                return new FetchRequestData(toSend, Collections.emptyList(), 
toSend, nextMetadata);
+                        Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
+                Map<String, Uuid> toSendTopicIds =
+                        Collections.unmodifiableMap(new 
HashMap<>(sessionTopicIds));
+                Map<Uuid, String> toSendTopicNames =

Review comment:
       Could we build the full toSendTopicIds and toSendTopicNames once and reuse them in both the full and incremental fetch paths?
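
   E.g., as a sketch, build them once right after the session maps are updated and hand the same instances to whichever FetchRequestData is returned:

       // Sketch: single place that builds the immutable per-request views.
       Map<String, Uuid> toSendTopicIds =
           Collections.unmodifiableMap(new HashMap<>(sessionTopicIds));
       Map<Uuid, String> toSendTopicNames =
           Collections.unmodifiableMap(new HashMap<>(sessionTopicNames));
       // ... both the full-fetch and incremental branches then reuse
       // toSendTopicIds/toSendTopicNames instead of constructing their own copies.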

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -125,10 +167,22 @@ public FetchSessionHandler(LogContext logContext, int 
node) {
             return sessionPartitions;
         }
 
+        public Map<String, Uuid> topicIds() {
+            return topicIds;

Review comment:
       Do the new fields need to be included in toString()?

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -297,16 +380,15 @@ private String 
partitionsToLogString(Collection<TopicPartition> partitions) {
     /**
      * Return some partitions which are expected to be in a particular set, 
but which are not.
      *
-     * @param toFind    The partitions to look for.
-     * @param toSearch  The set of partitions to search.
-     * @return          null if all partitions were found; some of the missing 
ones
-     *                  in string form, if not.
+     * @param toFind    The items to look for.

Review comment:
       The above comment needs to be changed accordingly.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -127,10 +177,12 @@ public static FetchResponse parse(ByteBuffer buffer, 
short version) {
      * @return              The response size in bytes.
      */
     public static int sizeOf(short version,
-                             Iterator<Map.Entry<TopicPartition, 
FetchResponseData.PartitionData>> partIterator) {
+                             Iterator<Map.Entry<TopicPartition,
+                             FetchResponseData.PartitionData>> partIterator,
+                             Map<String, Uuid> topicIds) {

Review comment:
       Could we add the new param to the javadoc?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -296,11 +317,26 @@ public AbstractResponse getErrorResponse(int 
throttleTimeMs, Throwable e) {
         // may not be any partitions at all in the response.  For this reason, 
the top-level error code
         // is essential for them.
         Errors error = Errors.forException(e);
-        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> 
responseData = new LinkedHashMap<>();
-        for (Map.Entry<TopicPartition, PartitionData> entry : 
fetchData.entrySet()) {
-            responseData.put(entry.getKey(), 
FetchResponse.partitionResponse(entry.getKey().partition(), error));
+        if (version() < 13) {
+            LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> 
responseData = new LinkedHashMap<>();
+            for (Map.Entry<TopicPartition, PartitionData> entry : 
fetchData.entrySet()) {
+                responseData.put(entry.getKey(), 
FetchResponse.partitionResponse(entry.getKey().partition(), error));
+            }
+            return FetchResponse.of(error, throttleTimeMs, data.sessionId(), 
responseData, Collections.emptyMap());
         }
-        return FetchResponse.of(error, throttleTimeMs, data.sessionId(), 
responseData);
+        List<FetchResponseData.FetchableTopicResponse> topicResponseList = new 
ArrayList<>();

Review comment:
       It seems it will be clearer if we put this in an else clause.
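
   I.e., something like the following sketch (the version 13+ body stays as in the current patch):

       Errors error = Errors.forException(e);
       if (version() < 13) {
           LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData = new LinkedHashMap<>();
           for (Map.Entry<TopicPartition, PartitionData> entry : fetchData.entrySet()) {
               responseData.put(entry.getKey(),
                   FetchResponse.partitionResponse(entry.getKey().partition(), error));
           }
           return FetchResponse.of(error, throttleTimeMs, data.sessionId(), responseData, Collections.emptyMap());
       } else {
           // build topicResponseList and return the topic-id based response,
           // exactly as in the current patch
       }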

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +149,30 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,

Review comment:
       Could we add the javadoc for the new param?

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -73,6 +77,22 @@ public FetchSessionHandler(LogContext logContext, int node) {
     private LinkedHashMap<TopicPartition, PartitionData> sessionPartitions =
         new LinkedHashMap<>(0);
 
+    /**
+     * All of the topic ids mapped to topic names for topics which exist in 
the fetch request session.
+     */
+    private Map<String, Uuid> sessionTopicIds = new HashMap<>(0);
+
+    /**
+     * All of the topic names mapped to topic ids for topics which exist in 
the fetch request session.
+     */
+    private Map<Uuid, String> sessionTopicNames = new HashMap<>(0);
+
+    public Map<Uuid, String> sessionTopicNames() {
+        return sessionTopicNames;
+    }
+
+    public boolean requestUsedTopicIds = false;

Review comment:
       It seems that this can just be a local variable inside build() instead of an instance field?
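
   Sketch (the name canUseTopicIds and passing it through the returned FetchRequestData are just for illustration):

       // Inside build(): compute the flag once, locally, instead of keeping handler state.
       boolean canUseTopicIds = sessionTopicIds.keySet().containsAll(
           sessionPartitions.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet()));
       // ... then pass canUseTopicIds into the FetchRequestData that build() returns.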

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -71,6 +75,7 @@ object FetchSession {
   * localLogStartOffset is the log start offset of the partition on this 
broker.
   */
 class CachedPartition(val topic: String,
+                      var topicId: Uuid,

Review comment:
       Does topicId need to be a var?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

