[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-10 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r746870544



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -967,6 +967,113 @@ class FetchSessionTest {
   .setErrorCode(errorCode)
   }
 
+  @Test
+  def testResolveUnknownPartitions(): Unit = {
+val time = new MockTime()
+val cache = new FetchSessionCache(10, 1000)
+val fetchManager = new FetchManager(time, cache)
+
+def newContext(
+  metadata: JFetchMetadata,
+  partitions: Seq[TopicIdPartition],
+  topicNames: Map[Uuid, String] // Topic ID to name mapping known by the 
broker.
+): FetchContext = {
+  val data = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
+  partitions.foreach { topicIdPartition =>
+data.put(
+  topicIdPartition.topicPartition,
+  new FetchRequest.PartitionData(topicIdPartition.topicId, 0, 0, 100, 
Optional.empty())
+)
+  }
+
+  val fetchRequest = createRequest(metadata, data, EMPTY_PART_LIST, false)
+
+  fetchManager.newContext(
+fetchRequest.version,
+fetchRequest.metadata,
+fetchRequest.isFromFollower,
+fetchRequest.fetchData(topicNames.asJava),
+fetchRequest.forgottenTopics(topicNames.asJava),
+topicNames.asJava
+  )
+}
+
+def updateAndGenerateResponseData(
+  context: FetchContext
+): Int = {
+  val data = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
+  context.foreachPartition { (topicIdPartition, _) =>
+data.put(
+  topicIdPartition,
+  if (topicIdPartition.topicId == Uuid.ZERO_UUID)

Review comment:
   That could be  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-10 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r746852321



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -967,6 +967,113 @@ class FetchSessionTest {
   .setErrorCode(errorCode)
   }
 
+  @Test

Review comment:
   Yeah, that is not really necessary as you said. I don't mind if you 
remote it.

##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -967,6 +967,113 @@ class FetchSessionTest {
   .setErrorCode(errorCode)
   }
 
+  @Test

Review comment:
   Yeah, that is not really necessary as you said. I don't mind if you 
remove it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-10 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r746826740



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -967,6 +967,113 @@ class FetchSessionTest {
   .setErrorCode(errorCode)
   }
 
+  @Test

Review comment:
   What do you mean?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-10 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r746805054



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -967,6 +967,113 @@ class FetchSessionTest {
   .setErrorCode(errorCode)
   }
 
+  @Test

Review comment:
   Right. The question is how to validate that the first update method 
works? You have to get the partitions from the session as well, isn't it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-10 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r746803000



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -877,34 +935,341 @@ class FetchSessionTest {
 
 // Create an incremental fetch request as though no topics changed.
 val reqData2 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, topicIds, EMPTY_PART_LIST, false)
+val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, EMPTY_PART_LIST, false)
 // Simulate ID changing on server.
 val topicNamesFooChanged =  Map(topicIds.get("bar") -> "bar", 
Uuid.randomUuid() -> "foo").asJava
-val topicIdsFooChanged = topicNamesFooChanged.asScala.map(_.swap).asJava
 val context2 = fetchManager.newContext(
   request2.version,
   request2.metadata,
   request2.isFromFollower,
   request2.fetchData(topicNamesFooChanged),
   request2.forgottenTopics(topicNamesFooChanged),
-  topicIdsFooChanged
+  topicNamesFooChanged
 )
 assertEquals(classOf[IncrementalFetchContext], context2.getClass)
-val respData2 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
+val respData2 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
 // Likely if the topic ID is different in the broker, it will be different 
in the log. Simulate the log check finding an inconsistent ID.
-respData2.put(new TopicPartition("foo", 0), new 
FetchResponseData.PartitionData()
+respData2.put(tp0, new FetchResponseData.PartitionData()
   .setPartitionIndex(0)
   .setHighWatermark(-1)
   .setLastStableOffset(-1)
   .setLogStartOffset(-1)
   .setErrorCode(Errors.INCONSISTENT_TOPIC_ID.code))
 val resp2 = context2.updateAndGenerateResponseData(respData2)
 
-assertEquals(Errors.INCONSISTENT_TOPIC_ID, resp2.error)
+assertEquals(Errors.NONE, resp2.error)
+assertTrue(resp2.sessionId > 0)
+val responseData2 = resp2.responseData(topicNames, request2.version)
+// We should have the inconsistent topic ID error on the partition
+assertEquals(Errors.INCONSISTENT_TOPIC_ID.code, 
responseData2.get(tp0.topicPartition).errorCode)
+  }
+
+  private def noErrorResponse: FetchResponseData.PartitionData = {
+new FetchResponseData.PartitionData()
+  .setPartitionIndex(1)
+  .setHighWatermark(10)
+  .setLastStableOffset(10)
+  .setLogStartOffset(10)
+  }
+
+  private def errorResponse(errorCode: Short): FetchResponseData.PartitionData 
 = {
+new FetchResponseData.PartitionData()
+  .setPartitionIndex(0)
+  .setHighWatermark(-1)
+  .setLastStableOffset(-1)
+  .setLogStartOffset(-1)
+  .setErrorCode(errorCode)
+  }
+
+  @Test
+  def testResolveUnknownPartitions(): Unit = {
+val time = new MockTime()
+val cache = new FetchSessionCache(10, 1000)
+val fetchManager = new FetchManager(time, cache)
+
+def newContext(
+  metadata: JFetchMetadata,
+  partitions: Seq[TopicIdPartition],
+  topicNames: Map[Uuid, String] // Topic ID to name mapping known by the 
broker.
+): FetchContext = {
+  val data = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
+  partitions.foreach { topicIdPartition =>
+data.put(
+  topicIdPartition.topicPartition,
+  new FetchRequest.PartitionData(topicIdPartition.topicId, 0, 0, 100, 
Optional.empty())
+)
+  }
+
+  val fetchRequest = createRequest(metadata, data, EMPTY_PART_LIST, false)
+
+  fetchManager.newContext(
+fetchRequest.version,
+fetchRequest.metadata,
+fetchRequest.isFromFollower,
+fetchRequest.fetchData(topicNames.asJava),
+fetchRequest.forgottenTopics(topicNames.asJava),
+topicNames.asJava
+  )
+}
+
+def updateAndGenerateResponseData(
+  context: FetchContext
+): Int = {
+  val data = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
+  context.foreachPartition { (topicIdPartition, _) =>
+data.put(
+  topicIdPartition,
+  if (topicIdPartition.topicId == Uuid.ZERO_UUID)
+errorResponse(Errors.UNKNOWN_TOPIC_ID.code)
+  else
+noErrorResponse
+)
+  }
+  context.updateAndGenerateResponseData(data).sessionId
+}
+
+val foo = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0))
+val bar = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("bar", 0))
+val zar = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("zar", 0))
+
+val fooUnresolved = new TopicIdPartition(foo.topicId, new 
TopicPartition(null, foo.partition))
+val barUnresolved = new TopicIdPartition(bar.topicId, new 
TopicPartition(null, bar.partition))
+val 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-10 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r746461644



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class FetchRequestTest {
+
+private static Stream fetchVersions() {
+return ApiKeys.FETCH.allVersions().stream().map(version -> 
Arguments.of(version));
+}
+
+@ParameterizedTest
+@MethodSource("fetchVersions")
+public void testToReplaceWithDifferentVersions(short version) {
+
+Uuid topicId = Uuid.randomUuid();
+TopicPartition tp = new TopicPartition("topic", 0);
+TopicIdPartition tidp = new TopicIdPartition(topicId, tp);
+
+Map partitionData = 
Collections.singletonMap(tp,
+new FetchRequest.PartitionData(topicId, 0, 0, 0, 
Optional.empty()));
+List toReplace = Collections.singletonList(tidp);
+boolean fetchRequestUsesTopicIds = version >= 13;
+
+FetchRequest fetchRequest = FetchRequest.Builder
+.forReplica(version, 0, 1, 1, partitionData)
+.removed(Collections.emptyList())
+.replaced(toReplace)
+.metadata(FetchMetadata.newIncremental(123)).build(version);
+
+// If version < 13, we should not see any partitions in 
forgottenTopics. This is because we can not
+// distinguish different topic IDs on versions earlier than 13.
+assertEquals(fetchRequestUsesTopicIds, 
fetchRequest.data().forgottenTopicsData().size() > 0);
+assertEquals(1, fetchRequest.data().topics().size());
+}
+
+@ParameterizedTest
+@MethodSource("fetchVersions")
+public void testFetchData(short version) {
+TopicPartition topicPartition0 = new TopicPartition("topic", 0);
+TopicPartition topicPartition1 = new TopicPartition("unknownIdTopic", 
0);
+Uuid topicId0 = Uuid.randomUuid();
+Uuid topicId1 = Uuid.randomUuid();
+
+// Only include topic IDs for the first topic partition.
+Map topicNames = Collections.singletonMap(topicId0, 
topicPartition0.topic());
+List topicIdPartitions = new LinkedList<>();
+topicIdPartitions.add(new TopicIdPartition(topicId0, topicPartition0));
+topicIdPartitions.add(new TopicIdPartition(topicId1, topicPartition1));
+
+// Include one topic with topic IDs in the topic names map and one 
without.
+Map partitionData = new 
LinkedHashMap<>();
+partitionData.put(topicPartition0, new 
FetchRequest.PartitionData(topicId0, 0, 0, 0, Optional.empty()));
+partitionData.put(topicPartition1, new 
FetchRequest.PartitionData(topicId1, 0, 0, 0, Optional.empty()));
+boolean fetchRequestUsesTopicIds = version >= 13;
+
+FetchRequest fetchRequest = FetchRequest.parse(FetchRequest.Builder
+.forReplica(version, 0, 1, 1, partitionData)
+.removed(Collections.emptyList())
+.replaced(Collections.emptyList())
+
.metadata(FetchMetadata.newIncremental(123)).build(version).serialize(), 
version);
+
+// For versions < 13, we will be provided a topic name and a zero UUID 
in FetchRequestData.
+// Versions 13+ will contain a valid topic ID but an empty topic name.
+List 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-10 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r746423986



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -2440,6 +2442,57 @@ class KafkaApisTest {
 assertNull(partitionData.abortedTransactions)
   }
 
+  /**
+   * Verifies that partitions with unknown topic ID errors are added to the 
erroneous set and there is not an attempt to fetch them.
+   */
+  @Test
+  def testFetchRequestErroneousPartitions(): Unit = {
+val tidp = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0))
+val tp = tidp.topicPartition
+val nullTp = new TopicPartition(null, tp.partition)
+addTopicToMetadataCache(tp.topic, 1, topicId = tidp.topicId)
+
+expect(replicaManager.getLogConfig(EasyMock.eq(nullTp))).andReturn(None)
+
+// Simulate unknown topic ID in the context
+val fetchData = Map(new TopicIdPartition(tidp.topicId(), new 
TopicPartition(null, tidp.partition)) ->
+  new FetchRequest.PartitionData(tidp.topicId, 0, 0, 1000, 
Optional.empty())).asJava
+val fetchDataBuilder = Map(tp -> new 
FetchRequest.PartitionData(tidp.topicId, 0, 0, 1000,
+  Optional.empty())).asJava
+val fetchMetadata = new JFetchMetadata(0, 0)
+val fetchContext = new FullFetchContext(time, new FetchSessionCache(1000, 
100),
+  fetchMetadata, fetchData, false, false)
+expect(fetchManager.newContext(
+  anyObject[Short],
+  anyObject[JFetchMetadata],
+  anyObject[Boolean],
+  anyObject[util.Map[TopicIdPartition, FetchRequest.PartitionData]],
+  anyObject[util.List[TopicIdPartition]],
+  anyObject[util.Map[Uuid, String]])).andReturn(fetchContext)

Review comment:
   We usually prefer to not use `any*` but to rather provide the expected 
values.

##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -1361,102 +1732,197 @@ class FetchSessionTest {
 val resp4 = context2.updateAndGenerateResponseData(respData)
 assertEquals(Errors.NONE, resp4.error)
 assertEquals(resp1.sessionId, resp4.sessionId)
-assertEquals(Utils.mkSet(tp1, tp2), resp4.responseData(topicNames, 
request2.version).keySet)
+assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition), 
resp4.responseData(topicNames, request2.version).keySet)
   }
 
   @Test
   def testDeprioritizesPartitionsWithRecordsOnly(): Unit = {
 val time = new MockTime()
 val cache = new FetchSessionCache(10, 1000)
 val fetchManager = new FetchManager(time, cache)
-val tp1 = new TopicPartition("foo", 1)
-val tp2 = new TopicPartition("bar", 2)
-val tp3 = new TopicPartition("zar", 3)
 val topicIds = Map("foo" -> Uuid.randomUuid(), "bar" -> Uuid.randomUuid(), 
"zar" -> Uuid.randomUuid()).asJava
 val topicNames = topicIds.asScala.map(_.swap).asJava
+val tp1 = new TopicIdPartition(topicIds.get("foo"), new 
TopicPartition("foo", 1))
+val tp2 = new TopicIdPartition(topicIds.get("bar"), new 
TopicPartition("bar", 2))
+val tp3 = new TopicIdPartition(topicIds.get("zar"), new 
TopicPartition("zar", 3))
 
-val reqData = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-reqData.put(tp1, new FetchRequest.PartitionData(100, 0, 1000, 
Optional.of(5), Optional.of(4)))
-reqData.put(tp2, new FetchRequest.PartitionData(100, 0, 1000, 
Optional.of(5), Optional.of(4)))
-reqData.put(tp3, new FetchRequest.PartitionData(100, 0, 1000, 
Optional.of(5), Optional.of(4)))
+val reqData = new util.LinkedHashMap[TopicIdPartition, 
FetchRequest.PartitionData]
+reqData.put(tp1, new FetchRequest.PartitionData(tp1.topicId, 100, 0, 1000, 
Optional.of(5), Optional.of(4)))
+reqData.put(tp2, new FetchRequest.PartitionData(tp2.topicId, 100, 0, 1000, 
Optional.of(5), Optional.of(4)))
+reqData.put(tp3, new FetchRequest.PartitionData(tp3.topicId, 100, 0, 1000, 
Optional.of(5), Optional.of(4)))
 
 // Full fetch context returns all partitions in the response
 val context1 = fetchManager.newContext(ApiKeys.FETCH.latestVersion(), 
JFetchMetadata.INITIAL, false,
- reqData, Collections.emptyList(), topicIds)
+ reqData, Collections.emptyList(), topicNames)
 assertEquals(classOf[FullFetchContext], context1.getClass)
 
-val respData1 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
+val respData1 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
 respData1.put(tp1, new FetchResponseData.PartitionData()
-  .setPartitionIndex(tp1.partition)
+  .setPartitionIndex(tp1.topicPartition.partition)
   .setHighWatermark(50)
   .setLastStableOffset(50)
   .setLogStartOffset(0))
 respData1.put(tp2, new FetchResponseData.PartitionData()
-  .setPartitionIndex(tp2.partition)
+  .setPartitionIndex(tp2.topicPartition.partition)
   .setHighWatermark(50)
   .setLastStableOffset(50)
   

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-10 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r746420015



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -2440,6 +2442,57 @@ class KafkaApisTest {
 assertNull(partitionData.abortedTransactions)
   }
 
+  /**
+   * Verifies that partitions with unknown topic ID errors are added to the 
erroneous set and there is not an attempt to fetch them.
+   */
+  @Test
+  def testFetchRequestErroneousPartitions(): Unit = {
+val tidp = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0))
+val tp = tidp.topicPartition
+val nullTp = new TopicPartition(null, tp.partition)

Review comment:
   Could we simplify all of that by defining two `TopicIdPartition`? For 
instance, we could have the following:
   
   ```
   val foo = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 
0));
   val unresolvedFoo = new TopicIdPartition(foo.topicId, new 
TopicPartition(null, foo.partition));
   ```
   
   Then, we can use them where we need them.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-10 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r746404173



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -967,6 +967,113 @@ class FetchSessionTest {
   .setErrorCode(errorCode)
   }
 
+  @Test

Review comment:
   I wrote that test to illustrate how we could improve the readability. My 
concern is that they are so many lines/assertions in 
`testUpdatedPartitionResolvesId` and `testToForgetCases` that we get distracted 
and we have almost missed the most important assertions - the ones which 
validate what the session contains (`assertPartitionsOrder`). 
`assertPartitionsOrder` is actually the piece which ensures that the names are 
resolved or not, right?
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-10 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r746376488



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -794,25 +793,26 @@ class ReplicaManagerTest {
 
   // We receive one valid request from the follower and replica state is 
updated
   var successfulFetch: Option[FetchPartitionData] = None
-  def callback(response: Seq[(TopicPartition, FetchPartitionData)]): Unit 
= {
-successfulFetch = response.headOption.filter { case (topicPartition, 
_) => topicPartition == tp }.map { case (_, data) => data }
-  }
 
-  val validFetchPartitionData = new FetchRequest.PartitionData(0L, 0L, 
maxFetchBytes,
+  val validFetchPartitionData = new 
FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, maxFetchBytes,
 Optional.of(leaderEpoch))
 
   // Fetch messages simulating a different ID than the one in the log.
+  val inconsistentTidp = new TopicIdPartition(Uuid.randomUuid(), 
tidp.topicPartition)
+  def callback1(response: Seq[(TopicIdPartition, FetchPartitionData)]): 
Unit = {
+// Check the topic partition only since we are reusing this callback 
on different TopicIdPartitions.
+successfulFetch = response.headOption.filter(_._1 == 
inconsistentTidp).map(_._2)
+  }

Review comment:
   Yeah, that works as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-09 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r746051435



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -794,25 +793,26 @@ class ReplicaManagerTest {
 
   // We receive one valid request from the follower and replica state is 
updated
   var successfulFetch: Option[FetchPartitionData] = None
-  def callback(response: Seq[(TopicPartition, FetchPartitionData)]): Unit 
= {
-successfulFetch = response.headOption.filter { case (topicPartition, 
_) => topicPartition == tp }.map { case (_, data) => data }
-  }
 
-  val validFetchPartitionData = new FetchRequest.PartitionData(0L, 0L, 
maxFetchBytes,
+  val validFetchPartitionData = new 
FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, maxFetchBytes,
 Optional.of(leaderEpoch))
 
   // Fetch messages simulating a different ID than the one in the log.
+  val inconsistentTidp = new TopicIdPartition(Uuid.randomUuid(), 
tidp.topicPartition)
+  def callback1(response: Seq[(TopicIdPartition, FetchPartitionData)]): 
Unit = {
+// Check the topic partition only since we are reusing this callback 
on different TopicIdPartitions.
+successfulFetch = response.headOption.filter(_._1 == 
inconsistentTidp).map(_._2)
+  }

Review comment:
   Correct. The method would return the full response. Then we can assert 
it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-09 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r745965419



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -794,25 +793,26 @@ class ReplicaManagerTest {
 
   // We receive one valid request from the follower and replica state is 
updated
   var successfulFetch: Option[FetchPartitionData] = None
-  def callback(response: Seq[(TopicPartition, FetchPartitionData)]): Unit 
= {
-successfulFetch = response.headOption.filter { case (topicPartition, 
_) => topicPartition == tp }.map { case (_, data) => data }
-  }
 
-  val validFetchPartitionData = new FetchRequest.PartitionData(0L, 0L, 
maxFetchBytes,
+  val validFetchPartitionData = new 
FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, maxFetchBytes,
 Optional.of(leaderEpoch))
 
   // Fetch messages simulating a different ID than the one in the log.
+  val inconsistentTidp = new TopicIdPartition(Uuid.randomUuid(), 
tidp.topicPartition)
+  def callback1(response: Seq[(TopicIdPartition, FetchPartitionData)]): 
Unit = {
+// Check the topic partition only since we are reusing this callback 
on different TopicIdPartitions.
+successfulFetch = response.headOption.filter(_._1 == 
inconsistentTidp).map(_._2)
+  }

Review comment:
   Hum.. I was thinking that the method would return the partitions and we 
would do the assertion after. That would make the helper generic enough to be 
reused in other places as well. I guess that either ways would work.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-09 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r745963579



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -659,88 +670,125 @@ class FetchSessionTest {
   }
 
   @Test
-  def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = {
+  def testFetchSessionWithUnknownId(): Unit = {
 val time = new MockTime()
 val cache = new FetchSessionCache(10, 1000)
 val fetchManager = new FetchManager(time, cache)
-val topicIds = new util.HashMap[String, Uuid]()
-val topicNames = new util.HashMap[Uuid, String]()
+val fooId = Uuid.randomUuid()
+val barId = Uuid.randomUuid()
+val zarId = Uuid.randomUuid()
+val topicNames = Map(fooId -> "foo", barId -> "bar").asJava
+val foo0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0))
+val foo1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1))
+val zar0 = new TopicIdPartition(zarId, new TopicPartition("zar", 0))
+val emptyFoo0 = new TopicIdPartition(fooId, new TopicPartition(null, 0))
+val emptyFoo1 = new TopicIdPartition(fooId, new TopicPartition(null, 1))
+val emptyZar0 = new TopicIdPartition(zarId, new TopicPartition(null, 0))
 
-// Create a new fetch session with foo-0
+// Create a new fetch session with foo-0 and foo-1
 val reqData1 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-reqData1.put(new TopicPartition("foo", 0), new 
FetchRequest.PartitionData(0, 0, 100,
+reqData1.put(foo0.topicPartition, new 
FetchRequest.PartitionData(foo0.topicId, 0, 0, 100,
   Optional.empty()))
-val request1 = createRequestWithoutTopicIds(JFetchMetadata.INITIAL, 
reqData1, topicIds, EMPTY_PART_LIST, false)
-// Start a fetch session using a request version that does not use topic 
IDs.
+reqData1.put(foo1.topicPartition, new 
FetchRequest.PartitionData(foo1.topicId,10, 0, 100,
+  Optional.empty()))
+reqData1.put(zar0.topicPartition, new 
FetchRequest.PartitionData(zar0.topicId,10, 0, 100,
+  Optional.empty()))
+val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, 
EMPTY_PART_LIST, false)
+// Simulate unknown topic ID for foo.
+val topicNamesOnlyBar = Collections.singletonMap(barId, "bar")
+// We should not throw error since we have an older request version.
 val context1 = fetchManager.newContext(
   request1.version,
   request1.metadata,
   request1.isFromFollower,
-  request1.fetchData(topicNames),
-  request1.forgottenTopics(topicNames),
-  topicIds
+  request1.fetchData(topicNamesOnlyBar),
+  request1.forgottenTopics(topicNamesOnlyBar),
+  topicNamesOnlyBar
 )
 assertEquals(classOf[FullFetchContext], context1.getClass)
-val respData1 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
-respData1.put(new TopicPartition("foo", 0), new 
FetchResponseData.PartitionData()
+assertPartitionsOrder(context1, Seq(emptyFoo0, emptyFoo1, emptyZar0))
+val respData1 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
+respData1.put(emptyFoo0, new FetchResponseData.PartitionData()
   .setPartitionIndex(0)
-  .setHighWatermark(100)
-  .setLastStableOffset(100)
-  .setLogStartOffset(100))
+  .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))
+respData1.put(emptyFoo1, new FetchResponseData.PartitionData()
+  .setPartitionIndex(1)
+  .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))
+respData1.put(emptyZar0, new FetchResponseData.PartitionData()
+  .setPartitionIndex(1)
+  .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))
 val resp1 = context1.updateAndGenerateResponseData(respData1)
+// On the latest request version, we should have unknown topic ID errors.
 assertEquals(Errors.NONE, resp1.error())
 assertTrue(resp1.sessionId() != INVALID_SESSION_ID)
+assertEquals(2, resp1.responseData(topicNames, request1.version).size)
+resp1.responseData(topicNames, request1.version).forEach( (_, resp) => 
assertEquals(Errors.UNKNOWN_TOPIC_ID.code, resp.errorCode))
 
-// Create an incremental fetch request as though no topics changed. 
However, send a v13 request.
-// Also simulate the topic ID found on the server.
-val fooId = Uuid.randomUuid()
-topicIds.put("foo", fooId)
-topicNames.put(fooId, "foo")
+// Create an incremental request where we resolve the partitions
 val reqData2 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, topicIds, EMPTY_PART_LIST, false)
+val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, EMPTY_PART_LIST, false)
 val context2 = fetchManager.newContext(
   request2.version,
   request2.metadata,
   request2.isFromFollower,
   

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-09 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r745871819



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +432,182 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were 
previously using topic IDs.
+// Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+private static Stream idUsageCombinations() {
+return Stream.of(
+Arguments.of(true, true),
+Arguments.of(true, false),
+Arguments.of(false, true),
+Arguments.of(false, false)
+);
+}
+
+@ParameterizedTest
+@MethodSource("idUsageCombinations")
+public void testTopicIdReplaced(boolean startsWithTopicIds, boolean 
endsWithTopicIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = startsWithTopicIds ? Uuid.randomUuid() : 
Uuid.ZERO_UUID;
+builder.add(tp, new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertEquals(startsWithTopicIds, data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, respMap(new 
RespEntry("foo", 0, topicId1, 10, 20)));
+short version = startsWithTopicIds ? ApiKeys.FETCH.latestVersion() : 
12;
+handler.handleResponse(resp, version);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = endsWithTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID;
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+if (startsWithTopicIds && endsWithTopicIds) {
+// 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-09 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r745869793



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +432,182 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were 
previously using topic IDs.
+// Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+private static Stream idUsageCombinations() {
+return Stream.of(
+Arguments.of(true, true),
+Arguments.of(true, false),
+Arguments.of(false, true),
+Arguments.of(false, false)
+);
+}
+
+@ParameterizedTest
+@MethodSource("idUsageCombinations")
+public void testTopicIdReplaced(boolean startsWithTopicIds, boolean 
endsWithTopicIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = startsWithTopicIds ? Uuid.randomUuid() : 
Uuid.ZERO_UUID;
+builder.add(tp, new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertEquals(startsWithTopicIds, data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, respMap(new 
RespEntry("foo", 0, topicId1, 10, 20)));
+short version = startsWithTopicIds ? ApiKeys.FETCH.latestVersion() : 
12;
+handler.handleResponse(resp, version);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = endsWithTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID;
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+if (startsWithTopicIds && endsWithTopicIds) {
+// 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-08 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r745060151



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were 
previously using topic IDs.
+// Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());
+// Should have the same session ID, and next epoch and can use topic 
IDs.
+assertEquals(123, data2.metadata().sessionId(), "Did not use same 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-08 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r745050475



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -238,11 +237,13 @@ public String toString() {
  * incremental fetch requests (see below).
  */
 private LinkedHashMap next;
+private Map topicNames;
 private final boolean copySessionPartitions;
 private int partitionsWithoutTopicIds = 0;
 
 Builder() {
 this.next = new LinkedHashMap<>();
+this.topicNames = new HashMap<>();

Review comment:
   Thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-08 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r745050242



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -340,11 +340,9 @@ public FetchRequestData build() {
 // Add topic IDs to session if we can use them. If an ID is 
inconsistent, we will handle in the receiving broker.
 // If we switched from using topic IDs to not using them (or vice 
versa), that error will also be handled in the receiving broker.
 if (canUseTopicIds) {
-Map> newTopicNames = 
added.stream().collect(Collectors.groupingByConcurrent(TopicIdPartition::topicId,
-Collectors.mapping(topicIdPartition -> 
topicIdPartition.topicPartition().topic(), Collectors.toSet(;
-
-// There should only be one topic name per topic ID.
-newTopicNames.forEach((topicId, topicNamesSet) -> 
topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
+sessionTopicNames = topicNames;
+} else {
+sessionTopicNames = Collections.emptyMap();

Review comment:
   That is a good question. I thought that it is better to empty the map if 
we don't use topic ids instead of keeping a out-of-date mapping. What do you 
think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-08 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r744942958



##
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##
@@ -305,9 +304,10 @@ class ReplicaFetcherThread(name: String,
 } else {
   val version: Short = if (fetchRequestVersion >= 13 && 
!fetchData.canUseTopicIds) 12 else fetchRequestVersion
   val requestBuilder = FetchRequest.Builder
-.forReplica(version, replicaId, maxWait, minBytes, fetchData.toSend, 
fetchData.topicIds)
+.forReplica(version, replicaId, maxWait, minBytes, fetchData.toSend)
 .setMaxBytes(maxBytes)
-.toForget(fetchData.toForget)
+.removed(fetchData.toForget)
+.replaced(fetchData.toReplace)

Review comment:
   Right. Here I would like to have tests which ensure that the Builder is 
fed correctly based on the FetchSessionHandler's data.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-08 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r744892585



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -301,7 +299,9 @@ public FetchRequestData build() {
 if (nextData != null) {
 // We basically check if the new partition had the same 
topic ID. If not,
 // we add it to the "replaced" set.
-if (!prevData.topicId.equals(nextData.topicId) && 
!prevData.topicId.equals(Uuid.ZERO_UUID)) {
+if (!prevData.topicId.equals(nextData.topicId)
+&& !prevData.topicId.equals(Uuid.ZERO_UUID)
+&& !nextData.topicId.equals(Uuid.ZERO_UUID)) {

Review comment:
   Without this, when a topic id is set back to "zero", the former topic id 
is added to the replaced set which is a bit unintuitive, I think. In the end, 
it does not matter too much because the version is downgraded in this case so 
the replaced set is ignored. I was debating if it worth handling this case 
explicitly here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-08 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r744877634



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -413,8 +413,20 @@ abstract class AbstractFetcherThread(name: String,
 
 case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
   warn(s"Received ${Errors.UNKNOWN_TOPIC_OR_PARTITION} from 
the leader for partition $topicPartition. " +
-   "This error may be returned transiently when the 
partition is being created or deleted, but it is not " +
-   "expected to persist.")
+"This error may be returned transiently when the partition 
is being created or deleted, but it is not " +
+"expected to persist.")
+  partitionsWithError += topicPartition
+
+case Errors.UNKNOWN_TOPIC_ID =>
+  warn(s"Received ${Errors.UNKNOWN_TOPIC_ID} from the leader 
for partition $topicPartition. " +
+"This error may be returned transiently when the partition 
is being created or deleted, but it is not " +
+"expected to persist.")
+  partitionsWithError += topicPartition
+
+case Errors.INCONSISTENT_TOPIC_ID =>
+  warn(s"Received ${Errors.INCONSISTENT_TOPIC_ID} from the 
leader for partition $topicPartition. " +
+"This error may be returned transiently when the partition 
is being created or deleted, but it is not " +
+"expected to persist.")

Review comment:
   Yeah, that should work. Otherwise, we could also make the method package 
private and add a few unit tests for it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-08 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r744875433



##
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##
@@ -305,9 +304,10 @@ class ReplicaFetcherThread(name: String,
 } else {
   val version: Short = if (fetchRequestVersion >= 13 && 
!fetchData.canUseTopicIds) 12 else fetchRequestVersion
   val requestBuilder = FetchRequest.Builder
-.forReplica(version, replicaId, maxWait, minBytes, fetchData.toSend, 
fetchData.topicIds)
+.forReplica(version, replicaId, maxWait, minBytes, fetchData.toSend)
 .setMaxBytes(maxBytes)
-.toForget(fetchData.toForget)
+.removed(fetchData.toForget)
+.replaced(fetchData.toReplace)

Review comment:
   `buildFetch` seems to be well isolated so it should be quite easy to 
write a few unit tests for it. `buildFetch` returns a `Builder` so you will 
have to build the request in order to inspect it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-06 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r744104240



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were 
previously using topic IDs.
+// Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());
+// Should have the same session ID, and next epoch and can use topic 
IDs.
+assertEquals(123, data2.metadata().sessionId(), "Did not use same 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-06 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r744103895



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -262,11 +262,12 @@ public synchronized int sendFetches() {
 maxVersion = ApiKeys.FETCH.latestVersion();
 }
 final FetchRequest.Builder request = FetchRequest.Builder
-.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend(), data.topicIds())
+.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend())
 .isolationLevel(isolationLevel)
 .setMaxBytes(this.maxBytes)
 .metadata(data.metadata())
-.toForget(data.toForget())
+.removed(data.toForget())
+.replaced(data.toReplace())

Review comment:
   We must be able to verify that the request sent out by this method is 
correct. In the unit tests, we mock the network client for this purpose. If I 
remember correctly, we can pass a request matcher to it. I need to look into 
the existing unit tests for this class to see how we have done it for other 
cases.
   
   We might already have tests verifying that the version of the fetch request 
sent out is correct based on wether topic ids are used or not. If we do, I 
suppose that we could proceed similarly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743574922



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were 
previously using topic IDs.
+// Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());
+// Should have the same session ID, and next epoch and can use topic 
IDs.
+assertEquals(123, data2.metadata().sessionId(), "Did not use same 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r742933244



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were 
previously using topic IDs.
+// Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());
+// Should have the same session ID, and next epoch and can use topic 
IDs.
+assertEquals(123, data2.metadata().sessionId(), "Did not use same 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743909554



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -93,27 +93,42 @@ class CachedPartition(val topic: String,
   def this(topic: String, partition: Int, topicId: Uuid) =
 this(topic, topicId, partition, -1, -1, -1, Optional.empty(), -1, -1, 
Optional.empty[Integer])
 
-  def this(part: TopicPartition, topicId: Uuid) =
-this(part.topic, part.partition, topicId)
+  def this(part: TopicIdPartition) = {
+this(part.topic, part.partition, part.topicId)
+  }
 
-  def this(part: TopicPartition, id: Uuid, reqData: 
FetchRequest.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData) =
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
   reqData.currentLeaderEpoch, reqData.logStartOffset, -1, 
reqData.lastFetchedEpoch)
 
-  def this(part: TopicPartition, id: Uuid, reqData: FetchRequest.PartitionData,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData,
respData: FetchResponseData.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
   reqData.currentLeaderEpoch, reqData.logStartOffset, 
respData.logStartOffset, reqData.lastFetchedEpoch)
 
-  def reqData = new FetchRequest.PartitionData(fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
+  def reqData = new FetchRequest.PartitionData(topicId, fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
 
-  def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {
+  def maybeUpdateRequestParamsOrName(reqData: FetchRequest.PartitionData, 
name: String): Unit = {
 // Update our cached request parameters.
 maxBytes = reqData.maxBytes
 fetchOffset = reqData.fetchOffset
 fetcherLogStartOffset = reqData.logStartOffset
 leaderEpoch = reqData.currentLeaderEpoch
 lastFetchedEpoch = reqData.lastFetchedEpoch
+// Update name if needed
+maybeSetUnknownName(name)
+  }
+
+  def maybeSetUnknownName(name: String): Unit = {
+if (this.topic == null) {
+  this.topic = name
+}
+  }
+
+  def maybeResolveUnknownName(topicNames: FetchSession.TOPIC_NAME_MAP): Unit = 
{

Review comment:
   Yeah, I agree with you. Perhaps, we could just remove the 
maybeSetTopicName and move its logic into the update request params method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743908454



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -262,11 +262,12 @@ public synchronized int sendFetches() {
 maxVersion = ApiKeys.FETCH.latestVersion();
 }
 final FetchRequest.Builder request = FetchRequest.Builder
-.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend(), data.topicIds())
+.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend())
 .isolationLevel(isolationLevel)
 .setMaxBytes(this.maxBytes)
 .metadata(data.metadata())
-.toForget(data.toForget())
+.removed(data.toForget())
+.replaced(data.toReplace())

Review comment:
   Right. You might have to assert on the request in the fetcher as well. 
As you said, we can't really get the data out from the builder otherwise.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743836225



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -262,11 +262,12 @@ public synchronized int sendFetches() {
 maxVersion = ApiKeys.FETCH.latestVersion();
 }
 final FetchRequest.Builder request = FetchRequest.Builder
-.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend(), data.topicIds())
+.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend())
 .isolationLevel(isolationLevel)
 .setMaxBytes(this.maxBytes)
 .metadata(data.metadata())
-.toForget(data.toForget())
+.removed(data.toForget())
+.replaced(data.toReplace())

Review comment:
   We should have a test in the Fetcher which ensure that the builder 
received the correct information. Then we could have one for the request which 
ensure that the builder does its job correctly as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743764793



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -285,52 +268,57 @@ public FetchRequestData build() {
 if (nextMetadata.isFull()) {
 if (log.isDebugEnabled()) {
 log.debug("Built full fetch {} for node {} with {}.",
-  nextMetadata, node, 
partitionsToLogString(next.keySet()));
+nextMetadata, node, 
topicPartitionsToLogString(next.keySet()));
 }
 sessionPartitions = next;
 next = null;
 // Only add topic IDs to the session if we are using topic IDs.
 if (canUseTopicIds) {
-sessionTopicIds = topicIds;
-sessionTopicNames = new HashMap<>(topicIds.size());
-topicIds.forEach((name, id) -> sessionTopicNames.put(id, 
name));
+Map> newTopicNames = 
sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry
 -> entry.getValue().topicId,
+Collectors.mapping(entry -> 
entry.getKey().topic(), Collectors.toSet(;
+
+sessionTopicNames = new HashMap<>(newTopicNames.size());
+// There should only be one topic name per topic ID.
+newTopicNames.forEach((topicId, topicNamesSet) -> 
topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
 } else {
-sessionTopicIds = new HashMap<>();
-sessionTopicNames = new HashMap<>();
+sessionTopicNames = Collections.emptyMap();
 }
-topicIds = null;
 Map toSend =
-Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
-Map toSendTopicIds =
-Collections.unmodifiableMap(new 
HashMap<>(sessionTopicIds));
-Map toSendTopicNames =
-Collections.unmodifiableMap(new 
HashMap<>(sessionTopicNames));
-return new FetchRequestData(toSend, Collections.emptyList(), 
toSend, toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds);
+Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
+return new FetchRequestData(toSend, Collections.emptyList(), 
Collections.emptyList(), toSend, nextMetadata, canUseTopicIds);
 }
 
-List added = new ArrayList<>();
-List removed = new ArrayList<>();
-List altered = new ArrayList<>();
+List added = new ArrayList<>();
+List removed = new ArrayList<>();
+List altered = new ArrayList<>();
+List replaced = new ArrayList<>();
 for (Iterator> iter =
- sessionPartitions.entrySet().iterator(); iter.hasNext(); 
) {
+ sessionPartitions.entrySet().iterator(); iter.hasNext(); ) {
 Entry entry = iter.next();
 TopicPartition topicPartition = entry.getKey();
 PartitionData prevData = entry.getValue();
 PartitionData nextData = next.remove(topicPartition);
 if (nextData != null) {
-if (!prevData.equals(nextData)) {
+// We basically check if the new partition had the same 
topic ID. If not,
+// we add it to the "replaced" set.
+if (!prevData.topicId.equals(nextData.topicId) && 
!prevData.topicId.equals(Uuid.ZERO_UUID)) {
+// Re-add the replaced partition to the end of 'next'
+next.put(topicPartition, nextData);
+entry.setValue(nextData);
+replaced.add(new TopicIdPartition(prevData.topicId, 
topicPartition));
+} else if (!prevData.equals(nextData)) {
 // Re-add the altered partition to the end of 'next'
 next.put(topicPartition, nextData);
 entry.setValue(nextData);
-altered.add(topicPartition);
+altered.add(new TopicIdPartition(nextData.topicId, 
topicPartition));
 }
 } else {
 // Remove this partition from the session.
 iter.remove();
 // Indicate that we no longer want to listen to this 
partition.
-removed.add(topicPartition);
+removed.add(new TopicIdPartition(prevData.topicId, 
topicPartition));
 // If we do not have this topic ID in the builder or the 
session, we can not use topic IDs.
-if 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743763229



##
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##
@@ -305,9 +304,10 @@ class ReplicaFetcherThread(name: String,
 } else {
   val version: Short = if (fetchRequestVersion >= 13 && 
!fetchData.canUseTopicIds) 12 else fetchRequestVersion
   val requestBuilder = FetchRequest.Builder
-.forReplica(version, replicaId, maxWait, minBytes, fetchData.toSend, 
fetchData.topicIds)
+.forReplica(version, replicaId, maxWait, minBytes, fetchData.toSend)
 .setMaxBytes(maxBytes)
-.toForget(fetchData.toForget)
+.removed(fetchData.toForget)
+.replaced(fetchData.toReplace)

Review comment:
   Do we have tests verifying this change?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743759128



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -93,27 +93,42 @@ class CachedPartition(val topic: String,
   def this(topic: String, partition: Int, topicId: Uuid) =
 this(topic, topicId, partition, -1, -1, -1, Optional.empty(), -1, -1, 
Optional.empty[Integer])
 
-  def this(part: TopicPartition, topicId: Uuid) =
-this(part.topic, part.partition, topicId)
+  def this(part: TopicIdPartition) = {
+this(part.topic, part.partition, part.topicId)
+  }
 
-  def this(part: TopicPartition, id: Uuid, reqData: 
FetchRequest.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData) =
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
   reqData.currentLeaderEpoch, reqData.logStartOffset, -1, 
reqData.lastFetchedEpoch)
 
-  def this(part: TopicPartition, id: Uuid, reqData: FetchRequest.PartitionData,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData,
respData: FetchResponseData.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
   reqData.currentLeaderEpoch, reqData.logStartOffset, 
respData.logStartOffset, reqData.lastFetchedEpoch)
 
-  def reqData = new FetchRequest.PartitionData(fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
+  def reqData = new FetchRequest.PartitionData(topicId, fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
 
-  def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {
+  def maybeUpdateRequestParamsOrName(reqData: FetchRequest.PartitionData, 
name: String): Unit = {
 // Update our cached request parameters.
 maxBytes = reqData.maxBytes
 fetchOffset = reqData.fetchOffset
 fetcherLogStartOffset = reqData.logStartOffset
 leaderEpoch = reqData.currentLeaderEpoch
 lastFetchedEpoch = reqData.lastFetchedEpoch
+// Update name if needed
+maybeSetUnknownName(name)
+  }
+
+  def maybeSetUnknownName(name: String): Unit = {
+if (this.topic == null) {
+  this.topic = name
+}
+  }
+
+  def maybeResolveUnknownName(topicNames: FetchSession.TOPIC_NAME_MAP): Unit = 
{

Review comment:
   Should we use the same name for both `maybeSetUnknownName` and 
`maybeResolveUnknownName`? I guess that you could differ by their argument.
   
   If we add unit tests for other methods of this class, should we cover all 
the methods that we have changed or added as well?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743759128



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -93,27 +93,42 @@ class CachedPartition(val topic: String,
   def this(topic: String, partition: Int, topicId: Uuid) =
 this(topic, topicId, partition, -1, -1, -1, Optional.empty(), -1, -1, 
Optional.empty[Integer])
 
-  def this(part: TopicPartition, topicId: Uuid) =
-this(part.topic, part.partition, topicId)
+  def this(part: TopicIdPartition) = {
+this(part.topic, part.partition, part.topicId)
+  }
 
-  def this(part: TopicPartition, id: Uuid, reqData: 
FetchRequest.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData) =
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
   reqData.currentLeaderEpoch, reqData.logStartOffset, -1, 
reqData.lastFetchedEpoch)
 
-  def this(part: TopicPartition, id: Uuid, reqData: FetchRequest.PartitionData,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData,
respData: FetchResponseData.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
   reqData.currentLeaderEpoch, reqData.logStartOffset, 
respData.logStartOffset, reqData.lastFetchedEpoch)
 
-  def reqData = new FetchRequest.PartitionData(fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
+  def reqData = new FetchRequest.PartitionData(topicId, fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
 
-  def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {
+  def maybeUpdateRequestParamsOrName(reqData: FetchRequest.PartitionData, 
name: String): Unit = {
 // Update our cached request parameters.
 maxBytes = reqData.maxBytes
 fetchOffset = reqData.fetchOffset
 fetcherLogStartOffset = reqData.logStartOffset
 leaderEpoch = reqData.currentLeaderEpoch
 lastFetchedEpoch = reqData.lastFetchedEpoch
+// Update name if needed
+maybeSetUnknownName(name)
+  }
+
+  def maybeSetUnknownName(name: String): Unit = {
+if (this.topic == null) {
+  this.topic = name
+}
+  }
+
+  def maybeResolveUnknownName(topicNames: FetchSession.TOPIC_NAME_MAP): Unit = 
{

Review comment:
   Should we use the same name for both `maybeSetUnknownName` and 
`maybeResolveUnknownName`? I guess that you could differ by their argument.
   
   If we add unit tests for other methods of this class, should we cover them 
as well?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743759128



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -93,27 +93,42 @@ class CachedPartition(val topic: String,
   def this(topic: String, partition: Int, topicId: Uuid) =
 this(topic, topicId, partition, -1, -1, -1, Optional.empty(), -1, -1, 
Optional.empty[Integer])
 
-  def this(part: TopicPartition, topicId: Uuid) =
-this(part.topic, part.partition, topicId)
+  def this(part: TopicIdPartition) = {
+this(part.topic, part.partition, part.topicId)
+  }
 
-  def this(part: TopicPartition, id: Uuid, reqData: 
FetchRequest.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData) =
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
   reqData.currentLeaderEpoch, reqData.logStartOffset, -1, 
reqData.lastFetchedEpoch)
 
-  def this(part: TopicPartition, id: Uuid, reqData: FetchRequest.PartitionData,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData,
respData: FetchResponseData.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
   reqData.currentLeaderEpoch, reqData.logStartOffset, 
respData.logStartOffset, reqData.lastFetchedEpoch)
 
-  def reqData = new FetchRequest.PartitionData(fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
+  def reqData = new FetchRequest.PartitionData(topicId, fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
 
-  def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {
+  def maybeUpdateRequestParamsOrName(reqData: FetchRequest.PartitionData, 
name: String): Unit = {
 // Update our cached request parameters.
 maxBytes = reqData.maxBytes
 fetchOffset = reqData.fetchOffset
 fetcherLogStartOffset = reqData.logStartOffset
 leaderEpoch = reqData.currentLeaderEpoch
 lastFetchedEpoch = reqData.lastFetchedEpoch
+// Update name if needed
+maybeSetUnknownName(name)
+  }
+
+  def maybeSetUnknownName(name: String): Unit = {
+if (this.topic == null) {
+  this.topic = name
+}
+  }
+
+  def maybeResolveUnknownName(topicNames: FetchSession.TOPIC_NAME_MAP): Unit = 
{

Review comment:
   nit: Should we use the same name for both `maybeSetUnknownName` and 
`maybeResolveUnknownName`? I guess that you could differ by their argument.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743756690



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -163,18 +178,37 @@ class CachedPartition(val topic: String,
 mustRespond
   }
 
-  override def hashCode: Int = Objects.hash(new TopicPartition(topic, 
partition), topicId)
+  /**
+   * We have different equality checks depending on whether topic IDs are used.
+   * This means we need a different hash function as well. We use name to 
calculate the hash if the ID is zero and unused.
+   * Otherwise, we use the topic ID in the hash calculation.
+   *
+   * @return the hash code for the CachedPartition depending on what request 
version we are using.
+   */
+  override def hashCode: Int =
+if (topicId != Uuid.ZERO_UUID)
+  (31 * partition) + topicId.hashCode
+else
+  (31 * partition) + topic.hashCode
 
   def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition]

Review comment:
   I guess that we could remove it now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743755936



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -413,8 +413,20 @@ abstract class AbstractFetcherThread(name: String,
 
 case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
   warn(s"Received ${Errors.UNKNOWN_TOPIC_OR_PARTITION} from 
the leader for partition $topicPartition. " +
-   "This error may be returned transiently when the 
partition is being created or deleted, but it is not " +
-   "expected to persist.")
+"This error may be returned transiently when the partition 
is being created or deleted, but it is not " +
+"expected to persist.")
+  partitionsWithError += topicPartition
+
+case Errors.UNKNOWN_TOPIC_ID =>
+  warn(s"Received ${Errors.UNKNOWN_TOPIC_ID} from the leader 
for partition $topicPartition. " +
+"This error may be returned transiently when the partition 
is being created or deleted, but it is not " +
+"expected to persist.")
+  partitionsWithError += topicPartition
+
+case Errors.INCONSISTENT_TOPIC_ID =>
+  warn(s"Received ${Errors.INCONSISTENT_TOPIC_ID} from the 
leader for partition $topicPartition. " +
+"This error may be returned transiently when the partition 
is being created or deleted, but it is not " +
+"expected to persist.")

Review comment:
   Do we have unit tests covering those cases? There are almost no changes 
in `AbstractFetcherThreadTest` so it seems that we don't. Are they somewhere 
else perhaps?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743753874



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String,
   }
 }
   } catch {
-case ime@( _: CorruptRecordException | _: 
InvalidRecordException) =>
+case ime@(_: CorruptRecordException | _: 
InvalidRecordException) =>

Review comment:
   Anyway, we don't need to address this in this PR. I just wanted to point 
out that there is an opportunity for an improvement.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743753319



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String,
   }
 }
   } catch {
-case ime@( _: CorruptRecordException | _: 
InvalidRecordException) =>
+case ime@(_: CorruptRecordException | _: 
InvalidRecordException) =>

Review comment:
   Sorry, I wanted to say happen.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743044369



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String,
   }
 }
   } catch {
-case ime@( _: CorruptRecordException | _: 
InvalidRecordException) =>
+case ime@(_: CorruptRecordException | _: 
InvalidRecordException) =>

Review comment:
   I think that would for instance happen when the controller fails over to 
an older IBP during an upgrade. This should remove the topic ids which means 
that v12 will be used for the next fetch request and trigger a 
FETCH_SESSION_TOPIC_ID_ERROR. In this particular case, re-trying directly would 
be the optimal way to proceed for a follower. I wonder if they are other cases 
to consider here.
   
   For the consumer, it is definitely different.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743751246



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -242,65 +244,68 @@ public void testIncrementals() {
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
 addTopicId(topicIds, topicNames, "foo", version);
-builder.add(new TopicPartition("foo", 0), 
topicIds.getOrDefault("foo", Uuid.ZERO_UUID),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
-builder.add(new TopicPartition("foo", 1), 
topicIds.getOrDefault("foo", Uuid.ZERO_UUID),
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+Uuid fooId = topicIds.getOrDefault("foo", Uuid.ZERO_UUID);
+TopicPartition foo0 = new TopicPartition("foo", 0);
+TopicPartition foo1 = new TopicPartition("foo", 1);
+builder.add(foo0, new FetchRequest.PartitionData(fooId, 0, 100, 
200, Optional.empty()));
+builder.add(foo1, new FetchRequest.PartitionData(fooId, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200),
-new ReqEntry("foo", 1, 10, 110, 210)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200),
+new ReqEntry("foo", fooId, 1, 10, 110, 210)),
 data.toSend(), data.sessionPartitions());
 assertEquals(INVALID_SESSION_ID, data.metadata().sessionId());
 assertEquals(INITIAL_EPOCH, data.metadata().epoch());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20),
-new RespEntry("foo", 1, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20),
+new RespEntry("foo", 1, fooId, 10, 20)));
 handler.handleResponse(resp, version);
 
 // Test an incremental fetch request which adds one partition and 
modifies another.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
 addTopicId(topicIds, topicNames, "bar", version);
-builder2.add(new TopicPartition("foo", 0), 
topicIds.getOrDefault("foo", Uuid.ZERO_UUID),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
-builder2.add(new TopicPartition("foo", 1), 
topicIds.getOrDefault("foo", Uuid.ZERO_UUID),
-new FetchRequest.PartitionData(10, 120, 210, 
Optional.empty()));
-builder2.add(new TopicPartition("bar", 0), 
topicIds.getOrDefault("bar", Uuid.ZERO_UUID),
-new FetchRequest.PartitionData(20, 200, 200, 
Optional.empty()));
+Uuid barId = topicIds.getOrDefault("bar", Uuid.ZERO_UUID);
+TopicPartition bar0 = new TopicPartition("bar", 0);
+builder2.add(foo0,
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));

Review comment:
   There are a few more cases where we could put the partition data back on 
the previous line in this file.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743750508



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -314,8 +356,7 @@ public int maxBytes() {
 
 // For versions < 13, builds the partitionData map using only the 
FetchRequestData.
 // For versions 13+, builds the partitionData map using both the 
FetchRequestData and a mapping of topic IDs to names.
-// Throws UnknownTopicIdException for versions 13+ if the topic ID was 
unknown to the server.
-public Map fetchData(Map 
topicNames) throws UnknownTopicIdException {
+public Map fetchData(Map 
topicNames) throws UnknownTopicIdException {

Review comment:
   Do we have a unit test for this one and for `forgottenTopics`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743749915



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -199,26 +235,31 @@ public FetchRequest build(short version) {
 fetchRequestData.setMaxBytes(maxBytes);
 fetchRequestData.setIsolationLevel(isolationLevel.id());
 fetchRequestData.setForgottenTopicsData(new ArrayList<>());
-toForget.stream()
-.collect(Collectors.groupingBy(TopicPartition::topic, 
LinkedHashMap::new, Collectors.toList()))
-.forEach((topic, partitions) ->
-fetchRequestData.forgottenTopicsData().add(new 
FetchRequestData.ForgottenTopic()
-.setTopic(topic)
-.setTopicId(topicIds.getOrDefault(topic, 
Uuid.ZERO_UUID))
-
.setPartitions(partitions.stream().map(TopicPartition::partition).collect(Collectors.toList(
-);
-fetchRequestData.setTopics(new ArrayList<>());
+
+Map forgottenTopicMap = 
new LinkedHashMap<>();
+addToForgottenTopicMap(removed, forgottenTopicMap);
+
+// If a version older than v13 is used, topic-partition which were 
replaced
+// by a topic-partition with the same name but a different topic 
ID are not
+// sent out in the "forget" set in order to not remove the newly 
added
+// partition in the "fetch" set.
+if (version >= 13) {
+addToForgottenTopicMap(replaced, forgottenTopicMap);
+}

Review comment:
   Should we add a few unit tests to validate the changes that we have done 
in this class? We could add a few to FetchRequestTest (not use if it already 
exists though).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743745314



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -262,11 +262,12 @@ public synchronized int sendFetches() {
 maxVersion = ApiKeys.FETCH.latestVersion();
 }
 final FetchRequest.Builder request = FetchRequest.Builder
-.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend(), data.topicIds())
+.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend())
 .isolationLevel(isolationLevel)
 .setMaxBytes(this.maxBytes)
 .metadata(data.metadata())
-.toForget(data.toForget())
+.removed(data.toForget())
+.replaced(data.toReplace())

Review comment:
   Should we add or extend a test in `FetcherTest` to cover this change? I 
would like to have one which ensure that the request sent is populated 
correctly (especially the replaced part) by the fetcher based on the session 
handler. It seems that we don't have such test in the suite at the moment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743578530



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -3530,37 +3534,37 @@ class KafkaApisTest {
   def testSizeOfThrottledPartitions(): Unit = {
 val topicNames = new util.HashMap[Uuid, String]
 val topicIds = new util.HashMap[String, Uuid]()
-def fetchResponse(data: Map[TopicPartition, String]): FetchResponse = {
-  val responseData = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData](
+def fetchResponse(data: Map[TopicIdPartition, String]): FetchResponse = {
+  val responseData = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData](
 data.map { case (tp, raw) =>
   tp -> new FetchResponseData.PartitionData()
-.setPartitionIndex(tp.partition)
+.setPartitionIndex(tp.topicPartition.partition)
 .setHighWatermark(105)
 .setLastStableOffset(105)
 .setLogStartOffset(0)
 .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new 
SimpleRecord(100, raw.getBytes(StandardCharsets.UTF_8
   }.toMap.asJava)
 
   data.foreach{case (tp, _) =>
-val id = Uuid.randomUuid()
-topicIds.put(tp.topic(), id)
-topicNames.put(id, tp.topic())
+topicIds.put(tp.topicPartition.topic, tp.topicId)
+topicNames.put(tp.topicId, tp.topicPartition.topic)
   }
-  FetchResponse.of(Errors.NONE, 100, 100, responseData, topicIds)
+  FetchResponse.of(Errors.NONE, 100, 100, responseData)
 }
 
-val throttledPartition = new TopicPartition("throttledData", 0)
+val throttledPartition = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("throttledData", 0))
 val throttledData = Map(throttledPartition -> "throttledData")
 val expectedSize = 
FetchResponse.sizeOf(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
-  fetchResponse(throttledData).responseData(topicNames, 
FetchResponseData.HIGHEST_SUPPORTED_VERSION).entrySet.iterator, topicIds)
+  fetchResponse(throttledData).responseData(topicNames, 
FetchResponseData.HIGHEST_SUPPORTED_VERSION).entrySet.asScala.map( entry =>
+  (new TopicIdPartition(Uuid.ZERO_UUID, entry.getKey), 
entry.getValue)).toMap.asJava.entrySet.iterator)
 
-val response = fetchResponse(throttledData ++ Map(new 
TopicPartition("nonThrottledData", 0) -> "nonThrottledData"))
+val response = fetchResponse(throttledData ++ Map(new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("nonThrottledData", 0)) 
-> "nonThrottledData"))
 
 val quota = Mockito.mock(classOf[ReplicationQuotaManager])
 
Mockito.when(quota.isThrottled(ArgumentMatchers.any(classOf[TopicPartition])))
-  .thenAnswer(invocation => throttledPartition == 
invocation.getArgument(0).asInstanceOf[TopicPartition])
+  .thenAnswer(invocation => throttledPartition.topicPartition == 
invocation.getArgument(0).asInstanceOf[TopicPartition])
 
-assertEquals(expectedSize, 
KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
 response, quota, topicIds))
+assertEquals(expectedSize, 
KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
 response, quota))
   }
 
   @Test

Review comment:
   That is right.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743578382



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -659,88 +670,108 @@ class FetchSessionTest {
   }
 
   @Test
-  def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = {
+  def testFetchSessionWithUnknownId(): Unit = {
 val time = new MockTime()
 val cache = new FetchSessionCache(10, 1000)
 val fetchManager = new FetchManager(time, cache)
-val topicIds = new util.HashMap[String, Uuid]()
-val topicNames = new util.HashMap[Uuid, String]()
+val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> 
"bar").asJava

Review comment:
   You could use `assertPartitionsOrder` helper here as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743578252



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -659,88 +670,108 @@ class FetchSessionTest {
   }
 
   @Test
-  def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = {
+  def testFetchSessionWithUnknownId(): Unit = {
 val time = new MockTime()
 val cache = new FetchSessionCache(10, 1000)
 val fetchManager = new FetchManager(time, cache)
-val topicIds = new util.HashMap[String, Uuid]()
-val topicNames = new util.HashMap[Uuid, String]()
+val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> 
"bar").asJava
+val topicIds = topicNames.asScala.map(_.swap).asJava
+val foo0 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition("foo", 0))
+val foo1 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition("foo", 1))
+val emptyFoo0 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition(null, 0))
+val emptyFoo1 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition(null, 1))
 
-// Create a new fetch session with foo-0
+// Create a new fetch session with foo-0 and foo-1
 val reqData1 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-reqData1.put(new TopicPartition("foo", 0), new 
FetchRequest.PartitionData(0, 0, 100,
+reqData1.put(foo0.topicPartition, new 
FetchRequest.PartitionData(foo0.topicId, 0, 0, 100,
   Optional.empty()))
-val request1 = createRequestWithoutTopicIds(JFetchMetadata.INITIAL, 
reqData1, topicIds, EMPTY_PART_LIST, false)
-// Start a fetch session using a request version that does not use topic 
IDs.
+reqData1.put(foo1.topicPartition, new 
FetchRequest.PartitionData(foo1.topicId,10, 0, 100,
+  Optional.empty()))
+val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, 
EMPTY_PART_LIST, false)
+// Simulate unknown topic ID for foo.
+val topicNamesOnlyBar = Collections.singletonMap(topicIds.get("bar"), 
"bar")
+// We should not throw error since we have an older request version.
 val context1 = fetchManager.newContext(
   request1.version,
   request1.metadata,
   request1.isFromFollower,
-  request1.fetchData(topicNames),
-  request1.forgottenTopics(topicNames),
-  topicIds
+  request1.fetchData(topicNamesOnlyBar),
+  request1.forgottenTopics(topicNamesOnlyBar),
+  topicNamesOnlyBar
 )
 assertEquals(classOf[FullFetchContext], context1.getClass)
-val respData1 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
-respData1.put(new TopicPartition("foo", 0), new 
FetchResponseData.PartitionData()
+val respData1 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]

Review comment:
   Yeah, I meant exactly that. How about using `assertPartitionsOrder` 
helper? The assertion would be more complete.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743577292



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##
@@ -4814,6 +4842,7 @@ private void buildDependencies(MetricConfig metricConfig,
 metadata = new ConsumerMetadata(0, metadataExpireMs, false, false,
 subscriptions, logContext, new ClusterResourceListeners());
 client = new MockClient(time, metadata);
+client.setNodeApiVersions(NodeApiVersions.create());
 metrics = new Metrics(metricConfig, time);
 consumerClient = new ConsumerNetworkClient(logContext, client, 
metadata, time,
 100, 1000, Integer.MAX_VALUE);

Review comment:
   Yes, I was referring to those. Ack, I missed them during my first read.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743576928



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were 
previously using topic IDs.
+// Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());
+// Should have the same session ID, and next epoch and can use topic 
IDs.
+assertEquals(123, data2.metadata().sessionId(), "Did not use same 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743575635



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were 
previously using topic IDs.
+// Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());

Review comment:
   Correct. I was referring to the upgrade case. We might need to handle 
the downgrade case for 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743574922



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were 
previously using topic IDs.
+// Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());
+// Should have the same session ID, and next epoch and can use topic 
IDs.
+assertEquals(123, data2.metadata().sessionId(), "Did not use same 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743046927



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -66,16 +69,28 @@ public PartitionData(
 int maxBytes,
 Optional currentLeaderEpoch
 ) {
-this(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, 
Optional.empty());
+this(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, 
currentLeaderEpoch, Optional.empty());

Review comment:
   Yeah, that's a good question. I guess that that constructor is 
convenient for tests but might be bug prone in the regular code. I am tempted 
to remove it entirely What do you think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743046003



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
   None
 }
 
-val erroneous = mutable.ArrayBuffer[(TopicPartition, 
FetchResponseData.PartitionData)]()
-val interesting = mutable.ArrayBuffer[(TopicPartition, 
FetchRequest.PartitionData)]()
-val sessionTopicIds = mutable.Map[String, Uuid]()
+val erroneous = mutable.ArrayBuffer[(TopicIdPartition, 
FetchResponseData.PartitionData)]()
+val interesting = mutable.ArrayBuffer[(TopicIdPartition, 
FetchRequest.PartitionData)]()
 if (fetchRequest.isFromFollower) {
   // The follower must have ClusterAction on ClusterResource in order to 
fetch partition data.
   if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, 
CLUSTER_NAME)) {
-fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-  sessionTopicIds.put(topicPartition.topic(), topicId)
-  if (!metadataCache.contains(topicPartition))
-erroneous += topicPartition -> 
FetchResponse.partitionResponse(topicPartition.partition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+fetchContext.foreachPartition { (topicIdPartition, data) =>
+  if (topicIdPartition.topicPartition.topic == null )
+erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+  else if (!metadataCache.contains(topicIdPartition.topicPartition))
+erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
   else
-interesting += (topicPartition -> data)
+interesting += (topicIdPartition -> data)
 }
   } else {
-fetchContext.foreachPartition { (part, topicId, _) =>
-  sessionTopicIds.put(part.topic(), topicId)
-  erroneous += part -> FetchResponse.partitionResponse(part.partition, 
Errors.TOPIC_AUTHORIZATION_FAILED)
+fetchContext.foreachPartition { (topicIdPartition, _) =>
+  erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, 
Errors.TOPIC_AUTHORIZATION_FAILED)

Review comment:
   I guess that it does not change much in the end. I was considering this 
in order to be consistent with how we handle this for the consumer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743044970



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -163,18 +173,35 @@ class CachedPartition(val topic: String,
 mustRespond
   }
 
-  override def hashCode: Int = Objects.hash(new TopicPartition(topic, 
partition), topicId)
+  /**
+   * We have different equality checks depending on whether topic IDs are used.
+   * This means we need a different hash function as well. We use name to 
calculate the hash if the ID is zero and unused.
+   * Otherwise, we use the topic ID in the hash calculation.
+   *
+   * @return the hash code for the CachedPartition depending on what request 
version we are using.
+   */
+  override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition) 
+ topicId.hashCode else
+(31 * partition) + topic.hashCode
 
   def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition]
 
+  /**
+   * We have different equality checks depending on whether topic IDs are used.
+   *
+   * This is because when we use topic IDs, a partition with a given ID and an 
unknown name is the same as a partition with that
+   * ID and a known name. This means we can only use topic ID and partition 
when determining equality.
+   *
+   * On the other hand, if we are using topic names, all IDs are zero. This 
means we can only use topic name and partition
+   * when determining equality.
+   */
   override def equals(that: Any): Boolean =
 that match {
   case that: CachedPartition =>
 this.eq(that) ||
   (that.canEqual(this) &&

Review comment:
   Right. It seems to be that the `canEqual(this)` does not make any sense 
here. Could you double check?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743044369



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String,
   }
 }
   } catch {
-case ime@( _: CorruptRecordException | _: 
InvalidRecordException) =>
+case ime@(_: CorruptRecordException | _: 
InvalidRecordException) =>

Review comment:
   I think that would for instance append when the controller fails over to 
an older IBP during an upgrade. This should remove the topic ids which means 
that v12 will be used for the next fetch request and trigger a 
FETCH_SESSION_TOPIC_ID_ERROR. In this particular case, re-trying directly would 
be the optimal way to proceed for a follower. I wonder if they are other cases 
to consider here.
   
   For the consumer, it is definitely different.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743041695



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -285,52 +268,57 @@ public FetchRequestData build() {
 if (nextMetadata.isFull()) {
 if (log.isDebugEnabled()) {
 log.debug("Built full fetch {} for node {} with {}.",
-  nextMetadata, node, 
partitionsToLogString(next.keySet()));
+nextMetadata, node, 
topicPartitionsToLogString(next.keySet()));
 }
 sessionPartitions = next;
 next = null;
+Map toSend =
+Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
 // Only add topic IDs to the session if we are using topic IDs.
 if (canUseTopicIds) {
-sessionTopicIds = topicIds;
-sessionTopicNames = new HashMap<>(topicIds.size());
-topicIds.forEach((name, id) -> sessionTopicNames.put(id, 
name));
+Map> newTopicNames = 
sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry
 -> entry.getValue().topicId,
+Collectors.mapping(entry -> 
entry.getKey().topic(), Collectors.toSet(;

Review comment:
   I think that the grouping is slower because it has to allocate another 
Map, Sets for each Uuid, etc.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743011890



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -1361,102 +1542,113 @@ class FetchSessionTest {
 val resp4 = context2.updateAndGenerateResponseData(respData)
 assertEquals(Errors.NONE, resp4.error)
 assertEquals(resp1.sessionId, resp4.sessionId)
-assertEquals(Utils.mkSet(tp1, tp2), resp4.responseData(topicNames, 
request2.version).keySet)
+assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition), 
resp4.responseData(topicNames, request2.version).keySet)
   }
 
   @Test
   def testDeprioritizesPartitionsWithRecordsOnly(): Unit = {
 val time = new MockTime()
 val cache = new FetchSessionCache(10, 1000)
 val fetchManager = new FetchManager(time, cache)
-val tp1 = new TopicPartition("foo", 1)
-val tp2 = new TopicPartition("bar", 2)
-val tp3 = new TopicPartition("zar", 3)
 val topicIds = Map("foo" -> Uuid.randomUuid(), "bar" -> Uuid.randomUuid(), 
"zar" -> Uuid.randomUuid()).asJava
 val topicNames = topicIds.asScala.map(_.swap).asJava
+val tp1 = new TopicIdPartition(topicIds.get("foo"), new 
TopicPartition("foo", 1))
+val tp2 = new TopicIdPartition(topicIds.get("bar"), new 
TopicPartition("bar", 2))
+val tp3 = new TopicIdPartition(topicIds.get("zar"), new 
TopicPartition("zar", 3))
 
-val reqData = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-reqData.put(tp1, new FetchRequest.PartitionData(100, 0, 1000, 
Optional.of(5), Optional.of(4)))
-reqData.put(tp2, new FetchRequest.PartitionData(100, 0, 1000, 
Optional.of(5), Optional.of(4)))
-reqData.put(tp3, new FetchRequest.PartitionData(100, 0, 1000, 
Optional.of(5), Optional.of(4)))
+val reqData = new util.LinkedHashMap[TopicIdPartition, 
FetchRequest.PartitionData]
+reqData.put(tp1, new FetchRequest.PartitionData(topicIds.get("foo"), 100, 
0, 1000, Optional.of(5), Optional.of(4)))
+reqData.put(tp2, new FetchRequest.PartitionData(topicIds.get("bar"), 100, 
0, 1000, Optional.of(5), Optional.of(4)))
+reqData.put(tp3, new FetchRequest.PartitionData(topicIds.get("zar"), 100, 
0, 1000, Optional.of(5), Optional.of(4)))
 
 // Full fetch context returns all partitions in the response
 val context1 = fetchManager.newContext(ApiKeys.FETCH.latestVersion(), 
JFetchMetadata.INITIAL, false,
- reqData, Collections.emptyList(), topicIds)
+ reqData, Collections.emptyList(), topicNames)
 assertEquals(classOf[FullFetchContext], context1.getClass)
 
-val respData1 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
+val respData1 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
 respData1.put(tp1, new FetchResponseData.PartitionData()
-  .setPartitionIndex(tp1.partition)
+  .setPartitionIndex(tp1.topicPartition.partition)
   .setHighWatermark(50)
   .setLastStableOffset(50)
   .setLogStartOffset(0))
 respData1.put(tp2, new FetchResponseData.PartitionData()
-  .setPartitionIndex(tp2.partition)
+  .setPartitionIndex(tp2.topicPartition.partition)
   .setHighWatermark(50)
   .setLastStableOffset(50)
   .setLogStartOffset(0))
 respData1.put(tp3, new FetchResponseData.PartitionData()
-  .setPartitionIndex(tp3.partition)
+  .setPartitionIndex(tp3.topicPartition.partition)
   .setHighWatermark(50)
   .setLastStableOffset(50)
   .setLogStartOffset(0))
 
 val resp1 = context1.updateAndGenerateResponseData(respData1)
 assertEquals(Errors.NONE, resp1.error)
 assertNotEquals(INVALID_SESSION_ID, resp1.sessionId)
-assertEquals(Utils.mkSet(tp1, tp2, tp3), resp1.responseData(topicNames, 
ApiKeys.FETCH.latestVersion()).keySet())
+assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition, 
tp3.topicPartition), resp1.responseData(topicNames, 
ApiKeys.FETCH.latestVersion()).keySet())
 
 // Incremental fetch context returns partitions with changes but only 
deprioritizes
 // the partitions with records
 val context2 = fetchManager.newContext(ApiKeys.FETCH.latestVersion(), new 
JFetchMetadata(resp1.sessionId, 1), false,
-  reqData, Collections.emptyList(), topicIds)
+  reqData, Collections.emptyList(), topicNames)
 assertEquals(classOf[IncrementalFetchContext], context2.getClass)
 
 // Partitions are ordered in the session as per last response
 assertPartitionsOrder(context2, Seq(tp1, tp2, tp3))
 
 // Response is empty
-val respData2 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
+val respData2 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
 val resp2 = context2.updateAndGenerateResponseData(respData2)
 assertEquals(Errors.NONE, resp2.error)
 assertEquals(resp1.sessionId, resp2.sessionId)
 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r742933244



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were 
previously using topic IDs.
+// Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());
+// Should have the same session ID, and next epoch and can use topic 
IDs.
+assertEquals(123, data2.metadata().sessionId(), "Did not use same 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-03 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r741789472



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -285,52 +268,57 @@ public FetchRequestData build() {
 if (nextMetadata.isFull()) {
 if (log.isDebugEnabled()) {
 log.debug("Built full fetch {} for node {} with {}.",
-  nextMetadata, node, 
partitionsToLogString(next.keySet()));
+nextMetadata, node, 
topicPartitionsToLogString(next.keySet()));
 }
 sessionPartitions = next;
 next = null;
+Map toSend =
+Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
 // Only add topic IDs to the session if we are using topic IDs.
 if (canUseTopicIds) {
-sessionTopicIds = topicIds;
-sessionTopicNames = new HashMap<>(topicIds.size());
-topicIds.forEach((name, id) -> sessionTopicNames.put(id, 
name));
+Map> newTopicNames = 
sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry
 -> entry.getValue().topicId,
+Collectors.mapping(entry -> 
entry.getKey().topic(), Collectors.toSet(;

Review comment:
   Could we iterate over `sessionPartitions` and directly populate 
`sessionTopicNames` by using `putIfAbsent` or even `put`? The grouping seems 
unnecessary to me here unless I am missing something.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -285,52 +268,57 @@ public FetchRequestData build() {
 if (nextMetadata.isFull()) {
 if (log.isDebugEnabled()) {
 log.debug("Built full fetch {} for node {} with {}.",
-  nextMetadata, node, 
partitionsToLogString(next.keySet()));
+nextMetadata, node, 
topicPartitionsToLogString(next.keySet()));
 }
 sessionPartitions = next;
 next = null;
+Map toSend =
+Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));

Review comment:
   As `toSend` is not used before L288, how about putting this line over 
there?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -346,38 +334,36 @@ public FetchRequestData build() {
 break;
 }
 sessionPartitions.put(topicPartition, nextData);
-added.add(topicPartition);
+added.add(new TopicIdPartition(nextData.topicId, 
topicPartition));
 }
 
 // Add topic IDs to session if we can use them. If an ID is 
inconsistent, we will handle in the receiving broker.
 // If we switched from using topic IDs to not using them (or vice 
versa), that error will also be handled in the receiving broker.
 if (canUseTopicIds) {
-for (Map.Entry topic : topicIds.entrySet()) {
-String topicName = topic.getKey();
-Uuid addedId = topic.getValue();
-sessionTopicIds.put(topicName, addedId);
-sessionTopicNames.put(addedId, topicName);
-}
+Map> newTopicNames = 
added.stream().collect(Collectors.groupingByConcurrent(TopicIdPartition::topicId,
+Collectors.mapping(topicIdPartition -> 
topicIdPartition.topicPartition().topic(), Collectors.toSet(;
+
+// There should only be one topic name per topic ID.
+newTopicNames.forEach((topicId, topicNamesSet) -> 
topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
 }
 
 if (log.isDebugEnabled()) {
-log.debug("Built incremental fetch {} for node {}. Added {}, 
altered {}, removed {} " +
-  "out of {}", nextMetadata, node, 
partitionsToLogString(added),
-  partitionsToLogString(altered), 
partitionsToLogString(removed),
-  partitionsToLogString(sessionPartitions.keySet()));
+log.debug("Built incremental fetch {} for node {}. Added {}, 
altered {}, removed {}, " +
+"replaced {} out of {}", nextMetadata, node, 
topicIdPartitionsToLogString(added),

Review comment:
   nit: Could we align like it was before?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -346,38 +334,36 @@ public FetchRequestData build() {
 break;
 }
 sessionPartitions.put(topicPartition, 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-03 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r741789472



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -285,52 +268,57 @@ public FetchRequestData build() {
 if (nextMetadata.isFull()) {
 if (log.isDebugEnabled()) {
 log.debug("Built full fetch {} for node {} with {}.",
-  nextMetadata, node, 
partitionsToLogString(next.keySet()));
+nextMetadata, node, 
topicPartitionsToLogString(next.keySet()));
 }
 sessionPartitions = next;
 next = null;
+Map toSend =
+Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
 // Only add topic IDs to the session if we are using topic IDs.
 if (canUseTopicIds) {
-sessionTopicIds = topicIds;
-sessionTopicNames = new HashMap<>(topicIds.size());
-topicIds.forEach((name, id) -> sessionTopicNames.put(id, 
name));
+Map> newTopicNames = 
sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry
 -> entry.getValue().topicId,
+Collectors.mapping(entry -> 
entry.getKey().topic(), Collectors.toSet(;

Review comment:
   Could we iterate over `sessionPartitions` and directly populate 
`sessionTopicNames` by using `putIfAbsent` or even `put`? The grouping seems 
unnecessary to me here unless I am missing something.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -285,52 +268,57 @@ public FetchRequestData build() {
 if (nextMetadata.isFull()) {
 if (log.isDebugEnabled()) {
 log.debug("Built full fetch {} for node {} with {}.",
-  nextMetadata, node, 
partitionsToLogString(next.keySet()));
+nextMetadata, node, 
topicPartitionsToLogString(next.keySet()));
 }
 sessionPartitions = next;
 next = null;
+Map toSend =
+Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));

Review comment:
   As `toSend` is not used before L288, how about putting this line over 
there?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -346,38 +334,36 @@ public FetchRequestData build() {
 break;
 }
 sessionPartitions.put(topicPartition, nextData);
-added.add(topicPartition);
+added.add(new TopicIdPartition(nextData.topicId, 
topicPartition));
 }
 
 // Add topic IDs to session if we can use them. If an ID is 
inconsistent, we will handle in the receiving broker.
 // If we switched from using topic IDs to not using them (or vice 
versa), that error will also be handled in the receiving broker.
 if (canUseTopicIds) {
-for (Map.Entry topic : topicIds.entrySet()) {
-String topicName = topic.getKey();
-Uuid addedId = topic.getValue();
-sessionTopicIds.put(topicName, addedId);
-sessionTopicNames.put(addedId, topicName);
-}
+Map> newTopicNames = 
added.stream().collect(Collectors.groupingByConcurrent(TopicIdPartition::topicId,
+Collectors.mapping(topicIdPartition -> 
topicIdPartition.topicPartition().topic(), Collectors.toSet(;
+
+// There should only be one topic name per topic ID.
+newTopicNames.forEach((topicId, topicNamesSet) -> 
topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
 }
 
 if (log.isDebugEnabled()) {
-log.debug("Built incremental fetch {} for node {}. Added {}, 
altered {}, removed {} " +
-  "out of {}", nextMetadata, node, 
partitionsToLogString(added),
-  partitionsToLogString(altered), 
partitionsToLogString(removed),
-  partitionsToLogString(sessionPartitions.keySet()));
+log.debug("Built incremental fetch {} for node {}. Added {}, 
altered {}, removed {}, " +
+"replaced {} out of {}", nextMetadata, node, 
topicIdPartitionsToLogString(added),

Review comment:
   nit: Could we align like it was before?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -346,38 +334,36 @@ public FetchRequestData build() {
 break;
 }
 sessionPartitions.put(topicPartition, 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-03 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r741789472



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -285,52 +268,57 @@ public FetchRequestData build() {
 if (nextMetadata.isFull()) {
 if (log.isDebugEnabled()) {
 log.debug("Built full fetch {} for node {} with {}.",
-  nextMetadata, node, 
partitionsToLogString(next.keySet()));
+nextMetadata, node, 
topicPartitionsToLogString(next.keySet()));
 }
 sessionPartitions = next;
 next = null;
+Map toSend =
+Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
 // Only add topic IDs to the session if we are using topic IDs.
 if (canUseTopicIds) {
-sessionTopicIds = topicIds;
-sessionTopicNames = new HashMap<>(topicIds.size());
-topicIds.forEach((name, id) -> sessionTopicNames.put(id, 
name));
+Map> newTopicNames = 
sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry
 -> entry.getValue().topicId,
+Collectors.mapping(entry -> 
entry.getKey().topic(), Collectors.toSet(;

Review comment:
   Could we iterate over `sessionPartitions` and directly populate 
`sessionTopicNames` by using `putIfAbsent` or even `put`? The grouping seems 
unnecessary to me here unless I am missing something.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -285,52 +268,57 @@ public FetchRequestData build() {
 if (nextMetadata.isFull()) {
 if (log.isDebugEnabled()) {
 log.debug("Built full fetch {} for node {} with {}.",
-  nextMetadata, node, 
partitionsToLogString(next.keySet()));
+nextMetadata, node, 
topicPartitionsToLogString(next.keySet()));
 }
 sessionPartitions = next;
 next = null;
+Map toSend =
+Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));

Review comment:
   As `toSend` is not used before L288, how about putting this line over 
there?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -346,38 +334,36 @@ public FetchRequestData build() {
 break;
 }
 sessionPartitions.put(topicPartition, nextData);
-added.add(topicPartition);
+added.add(new TopicIdPartition(nextData.topicId, 
topicPartition));
 }
 
 // Add topic IDs to session if we can use them. If an ID is 
inconsistent, we will handle in the receiving broker.
 // If we switched from using topic IDs to not using them (or vice 
versa), that error will also be handled in the receiving broker.
 if (canUseTopicIds) {
-for (Map.Entry topic : topicIds.entrySet()) {
-String topicName = topic.getKey();
-Uuid addedId = topic.getValue();
-sessionTopicIds.put(topicName, addedId);
-sessionTopicNames.put(addedId, topicName);
-}
+Map> newTopicNames = 
added.stream().collect(Collectors.groupingByConcurrent(TopicIdPartition::topicId,
+Collectors.mapping(topicIdPartition -> 
topicIdPartition.topicPartition().topic(), Collectors.toSet(;
+
+// There should only be one topic name per topic ID.
+newTopicNames.forEach((topicId, topicNamesSet) -> 
topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
 }
 
 if (log.isDebugEnabled()) {
-log.debug("Built incremental fetch {} for node {}. Added {}, 
altered {}, removed {} " +
-  "out of {}", nextMetadata, node, 
partitionsToLogString(added),
-  partitionsToLogString(altered), 
partitionsToLogString(removed),
-  partitionsToLogString(sessionPartitions.keySet()));
+log.debug("Built incremental fetch {} for node {}. Added {}, 
altered {}, removed {}, " +
+"replaced {} out of {}", nextMetadata, node, 
topicIdPartitionsToLogString(added),

Review comment:
   nit: Could we align like it was before?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -346,38 +334,36 @@ public FetchRequestData build() {
 break;
 }
 sessionPartitions.put(topicPartition, 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-10-13 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r727909392



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -249,115 +215,126 @@ public String toString() {
  * Another reason is because we make use of the list ordering to 
optimize the preparation of
  * incremental fetch requests (see below).
  */
-private LinkedHashMap next;
-private Map topicIds;
+private LinkedHashMap next;
 private final boolean copySessionPartitions;
 private int partitionsWithoutTopicIds = 0;
+private int partitionsWithTopicIds = 0;
 
 Builder() {
 this.next = new LinkedHashMap<>();
-this.topicIds = new HashMap<>();
 this.copySessionPartitions = true;
 }
 
 Builder(int initialSize, boolean copySessionPartitions) {
 this.next = new LinkedHashMap<>(initialSize);
-this.topicIds = new HashMap<>(initialSize);
 this.copySessionPartitions = copySessionPartitions;
 }
 
 /**
  * Mark that we want data from this partition in the upcoming fetch.
  */
-public void add(TopicPartition topicPartition, Uuid topicId, 
PartitionData data) {
-next.put(topicPartition, data);
-// topicIds should not change between adding partitions and 
building, so we can use putIfAbsent
-if (!topicId.equals(Uuid.ZERO_UUID)) {
-topicIds.putIfAbsent(topicPartition.topic(), topicId);
-} else {
+public void add(TopicIdPartition topicIdPartition, PartitionData data) 
{
+next.put(topicIdPartition, data);
+if (topicIdPartition.topicId().equals(Uuid.ZERO_UUID)) {
 partitionsWithoutTopicIds++;
+} else {
+partitionsWithTopicIds++;
+}
+}
+
+private Map buildFullSession(boolean 
canUseTopicIds) {
+if (log.isDebugEnabled()) {
+log.debug("Built full fetch {} for node {} with {}.",
+nextMetadata, node, 
partitionsToLogString(next.keySet()));
 }
+sessionPartitions = next;
+next = null;
+// Only add topic IDs to the session if we are using topic IDs.
+sessionTopicNames = new HashMap<>();
+if (canUseTopicIds) {
+Map> newTopicNames = 
sessionPartitions.keySet().stream().collect(Collectors.groupingByConcurrent(TopicIdPartition::topicId,
+Collectors.mapping(topicIdPartition -> 
topicIdPartition.topicPartition().topic(), Collectors.toSet(;
+
+sessionTopicNames = new HashMap<>(newTopicNames.size());
+// There should only be one topic name per topic ID.
+newTopicNames.forEach((topicId, topicNamesSet) -> 
topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
+} else {
+sessionTopicNames = new HashMap<>();
+}
+return Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
 }
 
 public FetchRequestData build() {
 boolean canUseTopicIds = partitionsWithoutTopicIds == 0;
 
 if (nextMetadata.isFull()) {
-if (log.isDebugEnabled()) {
-log.debug("Built full fetch {} for node {} with {}.",
-  nextMetadata, node, 
partitionsToLogString(next.keySet()));
-}
-sessionPartitions = next;
-next = null;
-// Only add topic IDs to the session if we are using topic IDs.
-if (canUseTopicIds) {
-sessionTopicIds = topicIds;
-sessionTopicNames = new HashMap<>(topicIds.size());
-topicIds.forEach((name, id) -> sessionTopicNames.put(id, 
name));
-} else {
-sessionTopicIds = new HashMap<>();
-sessionTopicNames = new HashMap<>();
-}
-topicIds = null;
-Map toSend =
-Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
-Map toSendTopicIds =
-Collections.unmodifiableMap(new 
HashMap<>(sessionTopicIds));
-Map toSendTopicNames =
-Collections.unmodifiableMap(new 
HashMap<>(sessionTopicNames));
-return new FetchRequestData(toSend, Collections.emptyList(), 
toSend, toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds);
+Map toSend = 
buildFullSession(canUseTopicIds);
+return new FetchRequestData(toSend, Collections.emptyList(), 
toSend, nextMetadata, canUseTopicIds);
 }
 
-List added = new ArrayList<>();

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-10-13 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r728132912



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
   None
 }
 
-val erroneous = mutable.ArrayBuffer[(TopicPartition, 
FetchResponseData.PartitionData)]()
-val interesting = mutable.ArrayBuffer[(TopicPartition, 
FetchRequest.PartitionData)]()
-val sessionTopicIds = mutable.Map[String, Uuid]()
+val erroneous = mutable.ArrayBuffer[(TopicIdPartition, 
FetchResponseData.PartitionData)]()
+val interesting = mutable.ArrayBuffer[(TopicIdPartition, 
FetchRequest.PartitionData)]()
 if (fetchRequest.isFromFollower) {
   // The follower must have ClusterAction on ClusterResource in order to 
fetch partition data.
   if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, 
CLUSTER_NAME)) {
-fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-  sessionTopicIds.put(topicPartition.topic(), topicId)
-  if (!metadataCache.contains(topicPartition))
-erroneous += topicPartition -> 
FetchResponse.partitionResponse(topicPartition.partition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+fetchContext.foreachPartition { (topicIdPartition, data) =>
+  if (topicIdPartition.topicPartition.topic == null )

Review comment:
   Sounds good, thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-10-13 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r728115288



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
   None
 }
 
-val erroneous = mutable.ArrayBuffer[(TopicPartition, 
FetchResponseData.PartitionData)]()
-val interesting = mutable.ArrayBuffer[(TopicPartition, 
FetchRequest.PartitionData)]()
-val sessionTopicIds = mutable.Map[String, Uuid]()
+val erroneous = mutable.ArrayBuffer[(TopicIdPartition, 
FetchResponseData.PartitionData)]()
+val interesting = mutable.ArrayBuffer[(TopicIdPartition, 
FetchRequest.PartitionData)]()
 if (fetchRequest.isFromFollower) {
   // The follower must have ClusterAction on ClusterResource in order to 
fetch partition data.
   if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, 
CLUSTER_NAME)) {
-fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-  sessionTopicIds.put(topicPartition.topic(), topicId)
-  if (!metadataCache.contains(topicPartition))
-erroneous += topicPartition -> 
FetchResponse.partitionResponse(topicPartition.partition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+fetchContext.foreachPartition { (topicIdPartition, data) =>
+  if (topicIdPartition.topicPartition.topic == null )

Review comment:
   Actually, you're right. That is not entirely true. I thought that the 
`requireNonNull` for the `topic` in one of the 
[constructor](https://github.com/apache/kafka/pull/11374/files#diff-3d6aa1dec2a2548f28148717926536cc937acec2ab4bd03a7bcdc58c84a6cbbaR38)
 would prevent this to work. However as we use the other `TopicIdPartition` 
constructor in this case, it is not impacted by the `requireNonNull`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-10-13 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r728124707



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
   None
 }
 
-val erroneous = mutable.ArrayBuffer[(TopicPartition, 
FetchResponseData.PartitionData)]()
-val interesting = mutable.ArrayBuffer[(TopicPartition, 
FetchRequest.PartitionData)]()
-val sessionTopicIds = mutable.Map[String, Uuid]()
+val erroneous = mutable.ArrayBuffer[(TopicIdPartition, 
FetchResponseData.PartitionData)]()
+val interesting = mutable.ArrayBuffer[(TopicIdPartition, 
FetchRequest.PartitionData)]()
 if (fetchRequest.isFromFollower) {
   // The follower must have ClusterAction on ClusterResource in order to 
fetch partition data.
   if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, 
CLUSTER_NAME)) {
-fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-  sessionTopicIds.put(topicPartition.topic(), topicId)
-  if (!metadataCache.contains(topicPartition))
-erroneous += topicPartition -> 
FetchResponse.partitionResponse(topicPartition.partition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+fetchContext.foreachPartition { (topicIdPartition, data) =>
+  if (topicIdPartition.topicPartition.topic == null )

Review comment:
   In this case, it would be nice if we would have a `TopicIdPartition` 
which contains an optional topic name. For the context, the issue is that we 
might have partitions in the fetch requests for which the topic name is unknown 
or not yet known by the broker.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-10-13 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r728115288



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
   None
 }
 
-val erroneous = mutable.ArrayBuffer[(TopicPartition, 
FetchResponseData.PartitionData)]()
-val interesting = mutable.ArrayBuffer[(TopicPartition, 
FetchRequest.PartitionData)]()
-val sessionTopicIds = mutable.Map[String, Uuid]()
+val erroneous = mutable.ArrayBuffer[(TopicIdPartition, 
FetchResponseData.PartitionData)]()
+val interesting = mutable.ArrayBuffer[(TopicIdPartition, 
FetchRequest.PartitionData)]()
 if (fetchRequest.isFromFollower) {
   // The follower must have ClusterAction on ClusterResource in order to 
fetch partition data.
   if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, 
CLUSTER_NAME)) {
-fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-  sessionTopicIds.put(topicPartition.topic(), topicId)
-  if (!metadataCache.contains(topicPartition))
-erroneous += topicPartition -> 
FetchResponse.partitionResponse(topicPartition.partition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+fetchContext.foreachPartition { (topicIdPartition, data) =>
+  if (topicIdPartition.topicPartition.topic == null )

Review comment:
   Actually, you're right. That is not entirely true. I thought that the 
`requireNonNull` for the `topic` in one of the 
[constructor](https://github.com/apache/kafka/pull/11374/files#diff-3d6aa1dec2a2548f28148717926536cc937acec2ab4bd03a7bcdc58c84a6cbbaR38)
 would prevent this to work. However, as we use the other `TopicIdPartition` 
constructor here it is not impacted by the `requireNonNull`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-10-13 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r727996093



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
   None
 }
 
-val erroneous = mutable.ArrayBuffer[(TopicPartition, 
FetchResponseData.PartitionData)]()
-val interesting = mutable.ArrayBuffer[(TopicPartition, 
FetchRequest.PartitionData)]()
-val sessionTopicIds = mutable.Map[String, Uuid]()
+val erroneous = mutable.ArrayBuffer[(TopicIdPartition, 
FetchResponseData.PartitionData)]()
+val interesting = mutable.ArrayBuffer[(TopicIdPartition, 
FetchRequest.PartitionData)]()
 if (fetchRequest.isFromFollower) {
   // The follower must have ClusterAction on ClusterResource in order to 
fetch partition data.
   if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, 
CLUSTER_NAME)) {
-fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-  sessionTopicIds.put(topicPartition.topic(), topicId)
-  if (!metadataCache.contains(topicPartition))
-erroneous += topicPartition -> 
FetchResponse.partitionResponse(topicPartition.partition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+fetchContext.foreachPartition { (topicIdPartition, data) =>
+  if (topicIdPartition.topicPartition.topic == null )

Review comment:
   @jolshan With Ismael's PR (https://github.com/apache/kafka/pull/11374), 
this trick does not work any more. We need to think about an alternative/better 
approach. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-10-13 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r727909392



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -249,115 +215,126 @@ public String toString() {
  * Another reason is because we make use of the list ordering to 
optimize the preparation of
  * incremental fetch requests (see below).
  */
-private LinkedHashMap next;
-private Map topicIds;
+private LinkedHashMap next;
 private final boolean copySessionPartitions;
 private int partitionsWithoutTopicIds = 0;
+private int partitionsWithTopicIds = 0;
 
 Builder() {
 this.next = new LinkedHashMap<>();
-this.topicIds = new HashMap<>();
 this.copySessionPartitions = true;
 }
 
 Builder(int initialSize, boolean copySessionPartitions) {
 this.next = new LinkedHashMap<>(initialSize);
-this.topicIds = new HashMap<>(initialSize);
 this.copySessionPartitions = copySessionPartitions;
 }
 
 /**
  * Mark that we want data from this partition in the upcoming fetch.
  */
-public void add(TopicPartition topicPartition, Uuid topicId, 
PartitionData data) {
-next.put(topicPartition, data);
-// topicIds should not change between adding partitions and 
building, so we can use putIfAbsent
-if (!topicId.equals(Uuid.ZERO_UUID)) {
-topicIds.putIfAbsent(topicPartition.topic(), topicId);
-} else {
+public void add(TopicIdPartition topicIdPartition, PartitionData data) 
{
+next.put(topicIdPartition, data);
+if (topicIdPartition.topicId().equals(Uuid.ZERO_UUID)) {
 partitionsWithoutTopicIds++;
+} else {
+partitionsWithTopicIds++;
+}
+}
+
+private Map buildFullSession(boolean 
canUseTopicIds) {
+if (log.isDebugEnabled()) {
+log.debug("Built full fetch {} for node {} with {}.",
+nextMetadata, node, 
partitionsToLogString(next.keySet()));
 }
+sessionPartitions = next;
+next = null;
+// Only add topic IDs to the session if we are using topic IDs.
+sessionTopicNames = new HashMap<>();
+if (canUseTopicIds) {
+Map> newTopicNames = 
sessionPartitions.keySet().stream().collect(Collectors.groupingByConcurrent(TopicIdPartition::topicId,
+Collectors.mapping(topicIdPartition -> 
topicIdPartition.topicPartition().topic(), Collectors.toSet(;
+
+sessionTopicNames = new HashMap<>(newTopicNames.size());
+// There should only be one topic name per topic ID.
+newTopicNames.forEach((topicId, topicNamesSet) -> 
topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
+} else {
+sessionTopicNames = new HashMap<>();
+}
+return Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
 }
 
 public FetchRequestData build() {
 boolean canUseTopicIds = partitionsWithoutTopicIds == 0;
 
 if (nextMetadata.isFull()) {
-if (log.isDebugEnabled()) {
-log.debug("Built full fetch {} for node {} with {}.",
-  nextMetadata, node, 
partitionsToLogString(next.keySet()));
-}
-sessionPartitions = next;
-next = null;
-// Only add topic IDs to the session if we are using topic IDs.
-if (canUseTopicIds) {
-sessionTopicIds = topicIds;
-sessionTopicNames = new HashMap<>(topicIds.size());
-topicIds.forEach((name, id) -> sessionTopicNames.put(id, 
name));
-} else {
-sessionTopicIds = new HashMap<>();
-sessionTopicNames = new HashMap<>();
-}
-topicIds = null;
-Map toSend =
-Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
-Map toSendTopicIds =
-Collections.unmodifiableMap(new 
HashMap<>(sessionTopicIds));
-Map toSendTopicNames =
-Collections.unmodifiableMap(new 
HashMap<>(sessionTopicNames));
-return new FetchRequestData(toSend, Collections.emptyList(), 
toSend, toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds);
+Map toSend = 
buildFullSession(canUseTopicIds);
+return new FetchRequestData(toSend, Collections.emptyList(), 
toSend, nextMetadata, canUseTopicIds);
 }
 
-List added = new ArrayList<>();

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-10-12 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r727155822



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -249,115 +215,126 @@ public String toString() {
  * Another reason is because we make use of the list ordering to 
optimize the preparation of
  * incremental fetch requests (see below).
  */
-private LinkedHashMap next;
-private Map topicIds;
+private LinkedHashMap next;
 private final boolean copySessionPartitions;
 private int partitionsWithoutTopicIds = 0;
+private int partitionsWithTopicIds = 0;
 
 Builder() {
 this.next = new LinkedHashMap<>();
-this.topicIds = new HashMap<>();
 this.copySessionPartitions = true;
 }
 
 Builder(int initialSize, boolean copySessionPartitions) {
 this.next = new LinkedHashMap<>(initialSize);
-this.topicIds = new HashMap<>(initialSize);
 this.copySessionPartitions = copySessionPartitions;
 }
 
 /**
  * Mark that we want data from this partition in the upcoming fetch.
  */
-public void add(TopicPartition topicPartition, Uuid topicId, 
PartitionData data) {
-next.put(topicPartition, data);
-// topicIds should not change between adding partitions and 
building, so we can use putIfAbsent
-if (!topicId.equals(Uuid.ZERO_UUID)) {
-topicIds.putIfAbsent(topicPartition.topic(), topicId);
-} else {
+public void add(TopicIdPartition topicIdPartition, PartitionData data) 
{
+next.put(topicIdPartition, data);
+if (topicIdPartition.topicId().equals(Uuid.ZERO_UUID)) {
 partitionsWithoutTopicIds++;
+} else {
+partitionsWithTopicIds++;
+}
+}
+
+private Map buildFullSession(boolean 
canUseTopicIds) {
+if (log.isDebugEnabled()) {
+log.debug("Built full fetch {} for node {} with {}.",
+nextMetadata, node, 
partitionsToLogString(next.keySet()));
 }
+sessionPartitions = next;
+next = null;
+// Only add topic IDs to the session if we are using topic IDs.
+sessionTopicNames = new HashMap<>();
+if (canUseTopicIds) {
+Map> newTopicNames = 
sessionPartitions.keySet().stream().collect(Collectors.groupingByConcurrent(TopicIdPartition::topicId,
+Collectors.mapping(topicIdPartition -> 
topicIdPartition.topicPartition().topic(), Collectors.toSet(;
+
+sessionTopicNames = new HashMap<>(newTopicNames.size());
+// There should only be one topic name per topic ID.
+newTopicNames.forEach((topicId, topicNamesSet) -> 
topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
+} else {
+sessionTopicNames = new HashMap<>();
+}
+return Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
 }
 
 public FetchRequestData build() {
 boolean canUseTopicIds = partitionsWithoutTopicIds == 0;
 
 if (nextMetadata.isFull()) {
-if (log.isDebugEnabled()) {
-log.debug("Built full fetch {} for node {} with {}.",
-  nextMetadata, node, 
partitionsToLogString(next.keySet()));
-}
-sessionPartitions = next;
-next = null;
-// Only add topic IDs to the session if we are using topic IDs.
-if (canUseTopicIds) {
-sessionTopicIds = topicIds;
-sessionTopicNames = new HashMap<>(topicIds.size());
-topicIds.forEach((name, id) -> sessionTopicNames.put(id, 
name));
-} else {
-sessionTopicIds = new HashMap<>();
-sessionTopicNames = new HashMap<>();
-}
-topicIds = null;
-Map toSend =
-Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
-Map toSendTopicIds =
-Collections.unmodifiableMap(new 
HashMap<>(sessionTopicIds));
-Map toSendTopicNames =
-Collections.unmodifiableMap(new 
HashMap<>(sessionTopicNames));
-return new FetchRequestData(toSend, Collections.emptyList(), 
toSend, toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds);
+Map toSend = 
buildFullSession(canUseTopicIds);
+return new FetchRequestData(toSend, Collections.emptyList(), 
toSend, nextMetadata, canUseTopicIds);
 }
 
-List added = new ArrayList<>();

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-10-12 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r727155822



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -249,115 +215,126 @@ public String toString() {
  * Another reason is because we make use of the list ordering to 
optimize the preparation of
  * incremental fetch requests (see below).
  */
-private LinkedHashMap next;
-private Map topicIds;
+private LinkedHashMap next;
 private final boolean copySessionPartitions;
 private int partitionsWithoutTopicIds = 0;
+private int partitionsWithTopicIds = 0;
 
 Builder() {
 this.next = new LinkedHashMap<>();
-this.topicIds = new HashMap<>();
 this.copySessionPartitions = true;
 }
 
 Builder(int initialSize, boolean copySessionPartitions) {
 this.next = new LinkedHashMap<>(initialSize);
-this.topicIds = new HashMap<>(initialSize);
 this.copySessionPartitions = copySessionPartitions;
 }
 
 /**
  * Mark that we want data from this partition in the upcoming fetch.
  */
-public void add(TopicPartition topicPartition, Uuid topicId, 
PartitionData data) {
-next.put(topicPartition, data);
-// topicIds should not change between adding partitions and 
building, so we can use putIfAbsent
-if (!topicId.equals(Uuid.ZERO_UUID)) {
-topicIds.putIfAbsent(topicPartition.topic(), topicId);
-} else {
+public void add(TopicIdPartition topicIdPartition, PartitionData data) 
{
+next.put(topicIdPartition, data);
+if (topicIdPartition.topicId().equals(Uuid.ZERO_UUID)) {
 partitionsWithoutTopicIds++;
+} else {
+partitionsWithTopicIds++;
+}
+}
+
+private Map buildFullSession(boolean 
canUseTopicIds) {
+if (log.isDebugEnabled()) {
+log.debug("Built full fetch {} for node {} with {}.",
+nextMetadata, node, 
partitionsToLogString(next.keySet()));
 }
+sessionPartitions = next;
+next = null;
+// Only add topic IDs to the session if we are using topic IDs.
+sessionTopicNames = new HashMap<>();
+if (canUseTopicIds) {
+Map> newTopicNames = 
sessionPartitions.keySet().stream().collect(Collectors.groupingByConcurrent(TopicIdPartition::topicId,
+Collectors.mapping(topicIdPartition -> 
topicIdPartition.topicPartition().topic(), Collectors.toSet(;
+
+sessionTopicNames = new HashMap<>(newTopicNames.size());
+// There should only be one topic name per topic ID.
+newTopicNames.forEach((topicId, topicNamesSet) -> 
topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
+} else {
+sessionTopicNames = new HashMap<>();
+}
+return Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
 }
 
 public FetchRequestData build() {
 boolean canUseTopicIds = partitionsWithoutTopicIds == 0;
 
 if (nextMetadata.isFull()) {
-if (log.isDebugEnabled()) {
-log.debug("Built full fetch {} for node {} with {}.",
-  nextMetadata, node, 
partitionsToLogString(next.keySet()));
-}
-sessionPartitions = next;
-next = null;
-// Only add topic IDs to the session if we are using topic IDs.
-if (canUseTopicIds) {
-sessionTopicIds = topicIds;
-sessionTopicNames = new HashMap<>(topicIds.size());
-topicIds.forEach((name, id) -> sessionTopicNames.put(id, 
name));
-} else {
-sessionTopicIds = new HashMap<>();
-sessionTopicNames = new HashMap<>();
-}
-topicIds = null;
-Map toSend =
-Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
-Map toSendTopicIds =
-Collections.unmodifiableMap(new 
HashMap<>(sessionTopicIds));
-Map toSendTopicNames =
-Collections.unmodifiableMap(new 
HashMap<>(sessionTopicNames));
-return new FetchRequestData(toSend, Collections.emptyList(), 
toSend, toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds);
+Map toSend = 
buildFullSession(canUseTopicIds);
+return new FetchRequestData(toSend, Collections.emptyList(), 
toSend, nextMetadata, canUseTopicIds);
 }
 
-List added = new ArrayList<>();

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-09-24 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r715684131



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -672,29 +672,22 @@ class KafkaApis(val requestChannel: RequestChannel,
 val versionId = request.header.apiVersion
 val clientId = request.header.clientId
 val fetchRequest = request.body[FetchRequest]
-val (topicIds, topicNames) =
+val topicNames =
   if (fetchRequest.version() >= 13)
-metadataCache.topicIdInfo()
+metadataCache.topicIdsToNames()
   else
-(Collections.emptyMap[String, Uuid](), Collections.emptyMap[Uuid, 
String]())
+Collections.emptyMap[Uuid, String]()
 
-// If fetchData or forgottenTopics contain an unknown topic ID, return a 
top level error.
-var fetchData: util.Map[TopicPartition, FetchRequest.PartitionData] = null
-var forgottenTopics: util.List[TopicPartition] = null
-try {
-  fetchData = fetchRequest.fetchData(topicNames)
-  forgottenTopics = fetchRequest.forgottenTopics(topicNames)
-} catch {
-  case e: UnknownTopicIdException => throw e
-}
+val fetchData = fetchRequest.fetchData(topicNames)
+val forgottenTopics = fetchRequest.forgottenTopics(topicNames)

Review comment:
   When a session is used, resolving the topic ids is not really necessary 
here because we should already have the names in the session or we would 
resolve them later anyway. I wonder if it would be better to do this entirely 
in the `fetchManager.newConext` based on the context type. Have you considered 
something like this?

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -708,40 +701,45 @@ class KafkaApis(val requestChannel: RequestChannel,
   None
 }
 
-val erroneous = mutable.ArrayBuffer[(TopicPartition, 
FetchResponseData.PartitionData)]()
-val interesting = mutable.ArrayBuffer[(TopicPartition, 
FetchRequest.PartitionData)]()
+val erroneous = mutable.ArrayBuffer[(TopicIdPartition, 
FetchResponseData.PartitionData)]()
+val interesting = mutable.ArrayBuffer[(TopicIdPartition, 
FetchRequest.PartitionData)]()
 val sessionTopicIds = mutable.Map[String, Uuid]()
 if (fetchRequest.isFromFollower) {
   // The follower must have ClusterAction on ClusterResource in order to 
fetch partition data.
   if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, 
CLUSTER_NAME)) {
-fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-  sessionTopicIds.put(topicPartition.topic(), topicId)
-  if (!metadataCache.contains(topicPartition))
-erroneous += topicPartition -> 
FetchResponse.partitionResponse(topicPartition.partition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+fetchContext.foreachPartition { (topicIdPartition, data) =>
+  sessionTopicIds.put(topicIdPartition.topicPartition.topic, 
topicIdPartition.topicId)

Review comment:
   Do we still need this `sessionTopicIds` mapping if we have the topic id 
in the `topicIdPartition`?

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -708,40 +701,45 @@ class KafkaApis(val requestChannel: RequestChannel,
   None
 }
 
-val erroneous = mutable.ArrayBuffer[(TopicPartition, 
FetchResponseData.PartitionData)]()
-val interesting = mutable.ArrayBuffer[(TopicPartition, 
FetchRequest.PartitionData)]()
+val erroneous = mutable.ArrayBuffer[(TopicIdPartition, 
FetchResponseData.PartitionData)]()
+val interesting = mutable.ArrayBuffer[(TopicIdPartition, 
FetchRequest.PartitionData)]()
 val sessionTopicIds = mutable.Map[String, Uuid]()
 if (fetchRequest.isFromFollower) {
   // The follower must have ClusterAction on ClusterResource in order to 
fetch partition data.
   if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, 
CLUSTER_NAME)) {
-fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-  sessionTopicIds.put(topicPartition.topic(), topicId)
-  if (!metadataCache.contains(topicPartition))
-erroneous += topicPartition -> 
FetchResponse.partitionResponse(topicPartition.partition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+fetchContext.foreachPartition { (topicIdPartition, data) =>
+  sessionTopicIds.put(topicIdPartition.topicPartition.topic, 
topicIdPartition.topicId)
+  if (topicIdPartition.topicPartition.topic == null )
+erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition.topicPartition.partition, 
Errors.UNKNOWN_TOPIC_ID)
+  else if (!metadataCache.contains(topicIdPartition.topicPartition))
+erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition.topicPartition.partition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
   else
-interesting += (topicPartition -> data)
+