[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743370521



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -659,88 +670,108 @@ class FetchSessionTest {
   }
 
   @Test
-  def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = {
+  def testFetchSessionWithUnknownId(): Unit = {
 val time = new MockTime()
 val cache = new FetchSessionCache(10, 1000)
 val fetchManager = new FetchManager(time, cache)
-val topicIds = new util.HashMap[String, Uuid]()
-val topicNames = new util.HashMap[Uuid, String]()
+val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> "bar").asJava

Review comment:
   I can think more on this. 








[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743370708



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -794,22 +793,23 @@ class ReplicaManagerTest {
 
   // We receive one valid request from the follower and replica state is updated
   var successfulFetch: Option[FetchPartitionData] = None
-  def callback(response: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
-successfulFetch = response.headOption.filter { case (topicPartition, _) => topicPartition == tp }.map { case (_, data) => data }
+  def callback(response: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+// Check the topic partition only since we are reusing this callback on different TopicIdPartitions.
+successfulFetch = response.headOption.filter { case (topicIdPartition, _) => topicIdPartition.topicPartition == tidp.topicPartition }.map { case (_, data) => data }

Review comment:
   STILL TODO for Friday








[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743370419



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -659,88 +670,108 @@ class FetchSessionTest {
   }
 
   @Test
-  def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = {
+  def testFetchSessionWithUnknownId(): Unit = {
 val time = new MockTime()
 val cache = new FetchSessionCache(10, 1000)
 val fetchManager = new FetchManager(time, cache)
-val topicIds = new util.HashMap[String, Uuid]()
-val topicNames = new util.HashMap[Uuid, String]()
+val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> "bar").asJava

Review comment:
   We could do that, but then this check will be a bit more complicated. 
   `context2.foreachPartition((topicIdPartition, _) => assertEquals(topicNames.get(topicIdPartition.topicId), topicIdPartition.topic))`








[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743369512



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -659,88 +670,108 @@ class FetchSessionTest {
   }
 
   @Test
-  def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = {
+  def testFetchSessionWithUnknownId(): Unit = {
 val time = new MockTime()
 val cache = new FetchSessionCache(10, 1000)
 val fetchManager = new FetchManager(time, cache)
-val topicIds = new util.HashMap[String, Uuid]()
-val topicNames = new util.HashMap[Uuid, String]()
+val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> "bar").asJava
+val topicIds = topicNames.asScala.map(_.swap).asJava
+val foo0 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition("foo", 0))
+val foo1 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition("foo", 1))
+val emptyFoo0 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition(null, 0))
+val emptyFoo1 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition(null, 1))
 
-// Create a new fetch session with foo-0
+// Create a new fetch session with foo-0 and foo-1
 val reqData1 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
-reqData1.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100,
+reqData1.put(foo0.topicPartition, new FetchRequest.PartitionData(foo0.topicId, 0, 0, 100,
   Optional.empty()))
-val request1 = createRequestWithoutTopicIds(JFetchMetadata.INITIAL, reqData1, topicIds, EMPTY_PART_LIST, false)
-// Start a fetch session using a request version that does not use topic IDs.
+reqData1.put(foo1.topicPartition, new FetchRequest.PartitionData(foo1.topicId, 10, 0, 100,
+  Optional.empty()))
+val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, EMPTY_PART_LIST, false)
+// Simulate unknown topic ID for foo.
+val topicNamesOnlyBar = Collections.singletonMap(topicIds.get("bar"), "bar")
+// We should not throw error since we have an older request version.
 val context1 = fetchManager.newContext(
   request1.version,
   request1.metadata,
   request1.isFromFollower,
-  request1.fetchData(topicNames),
-  request1.forgottenTopics(topicNames),
-  topicIds
+  request1.fetchData(topicNamesOnlyBar),
+  request1.forgottenTopics(topicNamesOnlyBar),
+  topicNamesOnlyBar
 )
 assertEquals(classOf[FullFetchContext], context1.getClass)
-val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
-respData1.put(new TopicPartition("foo", 0), new FetchResponseData.PartitionData()
+val respData1 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]

Review comment:
   Not quite sure what you meant here but I added this for now:
   `context1.foreachPartition((topicIdPartition, _) => assertEquals(topicIds.get("foo"), topicIdPartition.topicId))`








[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743365481



##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -1091,18 +1089,20 @@ class AbstractFetcherThreadTest {
 
 override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
   val fetchData = mutable.Map.empty[TopicPartition, FetchRequest.PartitionData]
-  partitionMap.foreach { case (partition, state) =>
+  partitionMap.foreach { case (partition, state) => 0
+.equals(0)

Review comment:
   I have no idea why this is here.








[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743361897



##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -2069,10 +2071,10 @@ public void testReturnRecordsDuringRebalance() throws InterruptedException {
 ConsumerMetadata metadata = createMetadata(subscription);
 MockClient client = new MockClient(time, metadata);
 ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor();
-KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
-
 initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
 
+KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);

Review comment:
   Looks like most of these changes were done by this commit: 
https://github.com/apache/kafka/pull/11331/commits/32c6297adb685f1863b8c7eb85f2f0965853a9f8
   so I can remove them pretty easily.








[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743360648



##
File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List<Integer> partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : "adding a new partition";
-Map<String, Uuid> topicIds = Collections.singletonMap("foo", Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no longer use topic IDs.
-// The receiving broker will close the session if we were previously using topic IDs.
+// Should have the same session ID, and next epoch and can no longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace());

Review comment:
   Or are you just referring to a case where we don't ever have topic IDs?
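   
   (For context on the diff above: `toReplace` carries the partitions whose topic ID changed within an existing session, so the broker can drop the old ID before registering the new one. A condensed sketch of that contract, reusing the names from the quoted test; illustrative only:)
   
   // Same partition re-added under a new topic ID: the old (id, partition)
   // pair lands in toReplace(), while the new pair stays in toSend().
   assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace());
   assertTrue(data2.canUseTopicIds());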





[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743357376



##
File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List<Integer> partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : "adding a new partition";
-Map<String, Uuid> topicIds = Collections.singletonMap("foo", Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no longer use topic IDs.
-// The receiving broker will close the session if we were previously using topic IDs.
+// Should have the same session ID, and next epoch and can no longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace());

Review comment:
   Or did you mean just adding something like 
   `assertEquals(fetchRequestUsesIds, data2.toReplace().size() > 0);`






[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743355748



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
   None
 }
 
-val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]()
-val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
-val sessionTopicIds = mutable.Map[String, Uuid]()
+val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]()
+val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]()
 if (fetchRequest.isFromFollower) {
   // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
   if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
-fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-  sessionTopicIds.put(topicPartition.topic(), topicId)
-  if (!metadataCache.contains(topicPartition))
-erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+fetchContext.foreachPartition { (topicIdPartition, data) =>
+  if (topicIdPartition.topicPartition.topic == null )
+erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+  else if (!metadataCache.contains(topicIdPartition.topicPartition))
+erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
   else
-interesting += (topicPartition -> data)
+interesting += (topicIdPartition -> data)
 }
   } else {
-fetchContext.foreachPartition { (part, topicId, _) =>
-  sessionTopicIds.put(part.topic(), topicId)
-  erroneous += part -> FetchResponse.partitionResponse(part.partition, Errors.TOPIC_AUTHORIZATION_FAILED)
+fetchContext.foreachPartition { (topicIdPartition, _) =>
+  erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)

Review comment:
   I think we would want to keep the authorization error, since it just logs a message. The UNKNOWN_TOPIC_ID error would trigger a metadata update on the client, which doesn't make sense when there is an authorization error.
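   
   To make that precedence concrete, a minimal sketch of the ordering being discussed (illustrative only, not the actual KafkaApis code; `authorized` stands in for the authHelper check):
   
   // Authorization is evaluated first, so an unauthorized fetch never
   // surfaces UNKNOWN_TOPIC_ID and never triggers a client metadata refresh.
   val error =
     if (!authorized) Errors.TOPIC_AUTHORIZATION_FAILED
     else if (topicIdPartition.topicPartition.topic == null) Errors.UNKNOWN_TOPIC_ID
     else if (!metadataCache.contains(topicIdPartition.topicPartition)) Errors.UNKNOWN_TOPIC_OR_PARTITION
     else Errors.NONE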








[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743354926



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -163,18 +173,35 @@ class CachedPartition(val topic: String,
 mustRespond
   }
 
-  override def hashCode: Int = Objects.hash(new TopicPartition(topic, partition), topicId)
+  /**
+   * We have different equality checks depending on whether topic IDs are used.
+   * This means we need a different hash function as well. We use name to calculate the hash if the ID is zero and unused.
+   * Otherwise, we use the topic ID in the hash calculation.
+   *
+   * @return the hash code for the CachedPartition depending on what request version we are using.
+   */
+  override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition) + topicId.hashCode else
+    (31 * partition) + topic.hashCode
 
   def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition]
 
+  /**
+   * We have different equality checks depending on whether topic IDs are used.
+   *
+   * This is because when we use topic IDs, a partition with a given ID and an unknown name is the same as a partition with that
+   * ID and a known name. This means we can only use topic ID and partition when determining equality.
+   *
+   * On the other hand, if we are using topic names, all IDs are zero. This means we can only use topic name and partition
+   * when determining equality.
+   */
   override def equals(that: Any): Boolean =
 that match {
   case that: CachedPartition =>
 this.eq(that) ||
   (that.canEqual(this) &&

Review comment:
   Hmm. I'm not quite sure why this would not make sense. I believe it is checking that the types are correct.
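   
   For reference, a condensed sketch of the dual equality/hash contract described above (illustrative; the real CachedPartition carries more fields than shown here):
   
   class CachedPartition(val topic: String, val topicId: Uuid, val partition: Int) {
     def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition]
     override def equals(that: Any): Boolean = that match {
       case that: CachedPartition =>
         this.eq(that) || (that.canEqual(this) &&
           this.partition == that.partition &&
           // With topic IDs in use, compare by ID; otherwise fall back to names.
           (if (this.topicId != Uuid.ZERO_UUID) this.topicId == that.topicId
            else this.topic == that.topic))
       case _ => false
     }
     // hashCode must agree with equals: hash the ID when present, else the name.
     override def hashCode: Int =
       if (topicId != Uuid.ZERO_UUID) 31 * partition + topicId.hashCode
       else 31 * partition + topic.hashCode
   }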








[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743351017



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -1361,102 +1542,113 @@ class FetchSessionTest {
 val resp4 = context2.updateAndGenerateResponseData(respData)
 assertEquals(Errors.NONE, resp4.error)
 assertEquals(resp1.sessionId, resp4.sessionId)
-assertEquals(Utils.mkSet(tp1, tp2), resp4.responseData(topicNames, request2.version).keySet)
+assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition), resp4.responseData(topicNames, request2.version).keySet)
   }
 
   @Test
   def testDeprioritizesPartitionsWithRecordsOnly(): Unit = {
 val time = new MockTime()
 val cache = new FetchSessionCache(10, 1000)
 val fetchManager = new FetchManager(time, cache)
-val tp1 = new TopicPartition("foo", 1)
-val tp2 = new TopicPartition("bar", 2)
-val tp3 = new TopicPartition("zar", 3)
 val topicIds = Map("foo" -> Uuid.randomUuid(), "bar" -> Uuid.randomUuid(), "zar" -> Uuid.randomUuid()).asJava
 val topicNames = topicIds.asScala.map(_.swap).asJava
+val tp1 = new TopicIdPartition(topicIds.get("foo"), new TopicPartition("foo", 1))
+val tp2 = new TopicIdPartition(topicIds.get("bar"), new TopicPartition("bar", 2))
+val tp3 = new TopicIdPartition(topicIds.get("zar"), new TopicPartition("zar", 3))
 
-val reqData = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
-reqData.put(tp1, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4)))
-reqData.put(tp2, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4)))
-reqData.put(tp3, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4)))
+val reqData = new util.LinkedHashMap[TopicIdPartition, FetchRequest.PartitionData]
+reqData.put(tp1, new FetchRequest.PartitionData(topicIds.get("foo"), 100, 0, 1000, Optional.of(5), Optional.of(4)))
+reqData.put(tp2, new FetchRequest.PartitionData(topicIds.get("bar"), 100, 0, 1000, Optional.of(5), Optional.of(4)))
+reqData.put(tp3, new FetchRequest.PartitionData(topicIds.get("zar"), 100, 0, 1000, Optional.of(5), Optional.of(4)))
 
 // Full fetch context returns all partitions in the response
 val context1 = fetchManager.newContext(ApiKeys.FETCH.latestVersion(), JFetchMetadata.INITIAL, false,
- reqData, Collections.emptyList(), topicIds)
+ reqData, Collections.emptyList(), topicNames)
 assertEquals(classOf[FullFetchContext], context1.getClass)
 
-val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
+val respData1 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]
 respData1.put(tp1, new FetchResponseData.PartitionData()
-  .setPartitionIndex(tp1.partition)
+  .setPartitionIndex(tp1.topicPartition.partition)
   .setHighWatermark(50)
   .setLastStableOffset(50)
   .setLogStartOffset(0))
 respData1.put(tp2, new FetchResponseData.PartitionData()
-  .setPartitionIndex(tp2.partition)
+  .setPartitionIndex(tp2.topicPartition.partition)
   .setHighWatermark(50)
   .setLastStableOffset(50)
   .setLogStartOffset(0))
 respData1.put(tp3, new FetchResponseData.PartitionData()
-  .setPartitionIndex(tp3.partition)
+  .setPartitionIndex(tp3.topicPartition.partition)
   .setHighWatermark(50)
   .setLastStableOffset(50)
   .setLogStartOffset(0))
 
 val resp1 = context1.updateAndGenerateResponseData(respData1)
 assertEquals(Errors.NONE, resp1.error)
 assertNotEquals(INVALID_SESSION_ID, resp1.sessionId)
-assertEquals(Utils.mkSet(tp1, tp2, tp3), resp1.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet())
+assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition, tp3.topicPartition), resp1.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet())
 
 // Incremental fetch context returns partitions with changes but only deprioritizes
 // the partitions with records
 val context2 = fetchManager.newContext(ApiKeys.FETCH.latestVersion(), new JFetchMetadata(resp1.sessionId, 1), false,
-  reqData, Collections.emptyList(), topicIds)
+  reqData, Collections.emptyList(), topicNames)
 assertEquals(classOf[IncrementalFetchContext], context2.getClass)
 
 // Partitions are ordered in the session as per last response
 assertPartitionsOrder(context2, Seq(tp1, tp2, tp3))
 
 // Response is empty
-val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
+val respData2 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]
 val resp2 = context2.updateAndGenerateResponseData(respData2)
 assertEquals(Errors.NONE, resp2.error)
 assertEquals(resp1.sessionId, resp2.sessionId)
 

[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743350813



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -3530,37 +3534,37 @@ class KafkaApisTest {
   def testSizeOfThrottledPartitions(): Unit = {
 val topicNames = new util.HashMap[Uuid, String]
 val topicIds = new util.HashMap[String, Uuid]()
-def fetchResponse(data: Map[TopicPartition, String]): FetchResponse = {
-  val responseData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData](
+def fetchResponse(data: Map[TopicIdPartition, String]): FetchResponse = {
+  val responseData = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData](
 data.map { case (tp, raw) =>
   tp -> new FetchResponseData.PartitionData()
-.setPartitionIndex(tp.partition)
+.setPartitionIndex(tp.topicPartition.partition)
 .setHighWatermark(105)
 .setLastStableOffset(105)
 .setLogStartOffset(0)
 .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(100, raw.getBytes(StandardCharsets.UTF_8))))
   }.toMap.asJava)
 
   data.foreach{case (tp, _) =>
-val id = Uuid.randomUuid()
-topicIds.put(tp.topic(), id)
-topicNames.put(id, tp.topic())
+topicIds.put(tp.topicPartition.topic, tp.topicId)
+topicNames.put(tp.topicId, tp.topicPartition.topic)
   }
-  FetchResponse.of(Errors.NONE, 100, 100, responseData, topicIds)
+  FetchResponse.of(Errors.NONE, 100, 100, responseData)
 }
 
-val throttledPartition = new TopicPartition("throttledData", 0)
+val throttledPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("throttledData", 0))
 val throttledData = Map(throttledPartition -> "throttledData")
 val expectedSize = FetchResponse.sizeOf(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
-  fetchResponse(throttledData).responseData(topicNames, FetchResponseData.HIGHEST_SUPPORTED_VERSION).entrySet.iterator, topicIds)
+  fetchResponse(throttledData).responseData(topicNames, FetchResponseData.HIGHEST_SUPPORTED_VERSION).entrySet.asScala.map( entry =>
+  (new TopicIdPartition(Uuid.ZERO_UUID, entry.getKey), entry.getValue)).toMap.asJava.entrySet.iterator)
 
-val response = fetchResponse(throttledData ++ Map(new TopicPartition("nonThrottledData", 0) -> "nonThrottledData"))
+val response = fetchResponse(throttledData ++ Map(new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("nonThrottledData", 0)) -> "nonThrottledData"))
 
 val quota = Mockito.mock(classOf[ReplicationQuotaManager])
 
 Mockito.when(quota.isThrottled(ArgumentMatchers.any(classOf[TopicPartition])))
-  .thenAnswer(invocation => throttledPartition == invocation.getArgument(0).asInstanceOf[TopicPartition])
+  .thenAnswer(invocation => throttledPartition.topicPartition == invocation.getArgument(0).asInstanceOf[TopicPartition])
 
-assertEquals(expectedSize, KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION, response, quota, topicIds))
+assertEquals(expectedSize, KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION, response, quota))
   }
 
   @Test

Review comment:
   What logic are we thinking? Checking that the unresolved topics are 
handled correctly?








[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743349370



##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##
@@ -4814,6 +4842,7 @@ private void buildDependencies(MetricConfig metricConfig,
 metadata = new ConsumerMetadata(0, metadataExpireMs, false, false,
 subscriptions, logContext, new ClusterResourceListeners());
 client = new MockClient(time, metadata);
+client.setNodeApiVersions(NodeApiVersions.create());
 metrics = new Metrics(metricConfig, time);
 consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time,
 100, 1000, Integer.MAX_VALUE);

Review comment:
   These tests changed from returning a top-level error to a partition-level error.
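   
   (For context, a partition-level error travels inside the partition's response data rather than on the response as a whole; a rough sketch, not the test code itself:)
   
   // Partition-level: the error code is set on the individual partition.
   FetchResponseData.PartitionData partition = new FetchResponseData.PartitionData()
       .setPartitionIndex(0)
       .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code());
   // A top-level error (the old behavior for these cases) would instead be
   // set on the whole response, e.g. FetchResponse.of(Errors.UNKNOWN_TOPIC_ID, ...).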








[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743349235



##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##
@@ -4814,6 +4842,7 @@ private void buildDependencies(MetricConfig metricConfig,
 metadata = new ConsumerMetadata(0, metadataExpireMs, false, false,
 subscriptions, logContext, new ClusterResourceListeners());
 client = new MockClient(time, metadata);
+client.setNodeApiVersions(NodeApiVersions.create());
 metrics = new Metrics(metricConfig, time);
 consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time,
 100, 1000, Integer.MAX_VALUE);

Review comment:
   Are you referring to how we changed UNKNOWN_TOPIC_ID and INCONSISTENT_TOPIC_ID?
   
   For these cases we have testFetchInconsistentTopicId and testFetchUnknownTopicId, which check that we update the metadata for a partition-level error.
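   
   A rough sketch of the shape of that assertion (illustrative; the real tests build the fetch/response plumbing that is elided here):
   
   // After the fetcher sees a partition-level UNKNOWN_TOPIC_ID or
   // INCONSISTENT_TOPIC_ID, it should flag a metadata refresh rather
   // than fail the whole fetch.
   assertFalse(metadata.updateRequested());
   // ... deliver a fetch response whose partition carries the error ...
   assertTrue(metadata.updateRequested());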








[GitHub] [kafka] RivenSun2 commented on pull request #11461: KAFKA-13422: Add verification of duplicate configuration for each type of LoginModule in JaasConfigFile

2021-11-04 Thread GitBox


RivenSun2 commented on pull request #11461:
URL: https://github.com/apache/kafka/pull/11461#issuecomment-961575399


   @guozhangwang @ijuma 
   please help check this PR when available. Thank you.






[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743243975



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String,
   }
 }
   } catch {
-case ime@( _: CorruptRecordException | _: InvalidRecordException) =>
+case ime@(_: CorruptRecordException | _: InvalidRecordException) =>

Review comment:
   > append when the controller fails over to an older IBP during an upgrade.
   
   I think I'm misunderstanding something here. Did you mean to say append? 








[jira] [Commented] (KAFKA-13375) Kafka streams apps w/EOS unable to start at InitProducerId

2021-11-04 Thread Lerh Chuan Low (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-13375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438957#comment-17438957 ]

Lerh Chuan Low commented on KAFKA-13375:


[~guozhang] Thank you for reading through the ticket! I hadn't seen that KIP yet and didn't realise this was a known issue, I'll digest.

> Kafka streams apps w/EOS unable to start at InitProducerId
> --
>
> Key: KAFKA-13375
> URL: https://issues.apache.org/jira/browse/KAFKA-13375
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Lerh Chuan Low
>Priority: Major
>
> Hello, I'm wondering if this is a Kafka bug. Our environment setup is as 
> follows:
> Kafka streams 2.8 - with *EXACTLY_ONCE* turned on (Not *EOS_BETA*, but I 
> don't think the changes introduced in EOS beta should affect this). 
> Transaction timeout = 60s.
>  Kafka broker 2.8. 
> We have this situation where we were doing a rolling restart of the broker to 
> apply some security changes. After we finished, 4 out of some 15 Stream Apps 
> are unable to start. They can never succeed, no matter what we do. 
> They fail with the error:
> {code:java}
>  2021-10-14 07:20:13,548 WARN 
> [srn-rec-feeder-802c18a1-9512-4a2a-8c2e-00e37550199d-StreamThread-3] 
> o.a.k.s.p.i.StreamsProducer stream-thread 
> [srn-rec-feeder-802c18a1-9512-4a2a-8c2e-00e37550199d-StreamThread-3] task 
> [0_6] Timeout exception caught trying to initialize transactions. The broker 
> is either slow or in bad state (like not having enough replicas) in 
> responding to the request, or the connection to broker was interrupted 
> sending the request or receiving the response. Will retry initializing the 
> task in the next loop. Consider overwriting max.block.ms to a larger value to 
> avoid timeout errors{code}
> We found a previous Jira describing the issue here: 
> https://issues.apache.org/jira/browse/KAFKA-8803. It seems like back then 
> what people did was to rolling restart the brokers. We tried that - we 
> targeted the group coordinators for our failing apps, then transaction 
> coordinators, then all of them. It hasn't resolved our issue so far. 
> A few interesting things we've found so far:
>  - What I can see is that all the failing apps only fail on certain 
> partitions. E.g for the app above, only partition 6 never succeeds. Partition 
> 6 shares the same coordinator as some of the other partitions and those work, 
> so it seems like the issue isn't related to broker memory state. 
>  - All the failing apps have a message similar to this 
> {code:java}
> [2021-10-14 00:54:51,569] INFO [Transaction Marker Request Completion Handler 
> 103]: Sending srn-rec-feeder-0_6's transaction marker for partition 
> srn-bot-003-14 has permanently failed with error 
> org.apache.kafka.common.errors.InvalidProducerEpochException with the current 
> coordinator epoch 143; cancel sending any more transaction markers 
> TxnMarkerEntry{producerId=7001, producerEpoch=610, coordinatorEpoch=143, 
> result=ABORT, partitions=[srn-bot-003-14]} to the brokers 
> (kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler) 
> {code}
> While we were restarting the brokers. They all failed shortly after. No other 
> consumer groups for the other working partitions/working stream apps logged 
> this message. 
> On digging around in git blame and reading through the source, it looks like 
> this is meant to be benign. 
>  - We tried DEBUG logging for the TransactionCoordinator and 
> TransactionStateManager. We can see (assigner is a functioning app)
> {code:java}
> [2021-10-14 06:48:23,813] DEBUG [TransactionCoordinator id=105] Returning 
> CONCURRENT_TRANSACTIONS error code to client for srn-assigner-0_14's 
> AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator) 
> {code}
> I've seen those before during steady state. I do believe they are benign. We 
> never see it for the problematic partitions/consumer groups for some reason.
>  - We tried turning on TRACE for KafkaApis. We can see
> {code:java}
> [2021-10-14 06:56:58,408] TRACE [KafkaApi-105] Completed srn-rec-feeder-0_6's 
> InitProducerIdRequest with result 
> InitProducerIdResult(-1,-1,CONCURRENT_TRANSACTIONS) from client 
> srn-rec-feeder-802c18a1-9512-4a2a-8c2e-00e37550199d-StreamThread-4-0_6-producer.
>  (kafka.server.KafkaApis) {code}
> It starts to make me wonder if there's a situation where Kafka is unable to 
> abort the transactions if there is never any success in initializing a 
> producer ID. But this is diving deep into insider knowledge territory that I 
> simply don't have. I'm wondering if anyone with more knowledge of how 
> transactions work can shed some light here if we are in the wrong path, or if 
> there's any way to restore operations at all short of a 

[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743243111



##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -2659,6 +2661,9 @@ private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset,
 autoCommitIntervalMs,
 interceptors,
 throwOnStableOffsetNotSupported);
+ApiVersions apiVersions = new ApiVersions();
+metadata.fetch().nodes().forEach(node ->
+apiVersions.update(node.idString(), NodeApiVersions.create()));

Review comment:
   Nope. Looks like another change I forgot to cleanup.








[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743242849



##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -2069,10 +2071,10 @@ public void testReturnRecordsDuringRebalance() throws InterruptedException {
 ConsumerMetadata metadata = createMetadata(subscription);
 MockClient client = new MockClient(time, metadata);
 ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor();
-KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
-
 initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
 
+KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);

Review comment:
   It likely had something to do with how the mock client was handling 
metadata. But that may have been for the older version where we checked 
NodeApiVersion. I can try to switch it back.








[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743242346



##
File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List<Integer> partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : "adding a new partition";
-Map<String, Uuid> topicIds = Collections.singletonMap("foo", Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no longer use topic IDs.
-// The receiving broker will close the session if we were previously using topic IDs.
+// Should have the same session ID, and next epoch and can no longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace());
+// Should have the same session ID, and next epoch and can use topic IDs.
+assertEquals(123, data2.metadata().sessionId(), "Did not use same 

[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743241513



##
File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List<Integer> partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : "adding a new partition";
-Map<String, Uuid> topicIds = Collections.singletonMap("foo", Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no longer use topic IDs.
-// The receiving broker will close the session if we were previously using topic IDs.
+// Should have the same session ID, and next epoch and can no longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace());
+// Should have the same session ID, and next epoch and can use topic IDs.
+assertEquals(123, data2.metadata().sessionId(), "Did not use same 

[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743241204



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were 
previously using topic IDs.
+// Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());

Review comment:
   I could theoretically check replace in the other test that checks 
multiple scenarios





[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743240526



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());

Review comment:
   To clarify -- are you referring to a case where we upgraded? ie, it 
started with no ID in the first request and added one in the 

[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743240074



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);

Review comment:
   ah good catch.








[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743239514



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -202,29 +203,30 @@ public void testSessionless() {
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
 addTopicId(topicIds, topicNames, "foo", version);
-builder.add(new TopicPartition("foo", 0), 
topicIds.getOrDefault("foo", Uuid.ZERO_UUID),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
-builder.add(new TopicPartition("foo", 1), 
topicIds.getOrDefault("foo", Uuid.ZERO_UUID),
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+Uuid fooId = topicIds.getOrDefault("foo", Uuid.ZERO_UUID);
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));

Review comment:
   I moved some back.








[GitHub] [kafka] ijuma commented on pull request #11429: KAFKA-13396: allow create topic without partition/replicaFactor

2021-11-04 Thread GitBox


ijuma commented on pull request #11429:
URL: https://github.com/apache/kafka/pull/11429#issuecomment-961367197


   @dajac Makes sense, pushed to trunk, 3.1 and 3.0.






[GitHub] [kafka] dajac commented on pull request #11467: MINOR: fix java doc in kafkaProducer

2021-11-04 Thread GitBox


dajac commented on pull request #11467:
URL: https://github.com/apache/kafka/pull/11467#issuecomment-961343975


   @showuon Thanks. I will take a look soon.






[GitHub] [kafka] dajac commented on pull request #11429: KAFKA-13396: allow create topic without partition/replicaFactor

2021-11-04 Thread GitBox


dajac commented on pull request #11429:
URL: https://github.com/apache/kafka/pull/11429#issuecomment-961343546


   @ijuma Should we cherry-pick this one to the 3.1 branch as well? 






[GitHub] [kafka] dajac opened a new pull request #11468: WIP

2021-11-04 Thread GitBox


dajac opened a new pull request #11468:
URL: https://github.com/apache/kafka/pull/11468


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] bdesert commented on a change in pull request #11401: KAFKA-13255: use exclude filter for new topics

2021-11-04 Thread GitBox


bdesert commented on a change in pull request #11401:
URL: https://github.com/apache/kafka/pull/11401#discussion_r743062367



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
##
@@ -152,6 +155,48 @@ public void testConfigPropertyFiltering() {
 .anyMatch(x -> x.name().equals("min.insync.replicas")), "should 
not replicate excluded properties");
 }
 
+@Test
+public void testNewTopicConfigs() throws Exception {
+Map filterConfig = new HashMap<>();
+
filterConfig.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, 
"follower\\.replication\\.throttled\\.replicas, "
++ "leader\\.replication\\.throttled\\.replicas, "
++ "message\\.timestamp\\.difference\\.max\\.ms, "
++ "message\\.timestamp\\.type, "
++ "unclean\\.leader\\.election\\.enable, "
++ "min\\.insync\\.replicas,"
++ "exclude_param.*");
+DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter();
+filter.configure(filterConfig);
+
+MirrorSourceConnector connector = spy(new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+new DefaultReplicationPolicy(), x -> true, filter));
+
+final String topic = "testtopic";
+List entries = new ArrayList<>();
+entries.add(new ConfigEntry("name-1", "value-1"));
+entries.add(new ConfigEntry("exclude_param.param1", "value-param1"));
+entries.add(new ConfigEntry("min.insync.replicas", "2"));
+Config config = new Config(entries);
+doReturn(Collections.singletonMap(topic, 
config)).when(connector).describeTopicConfigs(any());
+doAnswer(invocation -> {
+Map newTopics = invocation.getArgument(0);
+assertNotNull(newTopics.get("source." + topic));
+Map targetConfig = newTopics.get("source." + 
topic).configs();
+
+// property 'name-1' isn't defined in the exclude filter -> should 
be replicated
+assertNotNull(targetConfig.get("name-1"), "should replicate 
properties");
+
+// this property is in default list, just double check it:
+String prop1 = "min.insync.replicas";
+assertNull(targetConfig.get(prop1), "should not replicate excluded 
properties " + prop1);
+// this property is only in exclude filter custom parameter, also 
tests regex on the way:
+String prop2 = "exclude_param.param1";
+assertNull(targetConfig.get(prop2), "should not replicate excluded 
properties " + prop2);
+return null;
+}).when(connector).createNewTopics(any());
+connector.createNewTopics(Collections.singleton(topic), 
Collections.singletonMap(topic, 1L));
+}

Review comment:
   Agree, with a small correction: `createNewTopics(any(), any())`, since 
`targetConfig()` is called from the overloaded method with two params. A sketch 
of the resulting check is below.
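
   For illustration, a minimal sketch of that check, reusing the spy `connector` 
and `topic` from the test above (everything else is assumed, not the PR's code):

    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.Mockito.verify;
    import java.util.Collections;

    // Exercise the code under test, then verify the stubbed two-argument
    // overload so the assertions inside doAnswer are known to have run.
    connector.createNewTopics(Collections.singleton(topic), Collections.singletonMap(topic, 1L));
    verify(connector).createNewTopics(any(), any());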
   








[GitHub] [kafka] dajac commented on pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


dajac commented on pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#issuecomment-961259072


   @jolshan It seems that there are a few compilation errors, at least for `JDK 
8 and Scala 2.12`. Could you check?






[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743046927



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -66,16 +69,28 @@ public PartitionData(
 int maxBytes,
 Optional currentLeaderEpoch
 ) {
-this(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, 
Optional.empty());
+this(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, 
currentLeaderEpoch, Optional.empty());

Review comment:
   Yeah, that's a good question. I guess that the constructor is convenient for 
tests but might be bug-prone in regular code. I am tempted to remove it 
entirely. What do you think?
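
   For context, a small sketch of the two overloads in question (usage only; 
`fooId` is an arbitrary example value). The ID-less overload silently pins the 
topic ID to `Uuid.ZERO_UUID`, which is what makes it bug-prone outside tests:

    import java.util.Optional;
    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.requests.FetchRequest;

    public class PartitionDataOverloads {
        public static void main(String[] args) {
            Uuid fooId = Uuid.randomUuid();
            // Explicit overload: the caller must supply a topic ID.
            FetchRequest.PartitionData withId =
                new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty());
            // Convenience overload under discussion: the topic ID is silently
            // defaulted to Uuid.ZERO_UUID, i.e. "no topic ID".
            FetchRequest.PartitionData withoutId =
                new FetchRequest.PartitionData(0, 100, 200, Optional.empty());
            System.out.println(withId.topicId + " vs " + withoutId.topicId);
        }
    }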








[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743046003



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
   None
 }
 
-val erroneous = mutable.ArrayBuffer[(TopicPartition, 
FetchResponseData.PartitionData)]()
-val interesting = mutable.ArrayBuffer[(TopicPartition, 
FetchRequest.PartitionData)]()
-val sessionTopicIds = mutable.Map[String, Uuid]()
+val erroneous = mutable.ArrayBuffer[(TopicIdPartition, 
FetchResponseData.PartitionData)]()
+val interesting = mutable.ArrayBuffer[(TopicIdPartition, 
FetchRequest.PartitionData)]()
 if (fetchRequest.isFromFollower) {
   // The follower must have ClusterAction on ClusterResource in order to 
fetch partition data.
   if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, 
CLUSTER_NAME)) {
-fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-  sessionTopicIds.put(topicPartition.topic(), topicId)
-  if (!metadataCache.contains(topicPartition))
-erroneous += topicPartition -> 
FetchResponse.partitionResponse(topicPartition.partition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+fetchContext.foreachPartition { (topicIdPartition, data) =>
+  if (topicIdPartition.topicPartition.topic == null )
+erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+  else if (!metadataCache.contains(topicIdPartition.topicPartition))
+erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
   else
-interesting += (topicPartition -> data)
+interesting += (topicIdPartition -> data)
 }
   } else {
-fetchContext.foreachPartition { (part, topicId, _) =>
-  sessionTopicIds.put(part.topic(), topicId)
-  erroneous += part -> FetchResponse.partitionResponse(part.partition, 
Errors.TOPIC_AUTHORIZATION_FAILED)
+fetchContext.foreachPartition { (topicIdPartition, _) =>
+  erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, 
Errors.TOPIC_AUTHORIZATION_FAILED)

Review comment:
   I guess that it does not change much in the end. I was considering this 
in order to be consistent with how we handle this for the consumer.








[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743044970



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -163,18 +173,35 @@ class CachedPartition(val topic: String,
 mustRespond
   }
 
-  override def hashCode: Int = Objects.hash(new TopicPartition(topic, 
partition), topicId)
+  /**
+   * We have different equality checks depending on whether topic IDs are used.
+   * This means we need a different hash function as well. We use name to 
calculate the hash if the ID is zero and unused.
+   * Otherwise, we use the topic ID in the hash calculation.
+   *
+   * @return the hash code for the CachedPartition depending on what request 
version we are using.
+   */
+  override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition) 
+ topicId.hashCode else
+(31 * partition) + topic.hashCode
 
   def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition]
 
+  /**
+   * We have different equality checks depending on whether topic IDs are used.
+   *
+   * This is because when we use topic IDs, a partition with a given ID and an 
unknown name is the same as a partition with that
+   * ID and a known name. This means we can only use topic ID and partition 
when determining equality.
+   *
+   * On the other hand, if we are using topic names, all IDs are zero. This 
means we can only use topic name and partition
+   * when determining equality.
+   */
   override def equals(that: Any): Boolean =
 that match {
   case that: CachedPartition =>
 this.eq(that) ||
   (that.canEqual(this) &&

Review comment:
   Right. It seems to me that the `canEqual(this)` does not make sense here. 
Could you double check?
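
   For reference, a self-contained Java sketch of the dual equality rule being 
discussed (class name and shape assumed; the real code is the Scala 
`CachedPartition` above):

    import java.util.Objects;
    import org.apache.kafka.common.Uuid;

    // Compare by topic ID when a real ID is present, otherwise fall back to
    // the topic name; the hash function must follow the same split.
    final class CachedPartitionKey {
        final String topic;
        final Uuid topicId;
        final int partition;

        CachedPartitionKey(String topic, Uuid topicId, int partition) {
            this.topic = topic;
            this.topicId = topicId;
            this.partition = partition;
        }

        @Override
        public int hashCode() {
            return Uuid.ZERO_UUID.equals(topicId)
                ? 31 * partition + topic.hashCode()
                : 31 * partition + topicId.hashCode();
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof CachedPartitionKey)) return false;
            CachedPartitionKey that = (CachedPartitionKey) o;
            if (partition != that.partition) return false;
            // With real IDs the name is ignored: an unresolved name still matches.
            if (!Uuid.ZERO_UUID.equals(topicId) || !Uuid.ZERO_UUID.equals(that.topicId))
                return Objects.equals(topicId, that.topicId);
            return Objects.equals(topic, that.topic);
        }
    }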








[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743044369



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String,
   }
 }
   } catch {
-case ime@( _: CorruptRecordException | _: 
InvalidRecordException) =>
+case ime@(_: CorruptRecordException | _: 
InvalidRecordException) =>

Review comment:
   I think that would for instance happen when the controller fails over to 
an older IBP during an upgrade. This should remove the topic ids, which means 
that v12 will be used for the next fetch request and trigger a 
FETCH_SESSION_TOPIC_ID_ERROR. In this particular case, re-trying directly would 
be the optimal way to proceed for a follower. I wonder if there are other cases 
to consider here.
   
   For the consumer, it is definitely different.
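
   A rough sketch of the follower-side reaction described above (the predicate 
and its placement are assumptions, not the actual `AbstractFetcherThread` 
logic):

    import org.apache.kafka.common.protocol.Errors;

    final class FetchErrorPolicy {
        // A session/topic ID mismatch is worth an immediate retry: the next
        // fetch request is rebuilt against current metadata, possibly
        // downgraded to a v12 (name-based) request.
        static boolean retryImmediately(Errors error) {
            return error == Errors.FETCH_SESSION_TOPIC_ID_ERROR;
        }
    }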








[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743041695



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -285,52 +268,57 @@ public FetchRequestData build() {
 if (nextMetadata.isFull()) {
 if (log.isDebugEnabled()) {
 log.debug("Built full fetch {} for node {} with {}.",
-  nextMetadata, node, 
partitionsToLogString(next.keySet()));
+nextMetadata, node, 
topicPartitionsToLogString(next.keySet()));
 }
 sessionPartitions = next;
 next = null;
+Map toSend =
+Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
 // Only add topic IDs to the session if we are using topic IDs.
 if (canUseTopicIds) {
-sessionTopicIds = topicIds;
-sessionTopicNames = new HashMap<>(topicIds.size());
-topicIds.forEach((name, id) -> sessionTopicNames.put(id, 
name));
+Map> newTopicNames = 
sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry
 -> entry.getValue().topicId,
+Collectors.mapping(entry -> 
entry.getKey().topic(), Collectors.toSet(;

Review comment:
   I think that the grouping is slower because it has to allocate another Map, 
a Set for each Uuid, etc. A rough sketch of the comparison is below.
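
   Roughly, the two shapes being compared (signatures assumed for illustration; 
`topicId` is the public field used in the new code above):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.requests.FetchRequest;

    final class TopicNameMaps {
        // Old shape: one flat map, a single put per topic, no intermediate sets.
        static Map<Uuid, String> flat(Map<String, Uuid> topicIds) {
            Map<Uuid, String> names = new HashMap<>(topicIds.size());
            topicIds.forEach((name, id) -> names.put(id, name));
            return names;
        }

        // New shape: the collector allocates a result map plus one Set per
        // distinct topic ID, which is the extra cost pointed out above.
        static Map<Uuid, Set<String>> grouped(Map<TopicPartition, FetchRequest.PartitionData> parts) {
            return parts.entrySet().stream().collect(
                Collectors.groupingBy(e -> e.getValue().topicId,
                    Collectors.mapping(e -> e.getKey().topic(), Collectors.toSet())));
        }
    }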








[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743011890



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -1361,102 +1542,113 @@ class FetchSessionTest {
 val resp4 = context2.updateAndGenerateResponseData(respData)
 assertEquals(Errors.NONE, resp4.error)
 assertEquals(resp1.sessionId, resp4.sessionId)
-assertEquals(Utils.mkSet(tp1, tp2), resp4.responseData(topicNames, 
request2.version).keySet)
+assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition), 
resp4.responseData(topicNames, request2.version).keySet)
   }
 
   @Test
   def testDeprioritizesPartitionsWithRecordsOnly(): Unit = {
 val time = new MockTime()
 val cache = new FetchSessionCache(10, 1000)
 val fetchManager = new FetchManager(time, cache)
-val tp1 = new TopicPartition("foo", 1)
-val tp2 = new TopicPartition("bar", 2)
-val tp3 = new TopicPartition("zar", 3)
 val topicIds = Map("foo" -> Uuid.randomUuid(), "bar" -> Uuid.randomUuid(), 
"zar" -> Uuid.randomUuid()).asJava
 val topicNames = topicIds.asScala.map(_.swap).asJava
+val tp1 = new TopicIdPartition(topicIds.get("foo"), new 
TopicPartition("foo", 1))
+val tp2 = new TopicIdPartition(topicIds.get("bar"), new 
TopicPartition("bar", 2))
+val tp3 = new TopicIdPartition(topicIds.get("zar"), new 
TopicPartition("zar", 3))
 
-val reqData = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-reqData.put(tp1, new FetchRequest.PartitionData(100, 0, 1000, 
Optional.of(5), Optional.of(4)))
-reqData.put(tp2, new FetchRequest.PartitionData(100, 0, 1000, 
Optional.of(5), Optional.of(4)))
-reqData.put(tp3, new FetchRequest.PartitionData(100, 0, 1000, 
Optional.of(5), Optional.of(4)))
+val reqData = new util.LinkedHashMap[TopicIdPartition, 
FetchRequest.PartitionData]
+reqData.put(tp1, new FetchRequest.PartitionData(topicIds.get("foo"), 100, 
0, 1000, Optional.of(5), Optional.of(4)))
+reqData.put(tp2, new FetchRequest.PartitionData(topicIds.get("bar"), 100, 
0, 1000, Optional.of(5), Optional.of(4)))
+reqData.put(tp3, new FetchRequest.PartitionData(topicIds.get("zar"), 100, 
0, 1000, Optional.of(5), Optional.of(4)))
 
 // Full fetch context returns all partitions in the response
 val context1 = fetchManager.newContext(ApiKeys.FETCH.latestVersion(), 
JFetchMetadata.INITIAL, false,
- reqData, Collections.emptyList(), topicIds)
+ reqData, Collections.emptyList(), topicNames)
 assertEquals(classOf[FullFetchContext], context1.getClass)
 
-val respData1 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
+val respData1 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
 respData1.put(tp1, new FetchResponseData.PartitionData()
-  .setPartitionIndex(tp1.partition)
+  .setPartitionIndex(tp1.topicPartition.partition)
   .setHighWatermark(50)
   .setLastStableOffset(50)
   .setLogStartOffset(0))
 respData1.put(tp2, new FetchResponseData.PartitionData()
-  .setPartitionIndex(tp2.partition)
+  .setPartitionIndex(tp2.topicPartition.partition)
   .setHighWatermark(50)
   .setLastStableOffset(50)
   .setLogStartOffset(0))
 respData1.put(tp3, new FetchResponseData.PartitionData()
-  .setPartitionIndex(tp3.partition)
+  .setPartitionIndex(tp3.topicPartition.partition)
   .setHighWatermark(50)
   .setLastStableOffset(50)
   .setLogStartOffset(0))
 
 val resp1 = context1.updateAndGenerateResponseData(respData1)
 assertEquals(Errors.NONE, resp1.error)
 assertNotEquals(INVALID_SESSION_ID, resp1.sessionId)
-assertEquals(Utils.mkSet(tp1, tp2, tp3), resp1.responseData(topicNames, 
ApiKeys.FETCH.latestVersion()).keySet())
+assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition, 
tp3.topicPartition), resp1.responseData(topicNames, 
ApiKeys.FETCH.latestVersion()).keySet())
 
 // Incremental fetch context returns partitions with changes but only 
deprioritizes
 // the partitions with records
 val context2 = fetchManager.newContext(ApiKeys.FETCH.latestVersion(), new 
JFetchMetadata(resp1.sessionId, 1), false,
-  reqData, Collections.emptyList(), topicIds)
+  reqData, Collections.emptyList(), topicNames)
 assertEquals(classOf[IncrementalFetchContext], context2.getClass)
 
 // Partitions are ordered in the session as per last response
 assertPartitionsOrder(context2, Seq(tp1, tp2, tp3))
 
 // Response is empty
-val respData2 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
+val respData2 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
 val resp2 = context2.updateAndGenerateResponseData(respData2)
 assertEquals(Errors.NONE, resp2.error)
 assertEquals(resp1.sessionId, resp2.sessionId)
 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-04 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r742933244



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##

[GitHub] [kafka] ijuma merged pull request #11429: KAFKA-13396: allow create topic without partition/replicaFactor

2021-11-04 Thread GitBox


ijuma merged pull request #11429:
URL: https://github.com/apache/kafka/pull/11429


   






[GitHub] [kafka] showuon commented on pull request #11429: KAFKA-13396: allow create topic without partition/replicaFactor

2021-11-04 Thread GitBox


showuon commented on pull request #11429:
URL: https://github.com/apache/kafka/pull/11429#issuecomment-960951629


   Yes, cherry-pick back to v3.0.1, too, please. Thank you!






[GitHub] [kafka] ijuma commented on pull request #11429: KAFKA-13396: allow create topic without partition/replicaFactor

2021-11-04 Thread GitBox


ijuma commented on pull request #11429:
URL: https://github.com/apache/kafka/pull/11429#issuecomment-960945328


   @showuon we should have this in 3.0.1 too, right?






[GitHub] [kafka] showuon commented on pull request #11467: MINOR: fix java doc in kafkaProducer

2021-11-04 Thread GitBox


showuon commented on pull request #11467:
URL: https://github.com/apache/kafka/pull/11467#issuecomment-960867632


   @dajac, please help review this java doc update. I think this fix should be 
put into v3.1.0. WDYT?






[GitHub] [kafka] showuon commented on a change in pull request #11467: MINOR: fix java doc in kafkaProducer

2021-11-04 Thread GitBox


showuon commented on a change in pull request #11467:
URL: https://github.com/apache/kafka/pull/11467#discussion_r742808430



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -100,8 +100,6 @@
  * {@code
  * Properties props = new Properties();
  * props.put("bootstrap.servers", "localhost:9092");
- * props.put("acks", "all");
- * props.put("retries", 0);

Review comment:
   In v3.0, `acks` defaults to `all`, and setting `retries` to 0 is not 
recommended after 
[KIP-91](https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer),
 so both lines can be dropped; the trimmed example then looks like the sketch 
below.
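
   A sketch (serializer lines taken from the surrounding javadoc; not the 
exact patch):

    import java.util.Properties;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    // acks and retries are omitted on purpose: acks already defaults to "all",
    // and retries defaults to Integer.MAX_VALUE since KIP-91.
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");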








[GitHub] [kafka] showuon commented on a change in pull request #11467: MINOR: fix java doc in kafkaProducer

2021-11-04 Thread GitBox


showuon commented on a change in pull request #11467:
URL: https://github.com/apache/kafka/pull/11467#discussion_r742807104



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -117,14 +115,15 @@
  * as well as a background I/O thread that is responsible for turning these 
records into requests and transmitting them
  * to the cluster. Failure to close the producer after use will leak these 
resources.
  * 
- * The {@link #send(ProducerRecord) send()} method is asynchronous. When 
called it adds the record to a buffer of pending record sends
+ * The {@link #send(ProducerRecord) send()} method is asynchronous. When 
called, it adds the record to a buffer of pending sending record
  * and immediately returns. This allows the producer to batch together 
individual records for efficiency.
  * 
  * The acks config controls the criteria under which requests are 
considered complete. The "all" setting
  * we have specified will result in blocking on the full commit of the record, 
the slowest but most durable setting.
  * 
- * If the request fails, the producer can automatically retry, though since we 
have specified retries
- * as 0 it won't. Enabling retries also opens up the possibility of duplicates 
(see the documentation on
+ * If the request fails, the producer can automatically retry. We have 
specified retries default as Integer.MAX_VALUE.
+ * It's recommended to use delivery.timeout.ms to control retry 
behavior, instead of retries.
+ * Enabling retries also opens up the possibility of duplicates (see the 
documentation on

Review comment:
   After 
[KIP-91](https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer),
 `retries` defaults to the max value, and it's recommended to use 
`delivery.timeout.ms` to control retry behavior instead. Updated accordingly; 
see the sketch below.
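
   A sketch of the recommended knob, assuming a `props` object like the one in 
the producer javadoc (the value shown is the documented default):

    // Bound the total time a send may spend (including retries) instead of
    // capping the retry count.
    props.put("delivery.timeout.ms", "120000"); // 2 minutes, the default
    props.put("retries", String.valueOf(Integer.MAX_VALUE)); // default since KIP-91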








[GitHub] [kafka] showuon opened a new pull request #11467: MINOR: fix java doc in kafkaProducer

2021-11-04 Thread GitBox


showuon opened a new pull request #11467:
URL: https://github.com/apache/kafka/pull/11467


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] Gerrrr commented on pull request #11447: KAFKA-13024: Use not-null filter only in optimizable repartitions

2021-11-04 Thread GitBox


Gerrrr commented on pull request #11447:
URL: https://github.com/apache/kafka/pull/11447#issuecomment-960765623


   Thanks for the review! I agree that solving this issue by removing the 
not-null optimization completely is very much suboptimal. 8f176cb reverts the 
first commit. 7c6e97f introduces another approach where 
`UnoptimizableRepartitionNode` adds the not-null filter iff all downstream 
operations drop the null keys.
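
   Conceptually, the guard introduced in 7c6e97f looks something like this 
sketch (helper name and boolean flag are assumptions for illustration, not the 
actual Streams internals):

    import org.apache.kafka.streams.kstream.KStream;

    final class RepartitionGuards {
        // Only prepend the key-not-null filter when every downstream operation
        // would drop null-keyed records anyway, so no visible behaviour changes.
        static <K, V> KStream<K, V> maybeFilterNullKeys(KStream<K, V> stream,
                                                        boolean downstreamDropsNullKeys) {
            return downstreamDropsNullKeys ? stream.filter((k, v) -> k != null) : stream;
        }
    }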






[jira] [Comment Edited] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.

2021-11-04 Thread Aiden Gong (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438556#comment-17438556
 ] 

Aiden Gong edited comment on KAFKA-13433 at 11/4/21, 9:38 AM:
--

Class org.apache.kafka.connect.json.JsonConverter line 704, should use 
'struct.getWithoutDefault(field)' instead of 'struct.get(field.name())' .


was (Author: aiden gong):
Class org.apache.kafka.connect.json.JsonConverter line 704, should use 
'struct.getWithoutDefault(field)' instead of 'struct.get(field)' .

> JsonConverter's method convertToJson when field is optional with default 
> value and  value is null, return default value.
> 
>
> Key: KAFKA-13433
> URL: https://issues.apache.org/jira/browse/KAFKA-13433
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Aiden Gong
>Priority: Major
>  Labels: connect
> Attachments: image-2021-11-04-16-25-18-890.png, 
> image-2021-11-04-16-25-18-975.png
>
>
> JsonConverter's method convertToJson when field is optional with default 
> value and value is null, return default value.  Please refer attachments for 
> more detail.



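
The reported behaviour is easy to reproduce with the Connect data API; a 
minimal sketch (field and default names arbitrary):

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;

    public class StructDefaultDemo {
        public static void main(String[] args) {
            Schema fieldSchema = SchemaBuilder.string().optional().defaultValue("fallback").build();
            Schema schema = SchemaBuilder.struct().field("f", fieldSchema).build();
            Struct struct = new Struct(schema);                // "f" left unset, i.e. null
            System.out.println(struct.get("f"));               // "fallback": default substituted
            System.out.println(struct.getWithoutDefault("f")); // null: the actual value
        }
    }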


[GitHub] [kafka] mimaison commented on a change in pull request #11401: KAFKA-13255: use exclude filter for new topics

2021-11-04 Thread GitBox


mimaison commented on a change in pull request #11401:
URL: https://github.com/apache/kafka/pull/11401#discussion_r742653123



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
##
@@ -152,6 +155,48 @@ public void testConfigPropertyFiltering() {
+}).when(connector).createNewTopics(any());
+connector.createNewTopics(Collections.singleton(topic), 
Collections.singletonMap(topic, 1L));
+}

Review comment:
   I agree that calling verify on `targetConfig()` is not the best. But I 
think we should call verify on `createNewTopics(any())` to ensure our mocked 
method, and the assertions, actually run.








[jira] [Commented] (KAFKA-12487) Sink connectors do not work with the cooperative consumer rebalance protocol

2021-11-04 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438579#comment-17438579
 ] 

David Jacot commented on KAFKA-12487:
-

Hi [~kkonstantine]. My understanding of the issue is that we are fixing a bug 
here. Therefore, merging it to 3.1 and 3.0 is fine for me. Please correct me if 
it is not a bug.

> Sink connectors do not work with the cooperative consumer rebalance protocol
> 
>
> Key: KAFKA-12487
> URL: https://issues.apache.org/jira/browse/KAFKA-12487
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2, 3.0.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 3.1.0
>
>
> The {{ConsumerRebalanceListener}} used by the framework to respond to 
> rebalance events in consumer groups for sink tasks is hard-coded with the 
> assumption that the consumer performs rebalances eagerly. In other words, it 
> assumes that whenever {{onPartitionsRevoked}} is called, all partitions have 
> been revoked from that consumer, and whenever {{onPartitionsAssigned}} is 
> called, the partitions passed in to that method comprise the complete set of 
> topic partitions assigned to that consumer.
> See the [WorkerSinkTask.HandleRebalance 
> class|https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L669-L730]
>  for the specifics.
>  
> One issue this can cause is silently ignoring to-be-committed offsets 
> provided by sink tasks, since the framework ignores offsets provided by tasks 
> in their {{preCommit}} method if it does not believe that the consumer for 
> that task is currently assigned the topic partition for that offset. See 
> these lines in the [WorkerSinkTask::commitOffsets 
> method|https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L429-L430]
>  for reference.
>  
> This may not be the only issue caused by configuring a sink connector's 
> consumer to use cooperative rebalancing. Rigorous unit and integration 
> testing should be added before claiming that the Connect framework supports 
> the use of cooperative consumers with sink connectors.
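
For illustration only, a minimal sketch of a listener that stays correct under both the eager and cooperative protocols by tracking a cumulative assignment. This is a hedged example, not the framework's actual fix, and the class name is made up:

{code:java}
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

class CooperativeAwareListener implements ConsumerRebalanceListener {
    private final Set<TopicPartition> currentAssignment = new HashSet<>();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
        // Under the cooperative protocol this carries only the revoked subset,
        // not the full previous assignment, so remove rather than clear.
        currentAssignment.removeAll(revoked);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
        // Likewise, only the newly added partitions arrive here; merge rather than replace.
        currentAssignment.addAll(assigned);
    }
}
{code}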



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on pull request #11429: KAFKA-13396: allow create topic without partition/replicaFactor

2021-11-04 Thread GitBox


showuon commented on pull request #11429:
URL: https://github.com/apache/kafka/pull/11429#issuecomment-960575313


   @ijuma , please help check it when available. This is a regression issue, so 
I think we should put it in v3.1.0. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13430) Remove broker-wide quota properties from the documentation

2021-11-04 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-13430.
-
Fix Version/s: 3.0.0
   3.1.0
 Reviewer: David Jacot
   Resolution: Fixed

> Remove broker-wide quota properties from the documentation
> --
>
> Key: KAFKA-13430
> URL: https://issues.apache.org/jira/browse/KAFKA-13430
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 3.0.0
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Major
> Fix For: 3.1.0, 3.0.0
>
>
> I found this problem while working on 
> [KAFKA-13341|https://issues.apache.org/jira/browse/KAFKA-13341].
> Broker-wide quota properties ({{quota.producer.default}}, 
> {{quota.consumer.default}}) were [removed in 
> 3.0|https://issues.apache.org/jira/browse/KAFKA-12591], but the documentation 
> has not been updated accordingly yet.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13430) Remove broker-wide quota properties from the documentation

2021-11-04 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438574#comment-17438574
 ] 

ASF GitHub Bot commented on KAFKA-13430:


dajac merged pull request #380:
URL: https://github.com/apache/kafka-site/pull/380


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove broker-wide quota properties from the documentation
> --
>
> Key: KAFKA-13430
> URL: https://issues.apache.org/jira/browse/KAFKA-13430
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 3.0.0
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Major
>
> I found this problem while working on 
> [KAFKA-13341|https://issues.apache.org/jira/browse/KAFKA-13341].
> Broker-wide quota properties ({{quota.producer.default}}, 
> {{quota.consumer.default}}) were [removed in 
> 3.0|https://issues.apache.org/jira/browse/KAFKA-12591], but the documentation 
> has not been updated accordingly yet.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13351) Add possibility to write kafka headers in Kafka Console Producer

2021-11-04 Thread Seweryn Habdank-Wojewodzki (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438573#comment-17438573
 ] 

Seweryn Habdank-Wojewodzki commented on KAFKA-13351:


Thanks a lot!

> Add possibility to write kafka headers in Kafka Console Producer
> 
>
> Key: KAFKA-13351
> URL: https://issues.apache.org/jira/browse/KAFKA-13351
> Project: Kafka
>  Issue Type: Wish
>  Components: tools
>Affects Versions: 2.8.1
>Reporter: Seweryn Habdank-Wojewodzki
>Assignee: Florin Akermann
>Priority: Major
>
> Dears,
> Currently there is an asymmetry between the Kafka Console Consumer and the 
> Kafka Console Producer.
> The Kafka Console Consumer can display headers (KAFKA-6733), but the Kafka 
> Console Producer cannot produce them.
> It would be good to unify this and add the possibility for the Kafka Console 
> Producer to produce them.
> A similar ticket exists (KAFKA-6574), but it is very old and does not 
> represent the current state of the software.
> Please consider this.
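
For context, producing headers is already possible with the Java producer API; a minimal sketch (the topic, header name, and values are made up):

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HeaderProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
            // This is what the console producer cannot do yet from the command line.
            record.headers().add("trace-id", "abc123".getBytes(StandardCharsets.UTF_8));
            producer.send(record);
        }
    }
}
{code}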



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on pull request #11463: KAFKA-13430: Remove broker-wide quota properties from the documentation

2021-11-04 Thread GitBox


dajac commented on pull request #11463:
URL: https://github.com/apache/kafka/pull/11463#issuecomment-960571438


   Merged to trunk and 3.1.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac merged pull request #11463: KAFKA-13430: Remove broker-wide quota properties from the documentation

2021-11-04 Thread GitBox


dajac merged pull request #11463:
URL: https://github.com/apache/kafka/pull/11463


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.

2021-11-04 Thread Aiden Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aiden Gong updated KAFKA-13433:
---
Labels: connect  (was: connect-api)

> JsonConverter's method convertToJson when field is optional with default 
> value and  value is null, return default value.
> 
>
> Key: KAFKA-13433
> URL: https://issues.apache.org/jira/browse/KAFKA-13433
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Aiden Gong
>Priority: Major
>  Labels: connect
> Attachments: image-2021-11-04-16-25-18-890.png, 
> image-2021-11-04-16-25-18-975.png
>
>
> JsonConverter's convertToJson method returns the default value when a field is 
> optional with a default value and the actual value is null. Please refer to 
> the attachments for more detail.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.

2021-11-04 Thread Aiden Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aiden Gong updated KAFKA-13433:
---
Labels: connect-api  (was: )

> JsonConverter's method convertToJson when field is optional with default 
> value and  value is null, return default value.
> 
>
> Key: KAFKA-13433
> URL: https://issues.apache.org/jira/browse/KAFKA-13433
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Aiden Gong
>Priority: Major
>  Labels: connect-api
> Attachments: image-2021-11-04-16-25-18-890.png, 
> image-2021-11-04-16-25-18-975.png
>
>
> JsonConverter's convertToJson method returns the default value when a field is 
> optional with a default value and the actual value is null. Please refer to 
> the attachments for more detail.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.

2021-11-04 Thread Aiden Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aiden Gong updated KAFKA-13433:
---
Priority: Major  (was: Minor)

> JsonConverter's method convertToJson when field is optional with default 
> value and  value is null, return default value.
> 
>
> Key: KAFKA-13433
> URL: https://issues.apache.org/jira/browse/KAFKA-13433
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Aiden Gong
>Priority: Major
> Attachments: image-2021-11-04-16-25-18-890.png, 
> image-2021-11-04-16-25-18-975.png
>
>
> JsonConverter's convertToJson method returns the default value when a field is 
> optional with a default value and the actual value is null. Please refer to 
> the attachments for more detail.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.

2021-11-04 Thread Aiden Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aiden Gong updated KAFKA-13433:
---
Priority: Minor  (was: Blocker)

> JsonConverter's method convertToJson when field is optional with default 
> value and  value is null, return default value.
> 
>
> Key: KAFKA-13433
> URL: https://issues.apache.org/jira/browse/KAFKA-13433
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Aiden Gong
>Priority: Minor
> Attachments: image-2021-11-04-16-25-18-890.png, 
> image-2021-11-04-16-25-18-975.png
>
>
> JsonConverter's convertToJson method returns the default value when a field is 
> optional with a default value and the actual value is null. Please refer to 
> the attachments for more detail.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.

2021-11-04 Thread Aiden Gong (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438556#comment-17438556
 ] 

Aiden Gong commented on KAFKA-13433:


In class org.apache.kafka.connect.json.JsonConverter, line 704 should use 
'struct.getWithoutDefault(field)' instead of 'struct.get(field)'.
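
A minimal sketch of the difference, assuming Kafka Connect's Struct API (the field name and values are made up):

{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class DefaultVsRawValue {
    public static void main(String[] args) {
        Schema optString = SchemaBuilder.string().optional().defaultValue("fallback").build();
        Schema schema = SchemaBuilder.struct().field("opt", optString).build();
        Struct struct = new Struct(schema); // "opt" deliberately left null

        // get() substitutes the schema default, masking the null:
        System.out.println(struct.get("opt"));               // prints "fallback"
        // getWithoutDefault() returns the raw value, which is what the converter should emit:
        System.out.println(struct.getWithoutDefault("opt")); // prints "null"
    }
}
{code}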

> JsonConverter's method convertToJson when field is optional with default 
> value and  value is null, return default value.
> 
>
> Key: KAFKA-13433
> URL: https://issues.apache.org/jira/browse/KAFKA-13433
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Aiden Gong
>Priority: Blocker
> Attachments: image-2021-11-04-16-25-18-890.png, 
> image-2021-11-04-16-25-18-975.png
>
>
> JsonConverter's convertToJson method returns the default value when a field is 
> optional with a default value and the actual value is null. Please refer to 
> the attachments for more detail.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.

2021-11-04 Thread Aiden Gong (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438553#comment-17438553
 ] 

Aiden Gong commented on KAFKA-13433:


I will submit a PR to fix this issue. Please assign it to me. Thanks!

> JsonConverter's method convertToJson when field is optional with default 
> value and  value is null, return default value.
> 
>
> Key: KAFKA-13433
> URL: https://issues.apache.org/jira/browse/KAFKA-13433
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Aiden Gong
>Priority: Blocker
> Attachments: image-2021-11-04-16-25-18-890.png, 
> image-2021-11-04-16-25-18-975.png
>
>
> JsonConverter's convertToJson method returns the default value when a field is 
> optional with a default value and the actual value is null. Please refer to 
> the attachments for more detail.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.

2021-11-04 Thread Aiden Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aiden Gong updated KAFKA-13433:
---
Description: JsonConverter's method convertToJson when field is optional 
with default value and value is null, return default value.  Please refer 
attachments for more detail.  (was: JsonConverter's method convertToJson when 
field is optional with default value and value is null, return default value. )

> JsonConverter's method convertToJson when field is optional with default 
> value and  value is null, return default value.
> 
>
> Key: KAFKA-13433
> URL: https://issues.apache.org/jira/browse/KAFKA-13433
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Aiden Gong
>Priority: Blocker
> Attachments: image-2021-11-04-16-25-18-890.png, 
> image-2021-11-04-16-25-18-975.png
>
>
> JsonConverter's convertToJson method returns the default value when a field is 
> optional with a default value and the actual value is null. Please refer to 
> the attachments for more detail.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.

2021-11-04 Thread Aiden Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aiden Gong updated KAFKA-13433:
---
Description: JsonConverter's method convertToJson when field is optional 
with default value and value is null, return default value.   (was: 
JsonConverter's method convertToJson when field is optional with default value 
and value is null, return default value. !image-2021-11-04-16-25-18-975.png!)

> JsonConverter's method convertToJson when field is optional with default 
> value and  value is null, return default value.
> 
>
> Key: KAFKA-13433
> URL: https://issues.apache.org/jira/browse/KAFKA-13433
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Aiden Gong
>Priority: Blocker
> Attachments: image-2021-11-04-16-25-18-890.png, 
> image-2021-11-04-16-25-18-975.png
>
>
> JsonConverter's convertToJson method returns the default value when a field is 
> optional with a default value and the actual value is null.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.

2021-11-04 Thread Aiden Gong (Jira)
Aiden Gong created KAFKA-13433:
--

 Summary: JsonConverter's method convertToJson when field is 
optional with default value and  value is null, return default value.
 Key: KAFKA-13433
 URL: https://issues.apache.org/jira/browse/KAFKA-13433
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.0.0
Reporter: Aiden Gong
 Attachments: image-2021-11-04-16-25-18-890.png, 
image-2021-11-04-16-25-18-975.png

JsonConverter's convertToJson method returns the default value when a field is 
optional with a default value and the actual value is null. !image-2021-11-04-16-25-18-975.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] RivenSun2 commented on a change in pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics

2021-11-04 Thread GitBox


RivenSun2 commented on a change in pull request #11460:
URL: https://github.com/apache/kafka/pull/11460#discussion_r742601310



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -326,6 +330,10 @@ private Exception invokePartitionsRevoked(final Set<TopicPartition> revokedParti
 
     private Exception invokePartitionsLost(final Set<TopicPartition> lostPartitions) {
         log.info("Lost previously assigned partitions {}", Utils.join(lostPartitions, ", "));
+        Set<TopicPartition> lostPausedPartitions = subscriptions.pausedPartitions();
+        lostPausedPartitions.retainAll(lostPartitions);
+        if (!lostPausedPartitions.isEmpty())
+            log.info("The pause flag in partitions [{}] will be removed due to lost.", Utils.join(lostPausedPartitions, ", "));

Review comment:
   Thank you for your rigorous review.
   Please forgive my English.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-04 Thread GitBox


RivenSun2 commented on pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#issuecomment-960522880


   @guozhangwang @showuon 
   Thanks for your helpful review.
   
   I just committed a new change.
   If you have time, please take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics

2021-11-04 Thread GitBox


showuon commented on a change in pull request #11460:
URL: https://github.com/apache/kafka/pull/11460#discussion_r742596144



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -326,6 +330,10 @@ private Exception invokePartitionsRevoked(final Set<TopicPartition> revokedParti
 
     private Exception invokePartitionsLost(final Set<TopicPartition> lostPartitions) {
         log.info("Lost previously assigned partitions {}", Utils.join(lostPartitions, ", "));
+        Set<TopicPartition> lostPausedPartitions = subscriptions.pausedPartitions();
+        lostPausedPartitions.retainAll(lostPartitions);
+        if (!lostPausedPartitions.isEmpty())
+            log.info("The pause flag in partitions [{}] will be removed due to lost.", Utils.join(lostPausedPartitions, ", "));

Review comment:
   nit: due to **partition** lost




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics

2021-11-04 Thread GitBox


RivenSun2 commented on pull request #11460:
URL: https://github.com/apache/kafka/pull/11460#issuecomment-960521298


   @showuon 
   Thanks for your review.
   I just committed a new change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on a change in pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics

2021-11-04 Thread GitBox


RivenSun2 commented on a change in pull request #11460:
URL: https://github.com/apache/kafka/pull/11460#discussion_r742589702



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -307,6 +307,10 @@ private Exception invokePartitionsAssigned(final Set<TopicPartition> assignedPar
 
     private Exception invokePartitionsRevoked(final Set<TopicPartition> revokedPartitions) {
         log.info("Revoke previously assigned partitions {}", Utils.join(revokedPartitions, ", "));
+        Set<TopicPartition> revokePausedPartitions = subscriptions.pausedPartitions();
+        revokePausedPartitions.retainAll(revokedPartitions);
+        if (!revokePausedPartitions.isEmpty())
+            log.info("Revoke previously paused partitions {}", Utils.join(revokePausedPartitions, ", "));

Review comment:
   I agree with you




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on a change in pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics

2021-11-04 Thread GitBox


RivenSun2 commented on a change in pull request #11460:
URL: https://github.com/apache/kafka/pull/11460#discussion_r742589553



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -2007,7 +2007,7 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time
  * Suspend fetching from the requested partitions. Future calls to {@link #poll(Duration)} will not return
  * any records from these partitions until they have been resumed using {@link #resume(Collection)}.
  * Note that this method does not affect partition subscription. In particular, it does not cause a group
- * rebalance when automatic assignment is used.
+ * rebalance when automatic assignment is used. And groupRebalance does not preserve pause state.

Review comment:
   Sure




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics

2021-11-04 Thread GitBox


showuon commented on a change in pull request #11460:
URL: https://github.com/apache/kafka/pull/11460#discussion_r742575556



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -2007,7 +2007,7 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time
  * Suspend fetching from the requested partitions. Future calls to {@link #poll(Duration)} will not return
  * any records from these partitions until they have been resumed using {@link #resume(Collection)}.
  * Note that this method does not affect partition subscription. In particular, it does not cause a group
- * rebalance when automatic assignment is used.
+ * rebalance when automatic assignment is used. And groupRebalance does not preserve pause state.

Review comment:
   How about this:
   ```
   * ...
   * rebalance when automatic assignment is used.
   * 
   * Note: Rebalance will not preserve the pause/resume state.
   ```
   
   

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -307,6 +307,10 @@ private Exception invokePartitionsAssigned(final Set<TopicPartition> assignedPar
 
     private Exception invokePartitionsRevoked(final Set<TopicPartition> revokedPartitions) {
         log.info("Revoke previously assigned partitions {}", Utils.join(revokedPartitions, ", "));
+        Set<TopicPartition> revokePausedPartitions = subscriptions.pausedPartitions();
+        revokePausedPartitions.retainAll(revokedPartitions);
+        if (!revokePausedPartitions.isEmpty())
+            log.info("Revoke previously paused partitions {}", Utils.join(revokePausedPartitions, ", "));

Review comment:
   I'm afraid that users might get confused, because it looks like we 
revoke twice:
   ```
   Revoke previously assigned partitions tp1, tp2.
   Revoke previously paused partitions tp1
   ```
   
   How about it:
   `log.info("The pause flag in partitions [{}] will be removed due to 
revocation.", Utils.join(revokePausedPartitions, ", "));`
   WDYT?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-04 Thread GitBox


RivenSun2 commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r742575938



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() {
 });
 }
 
+private boolean maybeAutoCommitOffsetsAsync() {

Review comment:
   In fact, only onJoinPrepare called the maybeAutoCommitOffsetsAsync 
method before.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-04 Thread GitBox


RivenSun2 commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r742574738



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -420,9 +421,14 @@ boolean joinGroupIfNeeded(final Timer timer) {
                 // need to set the flag before calling onJoinPrepare since the user callback may throw
                 // exception, in which case upon retry we should not retry onJoinPrepare either.
                 needsJoinPrepare = false;
-                onJoinPrepare(generation.generationId, generation.memberId);
+                if (!onJoinPrepare(generation.generationId, generation.memberId))

Review comment:
   I think it can be simplified like this:
   ```java
   if (needsJoinPrepare) {
       // need to set the flag before calling onJoinPrepare since the user callback may throw
       // exception, in which case upon retry we should not retry onJoinPrepare either.
       needsJoinPrepare = false;
       if (!onJoinPrepare(generation.generationId, generation.memberId)) {
           needsJoinPrepare = true;
           // should not initiateJoinGroup if needsJoinPrepare is still true
           return false;
       }
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-04 Thread GitBox


RivenSun2 commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r742569946



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
##
@@ -1053,6 +1058,49 @@ public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
         assertFalse(coordinator.rejoinNeededOrPending());
     }
 
+    @Test
+    public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {

Review comment:
   In fact, only onJoinPrepare called the maybeAutoCommitOffsetsAsync method before.
   Following guozhang's suggestion, I will delete the maybeAutoCommitOffsetsSync(timer) method;
   close() also calls maybeAutoCommitOffsetsAsync.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-04 Thread GitBox


RivenSun2 commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r742568417



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
##
@@ -151,6 +151,11 @@
             put(topic2, 1);
         }
     });
+    private MetadataResponse deletedMetadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {

Review comment:
   Sure
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-04 Thread GitBox


RivenSun2 commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r742568236



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -692,10 +693,18 @@ private void validateCooperativeAssignment(final Map

[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-04 Thread GitBox


RivenSun2 commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r742567404



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -403,7 +404,7 @@ private void closeHeartbeatThread() {
  *
  * @param timer Timer bounding how long this method can block
  * @throws KafkaException if the callback throws exception
- * @return true iff the operation succeeded
+ * @return true if the operation succeeded

Review comment:
   My fault, I thought it was a misspelling of a word; I will restore it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-04 Thread GitBox


showuon commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r742567298



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -420,9 +421,14 @@ boolean joinGroupIfNeeded(final Timer timer) {
                 // need to set the flag before calling onJoinPrepare since the user callback may throw
                 // exception, in which case upon retry we should not retry onJoinPrepare either.
                 needsJoinPrepare = false;
-                onJoinPrepare(generation.generationId, generation.memberId);
+                if (!onJoinPrepare(generation.generationId, generation.memberId))

Review comment:
   You're right. How about this:
   ```java
   if (needsJoinPrepare) {
       try {
           if (onJoinPrepare(generation.generationId, generation.memberId))
               needsJoinPrepare = false;
           else
               return false;
       } catch (KafkaException e) {
           // if onJoinPrepare throws an exception, it would be from the rebalance listener.
           // next time we would then not retry {@code onJoinPrepare} any more but proceed with the join-group procedure.
           needsJoinPrepare = false;
           throw e;
       }
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-04 Thread GitBox


showuon commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r742541638



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -692,10 +693,18 @@ private void validateCooperativeAssignment(final Map() {

Review comment:
   Since this variable is only used in the newly added test, could we put it 
into the test?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() {
         });
     }
 
+    private boolean maybeAutoCommitOffsetsAsync() {
+        if (autoCommitEnabled) {
+            invokeCompletedOffsetCommitCallbacks();
+
+            if (onJoinPrepareAsyncCommitFuture == null)
+                onJoinPrepareAsyncCommitFuture = doAutoCommitOffsetsAsync();
+            if (onJoinPrepareAsyncCommitFuture == null)
+                return true;
+
+            client.pollNoWakeup();
+            invokeCompletedOffsetCommitCallbacks();
+
+            if (!onJoinPrepareAsyncCommitFuture.isDone())
+                return false;
+            if (onJoinPrepareAsyncCommitFuture.succeeded()) {
+                onJoinPrepareAsyncCommitFuture = null;
+                return true;
+            }
+            if (onJoinPrepareAsyncCommitFuture.failed() && !onJoinPrepareAsyncCommitFuture.isRetriable())

Review comment:
   +1




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-04 Thread GitBox


RivenSun2 commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r742564833



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() {
 });
 }
 
+private boolean maybeAutoCommitOffsetsAsync() {
+if (autoCommitEnabled) {
+invokeCompletedOffsetCommitCallbacks();

Review comment:
   My fault, already fixed it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-04 Thread GitBox


RivenSun2 commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r742560824



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -420,9 +421,14 @@ boolean joinGroupIfNeeded(final Timer timer) {
                 // need to set the flag before calling onJoinPrepare since the user callback may throw
                 // exception, in which case upon retry we should not retry onJoinPrepare either.
                 needsJoinPrepare = false;
-                onJoinPrepare(generation.generationId, generation.memberId);
+                if (!onJoinPrepare(generation.generationId, generation.memberId))

Review comment:
   If you do it that way, this goal would not be achieved:
   ```java
   if (!onJoinPrepare(generation.generationId, generation.memberId))
       needsJoinPrepare = true;
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-04 Thread GitBox


RivenSun2 commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r742560295



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() {
         });
     }
 
+    private boolean maybeAutoCommitOffsetsAsync() {
+        if (autoCommitEnabled) {
+            invokeCompletedOffsetCommitCallbacks();
+
+            if (onJoinPrepareAsyncCommitFuture == null)
+                onJoinPrepareAsyncCommitFuture = doAutoCommitOffsetsAsync();
+            if (onJoinPrepareAsyncCommitFuture == null)
+                return true;
+
+            client.pollNoWakeup();
+            invokeCompletedOffsetCommitCallbacks();
+
+            if (!onJoinPrepareAsyncCommitFuture.isDone())
+                return false;
+            if (onJoinPrepareAsyncCommitFuture.succeeded()) {
+                onJoinPrepareAsyncCommitFuture = null;
+                return true;
+            }
+            if (onJoinPrepareAsyncCommitFuture.failed() && !onJoinPrepareAsyncCommitFuture.isRetriable())

Review comment:
   I agree with your suggestion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-04 Thread GitBox


RivenSun2 commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r742560139



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() {
 });
 }
 
+private boolean maybeAutoCommitOffsetsAsync() {

Review comment:
   I agree with your suggestion.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1054,11 +1067,11 @@ public void maybeAutoCommitOffsetsAsync(long now) {
 }
 }
 
-private void doAutoCommitOffsetsAsync() {

Review comment:
   I agree with your suggestion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-04 Thread GitBox


RivenSun2 commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r742560028



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -692,10 +693,18 @@ private void validateCooperativeAssignment(final Map

[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-04 Thread GitBox


RivenSun2 commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r742559955



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() {
 });
 }
 
+private boolean maybeAutoCommitOffsetsAsync() {
+if (autoCommitEnabled) {
+invokeCompletedOffsetCommitCallbacks();
+
+if (onJoinPrepareAsyncCommitFuture == null)

Review comment:
   Because we use an asynchronous commitOffsets and, at the same time, do not 
want to block the Consumer#poll method, we cannot call ConsumerNetworkClient#poll(future).
   
   The onJoinPrepareAsyncCommitFuture variable is introduced to deal with the 
"onJoinPrepareAsyncCommitFuture.isDone() == false" situation.
   
   Maybe my consideration is overcomplicated; this situation can be retried by 
the next Consumer#poll.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org