junrao commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r660211602



##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -471,16 +512,26 @@ class IncrementalFetchContext(private val time: Time,
       if (session.epoch != expectedEpoch) {
         info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " +
           s"got ${session.epoch}.  Possible duplicate request.")
-        FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP)
+        FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP, Collections.emptyMap())
       } else {
+        var topLevelError = Errors.NONE
         // Iterate over the update list using PartitionIterator. This will prune updates which don't need to be sent
+        // It will also set the top-level error to INCONSISTENT_TOPIC_ID if any partitions had this error.
         val partitionIter = new PartitionIterator(updates.entrySet.iterator, true)
         while (partitionIter.hasNext) {
-          partitionIter.next()
+          val entry = partitionIter.next()
+          if (entry.getValue.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) {

Review comment:
   > I'm still not sure I follow "pending fetch request could still reference the outdated Partition object and therefore miss the topicId change". My understanding is that the log is the source of truth, and we will either read from the log if the ID matches or not read if it doesn't. I can see us erroneously returning an error if the partition didn't update in time, but I don't see how we could read from the log due to a stale partition.
   >
   > Or are you referring to the getPartitionOrException(tp) call picking up a stale partition, so that both the request and the partition are stale? In that case, we will read from the log, but will identify the data with its correct ID, and the client will handle it accordingly.

   A fetch request may pass the topicId check in ReplicaManager and be about to call log.read() when the topicId changes. I was wondering whether, in that case, log.read() could return data corresponding to the old topicId. It seems that's not possible, since Log.close() closes all segments.
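
   To make the interleaving under discussion concrete, here is a minimal, self-contained Scala sketch of the check-then-read race. All names here (TopicId, PartitionStandIn, TopicIdRaceSketch) are hypothetical stand-ins, not Kafka's Partition, Uuid, or Log classes; it only illustrates how an ID validated against a request can change before the read executes.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-ins, not Kafka's real classes.
final case class TopicId(value: String)

final class PartitionStandIn(initialId: TopicId) {
  private val topicId = new AtomicReference[TopicId](initialId)

  def currentTopicId: TopicId = topicId.get()

  // Simulates a topic being deleted and recreated with a new ID.
  def reassignTopicId(newId: TopicId): Unit = topicId.set(newId)

  // Stand-in for log.read(); in the real broker, Log.close() closes all
  // segments on deletion, which is why stale data cannot be returned there.
  def read(): String = s"records-for-${topicId.get().value}"
}

object TopicIdRaceSketch extends App {
  val partition = new PartitionStandIn(TopicId("id-v1"))

  // Step 1: the request's topic ID check passes (analogous to ReplicaManager's check).
  val requestTopicId = TopicId("id-v1")
  val checkPassed = requestTopicId == partition.currentTopicId

  // Step 2: the topic ID changes between the check and the read.
  partition.reassignTopicId(TopicId("id-v2"))

  // Step 3: the read observes the new ID, not the one that was checked.
  if (checkPassed) println(partition.read()) // prints: records-for-id-v2
}
```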

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -354,38 +377,55 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque
   * @param cache              The fetch session cache.
   * @param reqMetadata        The request metadata.
   * @param fetchData          The partition data from the fetch request.
+  * @param usesTopicIds       True if this session should use topic IDs.
+  * @param topicIds           The map from topic names to topic IDs.
   * @param isFromFollower     True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
                        private val cache: FetchSessionCache,
                        private val reqMetadata: JFetchMetadata,
                        private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData],
+                       private val usesTopicIds: Boolean,
+                       private val topicIds: util.Map[String, Uuid],
                        private val isFromFollower: Boolean) extends FetchContext {
   override def getFetchOffset(part: TopicPartition): Option[Long] =
     Option(fetchData.get(part)).map(_.fetchOffset)
 
-  override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = {
-    fetchData.forEach(fun(_, _))
+  override def foreachPartition(fun: (TopicPartition, Uuid, FetchRequest.PartitionData) => Unit): Unit = {
+    fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data))
   }
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
-    FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
+    FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds)
   }
 
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = {
-    def createNewSession: FetchSession.CACHE_MAP = {
+    var topLevelError = Errors.NONE
+    def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) = {
       val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
+      val sessionTopicIds = new util.HashMap[String, Uuid](updates.size)
       updates.forEach { (part, respData) =>
+        if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) {
+          info(s"Session encountered an inconsistent topic ID for topicPartition $part.")
+          topLevelError = Errors.INCONSISTENT_TOPIC_ID
+        }
         val reqData = fetchData.get(part)
-        cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData))
+        val id = topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID)
+        cachedPartitions.mustAdd(new CachedPartition(part, id, reqData, respData))
+        if (id != Uuid.ZERO_UUID)
+          sessionTopicIds.put(part.topic, id)
       }
-      cachedPartitions
+      (cachedPartitions, sessionTopicIds)
     }
     val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower,
-        updates.size, () => createNewSession)
-    debug(s"Full fetch context with session id $responseSessionId returning " +
-      s"${partitionsToLogString(updates.keySet)}")
-    FetchResponse.of(Errors.NONE, 0, responseSessionId, updates)
+        updates.size, usesTopicIds, () => createNewSession)
+    if (topLevelError == Errors.INCONSISTENT_TOPIC_ID) {

Review comment:
      It can also be a bit confusing that we are treating INCONSISTENT_TOPIC_ID differently from other top-level errors. Since the only possible top-level error here is INCONSISTENT_TOPIC_ID, perhaps we can change topLevelError to hasInconsistentTopicId. Ditto in IncrementalFetchContext.
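
   A minimal sketch of what the suggested rename could look like, using hypothetical stand-in types rather than Kafka's FetchSession classes; the point is only that a Boolean flag states the intent more directly than a general-purpose Errors variable when INCONSISTENT_TOPIC_ID is the sole possibility:

```scala
object HasInconsistentTopicIdSketch {
  // Stand-in for Kafka's Errors enum, reduced to the two relevant values.
  object Errors extends Enumeration {
    val NONE, INCONSISTENT_TOPIC_ID = Value
  }

  final case class PartitionResponse(error: Errors.Value)

  def topLevelErrorFor(updates: Map[String, PartitionResponse]): Errors.Value = {
    var hasInconsistentTopicId = false
    updates.foreach { case (partition, respData) =>
      if (respData.error == Errors.INCONSISTENT_TOPIC_ID) {
        println(s"Session encountered an inconsistent topic ID for $partition.")
        hasInconsistentTopicId = true
      }
    }
    // The flag is translated back into an error code only at the boundary
    // where the response is built.
    if (hasInconsistentTopicId) Errors.INCONSISTENT_TOPIC_ID else Errors.NONE
  }
}
```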

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -296,11 +276,24 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         // may not be any partitions at all in the response.  For this reason, the top-level error code
         // is essential for them.
         Errors error = Errors.forException(e);
-        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData = new LinkedHashMap<>();
-        for (Map.Entry<TopicPartition, PartitionData> entry : fetchData.entrySet()) {
-            responseData.put(entry.getKey(), FetchResponse.partitionResponse(entry.getKey().partition(), error));
+        List<FetchResponseData.FetchableTopicResponse> topicResponseList = new ArrayList<>();
+        // Since UNKNOWN_TOPIC_ID is a new error type only returned when topic ID requests are made (from newer clients),
+        // we can skip returning the error on all partitions and returning any partitions at all.
+        if (error != Errors.UNKNOWN_TOPIC_ID) {

Review comment:
      This kind of special treatment for UNKNOWN_TOPIC_ID is a bit weird. If you look at the comment above, the reason for setting the same error code on all partitions is backward compatibility for versions that don't have a top-level error code. So we can probably just check the request version: if the version is >= 13, we always return a top-level error code with no partitions.
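
   A rough sketch of the version-based branch this comment suggests, with hypothetical stand-in types rather than the real FetchResponse builders; the version cutoff of 13 and the two response shapes come from the comment above:

```scala
object VersionedErrorResponseSketch {
  // Stand-ins for the real response types.
  final case class PartitionError(partition: Int, error: String)
  final case class FetchErrorResponse(topLevelError: String,
                                      partitionErrors: List[PartitionError])

  def errorResponse(version: Short,
                    error: String,
                    partitions: List[Int]): FetchErrorResponse = {
    if (version >= 13) {
      // Newer clients read the top-level error; no partitions are needed.
      FetchErrorResponse(error, Nil)
    } else {
      // Older clients only look at per-partition errors, so repeat the
      // error on every requested partition for backward compatibility.
      FetchErrorResponse(error, partitions.map(PartitionError(_, error)))
    }
  }
}
```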



