[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743370521 ## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ## @@ -659,88 +670,108 @@ class FetchSessionTest { } @Test - def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = { + def testFetchSessionWithUnknownId(): Unit = { val time = new MockTime() val cache = new FetchSessionCache(10, 1000) val fetchManager = new FetchManager(time, cache) -val topicIds = new util.HashMap[String, Uuid]() -val topicNames = new util.HashMap[Uuid, String]() +val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> "bar").asJava Review comment: I can think more on this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743370708 ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -794,22 +793,23 @@ class ReplicaManagerTest { // We receive one valid request from the follower and replica state is updated var successfulFetch: Option[FetchPartitionData] = None - def callback(response: Seq[(TopicPartition, FetchPartitionData)]): Unit = { -successfulFetch = response.headOption.filter { case (topicPartition, _) => topicPartition == tp }.map { case (_, data) => data } + def callback(response: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { +// Check the topic partition only since we are reusing this callback on different TopicIdPartitions. +successfulFetch = response.headOption.filter { case (topicIdPartition, _) => topicIdPartition.topicPartition == tidp.topicPartition }.map { case (_, data) => data } Review comment: STILL TODO for Friday -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743370419 ## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ## @@ -659,88 +670,108 @@ class FetchSessionTest { } @Test - def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = { + def testFetchSessionWithUnknownId(): Unit = { val time = new MockTime() val cache = new FetchSessionCache(10, 1000) val fetchManager = new FetchManager(time, cache) -val topicIds = new util.HashMap[String, Uuid]() -val topicNames = new util.HashMap[Uuid, String]() +val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> "bar").asJava Review comment: We could do that, but then this check will be a bit more complicated. `context2.foreachPartition((topicIdPartition, _) => assertEquals(topicNames.get(topicIdPartition.topicId), topicIdPartition.topic))` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743369512 ## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ## @@ -659,88 +670,108 @@ class FetchSessionTest { } @Test - def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = { + def testFetchSessionWithUnknownId(): Unit = { val time = new MockTime() val cache = new FetchSessionCache(10, 1000) val fetchManager = new FetchManager(time, cache) -val topicIds = new util.HashMap[String, Uuid]() -val topicNames = new util.HashMap[Uuid, String]() +val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> "bar").asJava +val topicIds = topicNames.asScala.map(_.swap).asJava +val foo0 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition("foo", 0)) +val foo1 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition("foo", 1)) +val emptyFoo0 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition(null, 0)) +val emptyFoo1 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition(null, 1)) -// Create a new fetch session with foo-0 +// Create a new fetch session with foo-0 and foo-1 val reqData1 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] -reqData1.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100, +reqData1.put(foo0.topicPartition, new FetchRequest.PartitionData(foo0.topicId, 0, 0, 100, Optional.empty())) -val request1 = createRequestWithoutTopicIds(JFetchMetadata.INITIAL, reqData1, topicIds, EMPTY_PART_LIST, false) -// Start a fetch session using a request version that does not use topic IDs. +reqData1.put(foo1.topicPartition, new FetchRequest.PartitionData(foo1.topicId,10, 0, 100, + Optional.empty())) +val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, EMPTY_PART_LIST, false) +// Simulate unknown topic ID for foo. +val topicNamesOnlyBar = Collections.singletonMap(topicIds.get("bar"), "bar") +// We should not throw error since we have an older request version. val context1 = fetchManager.newContext( request1.version, request1.metadata, request1.isFromFollower, - request1.fetchData(topicNames), - request1.forgottenTopics(topicNames), - topicIds + request1.fetchData(topicNamesOnlyBar), + request1.forgottenTopics(topicNamesOnlyBar), + topicNamesOnlyBar ) assertEquals(classOf[FullFetchContext], context1.getClass) -val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] -respData1.put(new TopicPartition("foo", 0), new FetchResponseData.PartitionData() +val respData1 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] Review comment: Not quite sure what you meant here but I added this for now: `context1.foreachPartition((topicIdPartition, _) => assertEquals(topicIds.get("foo"), topicIdPartition.topicId))` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743365481 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -1091,18 +1089,20 @@ class AbstractFetcherThreadTest { override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = { val fetchData = mutable.Map.empty[TopicPartition, FetchRequest.PartitionData] - partitionMap.foreach { case (partition, state) => + partitionMap.foreach { case (partition, state) => 0 +.equals(0) Review comment: I have no idea why this is here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743361897 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ## @@ -2069,10 +2071,10 @@ public void testReturnRecordsDuringRebalance() throws InterruptedException { ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor(); -KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); - initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1))); +KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); Review comment: Looks like most of these changes were done by this commit: https://github.com/apache/kafka/pull/11331/commits/32c6297adb685f1863b8c7eb85f2f0965853a9f8 so I can remove them pretty easily. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743360648 ## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? "updating a partition" : "adding a new partition"; -Map topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); +Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); -builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), -new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); +builder.add(new TopicPartition("foo", 0), +new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); -assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), +assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, -respMap(new RespEntry("foo", 0, 10, 20)), topicIds); +respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); -builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, -new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); +builder2.add(new TopicPartition("foo", partition), +new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); -// Should have the same session ID and next epoch, but can no longer use topic IDs. -// The receiving broker will close the session if we were previously using topic IDs. +// Should have the same session ID, and next epoch and can no longer use topic IDs. +// The receiving broker will handle closing the session. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } +@ParameterizedTest +@ValueSource(booleans = {true, false}) +public void testTopicIdReplaced(boolean fetchRequestUsesIds) { +TopicPartition tp = new TopicPartition("foo", 0); +FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); +FetchSessionHandler.Builder builder = handler.newBuilder(); +Uuid topicId1 = Uuid.randomUuid(); +builder.add(tp, +new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); +FetchSessionHandler.FetchRequestData data = builder.build(); +assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), +data.toSend(), data.sessionPartitions()); +assertTrue(data.metadata().isFull()); +assertTrue(data.canUseTopicIds()); + +FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, +respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); +handler.handleResponse(resp, (short) 12); + +// Try to add a new topic ID. +FetchSessionHandler.Builder builder2 = handler.newBuilder(); +Uuid topicId2 = Uuid.randomUuid(); +// Use the same data besides the topic ID. +FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty()); +builder2.add(tp, partitionData); +FetchSessionHandler.FetchRequestData data2 = builder2.build(); +// The old topic ID partition should be in toReplace, and the new one should be in toSend. +assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)), +data2.toSend(), data2.sessionPartitions()); +assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace()); Review comment: Or are you just referring to a case where we don't ever have topic IDs? -- This is an automated message from the Apache Git
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743357376 ## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? "updating a partition" : "adding a new partition"; -Map topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); +Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); -builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), -new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); +builder.add(new TopicPartition("foo", 0), +new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); -assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), +assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, -respMap(new RespEntry("foo", 0, 10, 20)), topicIds); +respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); -builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, -new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); +builder2.add(new TopicPartition("foo", partition), +new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); -// Should have the same session ID and next epoch, but can no longer use topic IDs. -// The receiving broker will close the session if we were previously using topic IDs. +// Should have the same session ID, and next epoch and can no longer use topic IDs. +// The receiving broker will handle closing the session. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } +@ParameterizedTest +@ValueSource(booleans = {true, false}) +public void testTopicIdReplaced(boolean fetchRequestUsesIds) { +TopicPartition tp = new TopicPartition("foo", 0); +FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); +FetchSessionHandler.Builder builder = handler.newBuilder(); +Uuid topicId1 = Uuid.randomUuid(); +builder.add(tp, +new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); +FetchSessionHandler.FetchRequestData data = builder.build(); +assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), +data.toSend(), data.sessionPartitions()); +assertTrue(data.metadata().isFull()); +assertTrue(data.canUseTopicIds()); + +FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, +respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); +handler.handleResponse(resp, (short) 12); + +// Try to add a new topic ID. +FetchSessionHandler.Builder builder2 = handler.newBuilder(); +Uuid topicId2 = Uuid.randomUuid(); +// Use the same data besides the topic ID. +FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty()); +builder2.add(tp, partitionData); +FetchSessionHandler.FetchRequestData data2 = builder2.build(); +// The old topic ID partition should be in toReplace, and the new one should be in toSend. +assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)), +data2.toSend(), data2.sessionPartitions()); +assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace()); Review comment: Or did you mean just adding something like `assertEquals(fetchRequestUsesIds, data2.toReplace().size() > 0);` -- This is an
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743357376 ## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? "updating a partition" : "adding a new partition"; -Map topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); +Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); -builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), -new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); +builder.add(new TopicPartition("foo", 0), +new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); -assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), +assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, -respMap(new RespEntry("foo", 0, 10, 20)), topicIds); +respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); -builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, -new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); +builder2.add(new TopicPartition("foo", partition), +new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); -// Should have the same session ID and next epoch, but can no longer use topic IDs. -// The receiving broker will close the session if we were previously using topic IDs. +// Should have the same session ID, and next epoch and can no longer use topic IDs. +// The receiving broker will handle closing the session. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } +@ParameterizedTest +@ValueSource(booleans = {true, false}) +public void testTopicIdReplaced(boolean fetchRequestUsesIds) { +TopicPartition tp = new TopicPartition("foo", 0); +FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); +FetchSessionHandler.Builder builder = handler.newBuilder(); +Uuid topicId1 = Uuid.randomUuid(); +builder.add(tp, +new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); +FetchSessionHandler.FetchRequestData data = builder.build(); +assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), +data.toSend(), data.sessionPartitions()); +assertTrue(data.metadata().isFull()); +assertTrue(data.canUseTopicIds()); + +FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, +respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); +handler.handleResponse(resp, (short) 12); + +// Try to add a new topic ID. +FetchSessionHandler.Builder builder2 = handler.newBuilder(); +Uuid topicId2 = Uuid.randomUuid(); +// Use the same data besides the topic ID. +FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty()); +builder2.add(tp, partitionData); +FetchSessionHandler.FetchRequestData data2 = builder2.build(); +// The old topic ID partition should be in toReplace, and the new one should be in toSend. +assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)), +data2.toSend(), data2.sessionPartitions()); +assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace()); Review comment: Or did you mean just adding something like `assertEquals(fetchRequestUsesIds, data2.toReplace().size() > 0);` -- This is an
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743355748 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel, None } -val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]() -val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() -val sessionTopicIds = mutable.Map[String, Uuid]() +val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]() +val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]() if (fetchRequest.isFromFollower) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data. if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { -fetchContext.foreachPartition { (topicPartition, topicId, data) => - sessionTopicIds.put(topicPartition.topic(), topicId) - if (!metadataCache.contains(topicPartition)) -erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) +fetchContext.foreachPartition { (topicIdPartition, data) => + if (topicIdPartition.topicPartition.topic == null ) +erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID) + else if (!metadataCache.contains(topicIdPartition.topicPartition)) +erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION) else -interesting += (topicPartition -> data) +interesting += (topicIdPartition -> data) } } else { -fetchContext.foreachPartition { (part, topicId, _) => - sessionTopicIds.put(part.topic(), topicId) - erroneous += part -> FetchResponse.partitionResponse(part.partition, Errors.TOPIC_AUTHORIZATION_FAILED) +fetchContext.foreachPartition { (topicIdPartition, _) => + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED) Review comment: I think we would want to keep the authorization error. Since it just logs a message. The UNKNOWN_TOPIC_ID error would request a metadata update which doesn't make sense when there is an authorization error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743354926 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -163,18 +173,35 @@ class CachedPartition(val topic: String, mustRespond } - override def hashCode: Int = Objects.hash(new TopicPartition(topic, partition), topicId) + /** + * We have different equality checks depending on whether topic IDs are used. + * This means we need a different hash function as well. We use name to calculate the hash if the ID is zero and unused. + * Otherwise, we use the topic ID in the hash calculation. + * + * @return the hash code for the CachedPartition depending on what request version we are using. + */ + override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition) + topicId.hashCode else +(31 * partition) + topic.hashCode def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition] + /** + * We have different equality checks depending on whether topic IDs are used. + * + * This is because when we use topic IDs, a partition with a given ID and an unknown name is the same as a partition with that + * ID and a known name. This means we can only use topic ID and partition when determining equality. + * + * On the other hand, if we are using topic names, all IDs are zero. This means we can only use topic name and partition + * when determining equality. + */ override def equals(that: Any): Boolean = that match { case that: CachedPartition => this.eq(that) || (that.canEqual(this) && Review comment: Hmm. I'm not quite sure why this would not make sense. I believe it is checking the types are correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743351017 ## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ## @@ -1361,102 +1542,113 @@ class FetchSessionTest { val resp4 = context2.updateAndGenerateResponseData(respData) assertEquals(Errors.NONE, resp4.error) assertEquals(resp1.sessionId, resp4.sessionId) -assertEquals(Utils.mkSet(tp1, tp2), resp4.responseData(topicNames, request2.version).keySet) +assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition), resp4.responseData(topicNames, request2.version).keySet) } @Test def testDeprioritizesPartitionsWithRecordsOnly(): Unit = { val time = new MockTime() val cache = new FetchSessionCache(10, 1000) val fetchManager = new FetchManager(time, cache) -val tp1 = new TopicPartition("foo", 1) -val tp2 = new TopicPartition("bar", 2) -val tp3 = new TopicPartition("zar", 3) val topicIds = Map("foo" -> Uuid.randomUuid(), "bar" -> Uuid.randomUuid(), "zar" -> Uuid.randomUuid()).asJava val topicNames = topicIds.asScala.map(_.swap).asJava +val tp1 = new TopicIdPartition(topicIds.get("foo"), new TopicPartition("foo", 1)) +val tp2 = new TopicIdPartition(topicIds.get("bar"), new TopicPartition("bar", 2)) +val tp3 = new TopicIdPartition(topicIds.get("zar"), new TopicPartition("zar", 3)) -val reqData = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] -reqData.put(tp1, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4))) -reqData.put(tp2, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4))) -reqData.put(tp3, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4))) +val reqData = new util.LinkedHashMap[TopicIdPartition, FetchRequest.PartitionData] +reqData.put(tp1, new FetchRequest.PartitionData(topicIds.get("foo"), 100, 0, 1000, Optional.of(5), Optional.of(4))) +reqData.put(tp2, new FetchRequest.PartitionData(topicIds.get("bar"), 100, 0, 1000, Optional.of(5), Optional.of(4))) +reqData.put(tp3, new FetchRequest.PartitionData(topicIds.get("zar"), 100, 0, 1000, Optional.of(5), Optional.of(4))) // Full fetch context returns all partitions in the response val context1 = fetchManager.newContext(ApiKeys.FETCH.latestVersion(), JFetchMetadata.INITIAL, false, - reqData, Collections.emptyList(), topicIds) + reqData, Collections.emptyList(), topicNames) assertEquals(classOf[FullFetchContext], context1.getClass) -val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] +val respData1 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] respData1.put(tp1, new FetchResponseData.PartitionData() - .setPartitionIndex(tp1.partition) + .setPartitionIndex(tp1.topicPartition.partition) .setHighWatermark(50) .setLastStableOffset(50) .setLogStartOffset(0)) respData1.put(tp2, new FetchResponseData.PartitionData() - .setPartitionIndex(tp2.partition) + .setPartitionIndex(tp2.topicPartition.partition) .setHighWatermark(50) .setLastStableOffset(50) .setLogStartOffset(0)) respData1.put(tp3, new FetchResponseData.PartitionData() - .setPartitionIndex(tp3.partition) + .setPartitionIndex(tp3.topicPartition.partition) .setHighWatermark(50) .setLastStableOffset(50) .setLogStartOffset(0)) val resp1 = context1.updateAndGenerateResponseData(respData1) assertEquals(Errors.NONE, resp1.error) assertNotEquals(INVALID_SESSION_ID, resp1.sessionId) -assertEquals(Utils.mkSet(tp1, tp2, tp3), resp1.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet()) +assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition, tp3.topicPartition), resp1.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet()) // Incremental fetch context returns partitions with changes but only deprioritizes // the partitions with records val context2 = fetchManager.newContext(ApiKeys.FETCH.latestVersion(), new JFetchMetadata(resp1.sessionId, 1), false, - reqData, Collections.emptyList(), topicIds) + reqData, Collections.emptyList(), topicNames) assertEquals(classOf[IncrementalFetchContext], context2.getClass) // Partitions are ordered in the session as per last response assertPartitionsOrder(context2, Seq(tp1, tp2, tp3)) // Response is empty -val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] +val respData2 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] val resp2 = context2.updateAndGenerateResponseData(respData2) assertEquals(Errors.NONE, resp2.error) assertEquals(resp1.sessionId, resp2.sessionId)
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743350813 ## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ## @@ -3530,37 +3534,37 @@ class KafkaApisTest { def testSizeOfThrottledPartitions(): Unit = { val topicNames = new util.HashMap[Uuid, String] val topicIds = new util.HashMap[String, Uuid]() -def fetchResponse(data: Map[TopicPartition, String]): FetchResponse = { - val responseData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]( +def fetchResponse(data: Map[TopicIdPartition, String]): FetchResponse = { + val responseData = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]( data.map { case (tp, raw) => tp -> new FetchResponseData.PartitionData() -.setPartitionIndex(tp.partition) +.setPartitionIndex(tp.topicPartition.partition) .setHighWatermark(105) .setLastStableOffset(105) .setLogStartOffset(0) .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(100, raw.getBytes(StandardCharsets.UTF_8 }.toMap.asJava) data.foreach{case (tp, _) => -val id = Uuid.randomUuid() -topicIds.put(tp.topic(), id) -topicNames.put(id, tp.topic()) +topicIds.put(tp.topicPartition.topic, tp.topicId) +topicNames.put(tp.topicId, tp.topicPartition.topic) } - FetchResponse.of(Errors.NONE, 100, 100, responseData, topicIds) + FetchResponse.of(Errors.NONE, 100, 100, responseData) } -val throttledPartition = new TopicPartition("throttledData", 0) +val throttledPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("throttledData", 0)) val throttledData = Map(throttledPartition -> "throttledData") val expectedSize = FetchResponse.sizeOf(FetchResponseData.HIGHEST_SUPPORTED_VERSION, - fetchResponse(throttledData).responseData(topicNames, FetchResponseData.HIGHEST_SUPPORTED_VERSION).entrySet.iterator, topicIds) + fetchResponse(throttledData).responseData(topicNames, FetchResponseData.HIGHEST_SUPPORTED_VERSION).entrySet.asScala.map( entry => + (new TopicIdPartition(Uuid.ZERO_UUID, entry.getKey), entry.getValue)).toMap.asJava.entrySet.iterator) -val response = fetchResponse(throttledData ++ Map(new TopicPartition("nonThrottledData", 0) -> "nonThrottledData")) +val response = fetchResponse(throttledData ++ Map(new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("nonThrottledData", 0)) -> "nonThrottledData")) val quota = Mockito.mock(classOf[ReplicationQuotaManager]) Mockito.when(quota.isThrottled(ArgumentMatchers.any(classOf[TopicPartition]))) - .thenAnswer(invocation => throttledPartition == invocation.getArgument(0).asInstanceOf[TopicPartition]) + .thenAnswer(invocation => throttledPartition.topicPartition == invocation.getArgument(0).asInstanceOf[TopicPartition]) -assertEquals(expectedSize, KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION, response, quota, topicIds)) +assertEquals(expectedSize, KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION, response, quota)) } @Test Review comment: What logic are we thinking? Checking that the unresolved topics are handled correctly? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743349370 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ## @@ -4814,6 +4842,7 @@ private void buildDependencies(MetricConfig metricConfig, metadata = new ConsumerMetadata(0, metadataExpireMs, false, false, subscriptions, logContext, new ClusterResourceListeners()); client = new MockClient(time, metadata); +client.setNodeApiVersions(NodeApiVersions.create()); metrics = new Metrics(metricConfig, time); consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time, 100, 1000, Integer.MAX_VALUE); Review comment: These tests changed from returning a top level error to partition level error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743349235 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ## @@ -4814,6 +4842,7 @@ private void buildDependencies(MetricConfig metricConfig, metadata = new ConsumerMetadata(0, metadataExpireMs, false, false, subscriptions, logContext, new ClusterResourceListeners()); client = new MockClient(time, metadata); +client.setNodeApiVersions(NodeApiVersions.create()); metrics = new Metrics(metricConfig, time); consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time, 100, 1000, Integer.MAX_VALUE); Review comment: Are you referring to how we changed UNKNOWN_TOPIC_ID and INCONSISTENT_TOPIC_ID? For these cases we have testFetchInconsistentTopicId and testFetchUnknownTopicId which check that we update the metadata for a partition level error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on pull request #11461: KAFKA-13422: Add verification of duplicate configuration for each type of LoginModule in JaasConfigFile
RivenSun2 commented on pull request #11461: URL: https://github.com/apache/kafka/pull/11461#issuecomment-961575399 @guozhangwang @ijuma please help check this PR when available. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743243975 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String, } } } catch { -case ime@( _: CorruptRecordException | _: InvalidRecordException) => +case ime@(_: CorruptRecordException | _: InvalidRecordException) => Review comment: > append when the controller fails over to an older IBP during an upgrade. I think I'm misunderstanding something here. Did you mean to say append? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743243975 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String, } } } catch { -case ime@( _: CorruptRecordException | _: InvalidRecordException) => +case ime@(_: CorruptRecordException | _: InvalidRecordException) => Review comment: > append when the controller fails over to an older IBP during an upgrade. I think I'm misunderstanding something here. Did you mean to say append? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13375) Kafka streams apps w/EOS unable to start at InitProducerId
[ https://issues.apache.org/jira/browse/KAFKA-13375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438957#comment-17438957 ] Lerh Chuan Low commented on KAFKA-13375: [~guozhang] Thank you for reading through the ticket! I hadn't seen that KIP yet and didn't realise this was a known issue, I'll digest. > Kafka streams apps w/EOS unable to start at InitProducerId > -- > > Key: KAFKA-13375 > URL: https://issues.apache.org/jira/browse/KAFKA-13375 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.8.0 >Reporter: Lerh Chuan Low >Priority: Major > > Hello, I'm wondering if this is a Kafka bug. Our environment setup is as > follows: > Kafka streams 2.8 - with *EXACTLY_ONCE* turned on (Not *EOS_BETA*, but I > don't think the changes introduced in EOS beta should affect this). > Transaction timeout = 60s. > Kafka broker 2.8. > We have this situation where we were doing a rolling restart of the broker to > apply some security changes. After we finished, 4 out of some 15 Stream Apps > are unable to start. They can never succeed, no matter what we do. > They fail with the error: > {code:java} > 2021-10-14 07:20:13,548 WARN > [srn-rec-feeder-802c18a1-9512-4a2a-8c2e-00e37550199d-StreamThread-3] > o.a.k.s.p.i.StreamsProducer stream-thread > [srn-rec-feeder-802c18a1-9512-4a2a-8c2e-00e37550199d-StreamThread-3] task > [0_6] Timeout exception caught trying to initialize transactions. The broker > is either slow or in bad state (like not having enough replicas) in > responding to the request, or the connection to broker was interrupted > sending the request or receiving the response. Will retry initializing the > task in the next loop. Consider overwriting max.block.ms to a larger value to > avoid timeout errors{code} > We found a previous Jira describing the issue here: > https://issues.apache.org/jira/browse/KAFKA-8803. It seems like back then > what people did was to rolling restart the brokers. We tried that - we > targeted the group coordinators for our failing apps, then transaction > coordinators, then all of them. It hasn't resolved our issue so far. > A few interesting things we've found so far: > - What I can see is that all the failing apps only fail on certain > partitions. E.g for the app above, only partition 6 never succeeds. Partition > 6 shares the same coordinator as some of the other partitions and those work, > so it seems like the issue isn't related to broker memory state. > - All the failing apps have a message similar to this > {code:java} > [2021-10-14 00:54:51,569] INFO [Transaction Marker Request Completion Handler > 103]: Sending srn-rec-feeder-0_6's transaction marker for partition > srn-bot-003-14 has permanently failed with error > org.apache.kafka.common.errors.InvalidProducerEpochException with the current > coordinator epoch 143; cancel sending any more transaction markers > TxnMarkerEntry{producerId=7001, producerEpoch=610, coordinatorEpoch=143, > result=ABORT, partitions=[srn-bot-003-14]} to the brokers > (kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler) > {code} > While we were restarting the brokers. They all failed shortly after. No other > consumer groups for the other working partitions/working stream apps logged > this message. > On digging around in git blame and reading through the source, it looks like > this is meant to be benign. > - We tried DEBUG logging for the TransactionCoordinator and > TransactionStateManager. We can see (assigner is a functioning app) > {code:java} > [2021-10-14 06:48:23,813] DEBUG [TransactionCoordinator id=105] Returning > CONCURRENT_TRANSACTIONS error code to client for srn-assigner-0_14's > AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator) > {code} > I've seen those before during steady state. I do believe they are benign. We > never see it for the problematic partitions/consumer groups for some reason. > - We tried turning on TRACE for KafkaApis. We can see > {code:java} > [2021-10-14 06:56:58,408] TRACE [KafkaApi-105] Completed srn-rec-feeder-0_6's > InitProducerIdRequest with result > InitProducerIdResult(-1,-1,CONCURRENT_TRANSACTIONS) from client > srn-rec-feeder-802c18a1-9512-4a2a-8c2e-00e37550199d-StreamThread-4-0_6-producer. > (kafka.server.KafkaApis) {code} > It starts to make me wonder if there's a situation where Kafka is unable to > abort the transactions if there is never any success in initializing a > producer ID. But this is diving deep into insider knowledge territory that I > simply don't have. I'm wondering if anyone with more knowledge of how > transactions work can shed some light here if we are in the wrong path, or if > there's any way to restore operations at all short of a
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743243111 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ## @@ -2659,6 +2661,9 @@ private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, autoCommitIntervalMs, interceptors, throwOnStableOffsetNotSupported); +ApiVersions apiVersions = new ApiVersions(); +metadata.fetch().nodes().forEach(node -> +apiVersions.update(node.idString(), NodeApiVersions.create())); Review comment: Nope. Looks like another change I forgot to cleanup. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743242849 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ## @@ -2069,10 +2071,10 @@ public void testReturnRecordsDuringRebalance() throws InterruptedException { ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor(); -KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); - initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1))); +KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); Review comment: It likely had something to do with how the mock client was handling metadata. But that may have been for the older version where we checked NodeApiVersion. I can try to switch it back. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743242346 ## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? "updating a partition" : "adding a new partition"; -Map topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); +Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); -builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), -new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); +builder.add(new TopicPartition("foo", 0), +new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); -assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), +assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, -respMap(new RespEntry("foo", 0, 10, 20)), topicIds); +respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); -builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, -new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); +builder2.add(new TopicPartition("foo", partition), +new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); -// Should have the same session ID and next epoch, but can no longer use topic IDs. -// The receiving broker will close the session if we were previously using topic IDs. +// Should have the same session ID, and next epoch and can no longer use topic IDs. +// The receiving broker will handle closing the session. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } +@ParameterizedTest +@ValueSource(booleans = {true, false}) +public void testTopicIdReplaced(boolean fetchRequestUsesIds) { +TopicPartition tp = new TopicPartition("foo", 0); +FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); +FetchSessionHandler.Builder builder = handler.newBuilder(); +Uuid topicId1 = Uuid.randomUuid(); +builder.add(tp, +new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); +FetchSessionHandler.FetchRequestData data = builder.build(); +assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), +data.toSend(), data.sessionPartitions()); +assertTrue(data.metadata().isFull()); +assertTrue(data.canUseTopicIds()); + +FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, +respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); +handler.handleResponse(resp, (short) 12); + +// Try to add a new topic ID. +FetchSessionHandler.Builder builder2 = handler.newBuilder(); +Uuid topicId2 = Uuid.randomUuid(); +// Use the same data besides the topic ID. +FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty()); +builder2.add(tp, partitionData); +FetchSessionHandler.FetchRequestData data2 = builder2.build(); +// The old topic ID partition should be in toReplace, and the new one should be in toSend. +assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)), +data2.toSend(), data2.sessionPartitions()); +assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace()); +// Should have the same session ID, and next epoch and can use topic IDs. +assertEquals(123, data2.metadata().sessionId(), "Did not use same
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743241513 ## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? "updating a partition" : "adding a new partition"; -Map topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); +Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); -builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), -new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); +builder.add(new TopicPartition("foo", 0), +new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); -assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), +assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, -respMap(new RespEntry("foo", 0, 10, 20)), topicIds); +respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); -builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, -new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); +builder2.add(new TopicPartition("foo", partition), +new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); -// Should have the same session ID and next epoch, but can no longer use topic IDs. -// The receiving broker will close the session if we were previously using topic IDs. +// Should have the same session ID, and next epoch and can no longer use topic IDs. +// The receiving broker will handle closing the session. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } +@ParameterizedTest +@ValueSource(booleans = {true, false}) +public void testTopicIdReplaced(boolean fetchRequestUsesIds) { +TopicPartition tp = new TopicPartition("foo", 0); +FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); +FetchSessionHandler.Builder builder = handler.newBuilder(); +Uuid topicId1 = Uuid.randomUuid(); +builder.add(tp, +new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); +FetchSessionHandler.FetchRequestData data = builder.build(); +assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), +data.toSend(), data.sessionPartitions()); +assertTrue(data.metadata().isFull()); +assertTrue(data.canUseTopicIds()); + +FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, +respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); +handler.handleResponse(resp, (short) 12); + +// Try to add a new topic ID. +FetchSessionHandler.Builder builder2 = handler.newBuilder(); +Uuid topicId2 = Uuid.randomUuid(); +// Use the same data besides the topic ID. +FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty()); +builder2.add(tp, partitionData); +FetchSessionHandler.FetchRequestData data2 = builder2.build(); +// The old topic ID partition should be in toReplace, and the new one should be in toSend. +assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)), +data2.toSend(), data2.sessionPartitions()); +assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace()); +// Should have the same session ID, and next epoch and can use topic IDs. +assertEquals(123, data2.metadata().sessionId(), "Did not use same
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743241204 ## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? "updating a partition" : "adding a new partition"; -Map topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); +Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); -builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), -new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); +builder.add(new TopicPartition("foo", 0), +new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); -assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), +assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, -respMap(new RespEntry("foo", 0, 10, 20)), topicIds); +respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); -builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, -new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); +builder2.add(new TopicPartition("foo", partition), +new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); -// Should have the same session ID and next epoch, but can no longer use topic IDs. -// The receiving broker will close the session if we were previously using topic IDs. +// Should have the same session ID, and next epoch and can no longer use topic IDs. +// The receiving broker will handle closing the session. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } +@ParameterizedTest +@ValueSource(booleans = {true, false}) +public void testTopicIdReplaced(boolean fetchRequestUsesIds) { +TopicPartition tp = new TopicPartition("foo", 0); +FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); +FetchSessionHandler.Builder builder = handler.newBuilder(); +Uuid topicId1 = Uuid.randomUuid(); +builder.add(tp, +new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); +FetchSessionHandler.FetchRequestData data = builder.build(); +assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), +data.toSend(), data.sessionPartitions()); +assertTrue(data.metadata().isFull()); +assertTrue(data.canUseTopicIds()); + +FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, +respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); +handler.handleResponse(resp, (short) 12); + +// Try to add a new topic ID. +FetchSessionHandler.Builder builder2 = handler.newBuilder(); +Uuid topicId2 = Uuid.randomUuid(); +// Use the same data besides the topic ID. +FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty()); +builder2.add(tp, partitionData); +FetchSessionHandler.FetchRequestData data2 = builder2.build(); +// The old topic ID partition should be in toReplace, and the new one should be in toSend. +assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)), +data2.toSend(), data2.sessionPartitions()); +assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace()); Review comment: I could theoretically check replace in the other test that checks multiple scenarios -- This is an automated message from the
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743240526 ## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? "updating a partition" : "adding a new partition"; -Map topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); +Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); -builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), -new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); +builder.add(new TopicPartition("foo", 0), +new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); -assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), +assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, -respMap(new RespEntry("foo", 0, 10, 20)), topicIds); +respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); -builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, -new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); +builder2.add(new TopicPartition("foo", partition), +new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); -// Should have the same session ID and next epoch, but can no longer use topic IDs. -// The receiving broker will close the session if we were previously using topic IDs. +// Should have the same session ID, and next epoch and can no longer use topic IDs. +// The receiving broker will handle closing the session. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } +@ParameterizedTest +@ValueSource(booleans = {true, false}) +public void testTopicIdReplaced(boolean fetchRequestUsesIds) { +TopicPartition tp = new TopicPartition("foo", 0); +FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); +FetchSessionHandler.Builder builder = handler.newBuilder(); +Uuid topicId1 = Uuid.randomUuid(); +builder.add(tp, +new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); +FetchSessionHandler.FetchRequestData data = builder.build(); +assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), +data.toSend(), data.sessionPartitions()); +assertTrue(data.metadata().isFull()); +assertTrue(data.canUseTopicIds()); + +FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, +respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); +handler.handleResponse(resp, (short) 12); + +// Try to add a new topic ID. +FetchSessionHandler.Builder builder2 = handler.newBuilder(); +Uuid topicId2 = Uuid.randomUuid(); +// Use the same data besides the topic ID. +FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty()); +builder2.add(tp, partitionData); +FetchSessionHandler.FetchRequestData data2 = builder2.build(); +// The old topic ID partition should be in toReplace, and the new one should be in toSend. +assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)), +data2.toSend(), data2.sessionPartitions()); +assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace()); Review comment: To clarify -- are you referring to a case where we upgraded? ie, it started with no ID in the first request and added one in the
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743240074 ## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? "updating a partition" : "adding a new partition"; -Map topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); +Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); -builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), -new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); +builder.add(new TopicPartition("foo", 0), +new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); -assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), +assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, -respMap(new RespEntry("foo", 0, 10, 20)), topicIds); +respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); -builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, -new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); +builder2.add(new TopicPartition("foo", partition), +new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); -// Should have the same session ID and next epoch, but can no longer use topic IDs. -// The receiving broker will close the session if we were previously using topic IDs. +// Should have the same session ID, and next epoch and can no longer use topic IDs. +// The receiving broker will handle closing the session. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } +@ParameterizedTest +@ValueSource(booleans = {true, false}) +public void testTopicIdReplaced(boolean fetchRequestUsesIds) { +TopicPartition tp = new TopicPartition("foo", 0); +FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); +FetchSessionHandler.Builder builder = handler.newBuilder(); +Uuid topicId1 = Uuid.randomUuid(); +builder.add(tp, +new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); +FetchSessionHandler.FetchRequestData data = builder.build(); +assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), +data.toSend(), data.sessionPartitions()); +assertTrue(data.metadata().isFull()); +assertTrue(data.canUseTopicIds()); + +FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, +respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); +handler.handleResponse(resp, (short) 12); Review comment: ah good catch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743239514 ## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ## @@ -202,29 +203,30 @@ public void testSessionless() { FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); addTopicId(topicIds, topicNames, "foo", version); -builder.add(new TopicPartition("foo", 0), topicIds.getOrDefault("foo", Uuid.ZERO_UUID), -new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); -builder.add(new TopicPartition("foo", 1), topicIds.getOrDefault("foo", Uuid.ZERO_UUID), -new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); +Uuid fooId = topicIds.getOrDefault("foo", Uuid.ZERO_UUID); +builder.add(new TopicPartition("foo", 0), +new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); Review comment: I moved some back. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #11429: KAFKA-13396: allow create topic without partition/replicaFactor
ijuma commented on pull request #11429: URL: https://github.com/apache/kafka/pull/11429#issuecomment-961367197 @dajac Makes sense, pushed to trunk, 3.1 and 3.0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #11467: MINOR: fix java doc in kafkaProducer
dajac commented on pull request #11467: URL: https://github.com/apache/kafka/pull/11467#issuecomment-961343975 @showuon Thanks. I will take a look soon. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #11429: KAFKA-13396: allow create topic without partition/replicaFactor
dajac commented on pull request #11429: URL: https://github.com/apache/kafka/pull/11429#issuecomment-961343546 @ijuma Should we pick this one to the 3.1 branch as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac opened a new pull request #11468: WIP
dajac opened a new pull request #11468: URL: https://github.com/apache/kafka/pull/11468 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bdesert commented on a change in pull request #11401: KAFKA-13255: use exclude filter for new topics
bdesert commented on a change in pull request #11401: URL: https://github.com/apache/kafka/pull/11401#discussion_r743062367 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java ## @@ -152,6 +155,48 @@ public void testConfigPropertyFiltering() { .anyMatch(x -> x.name().equals("min.insync.replicas")), "should not replicate excluded properties"); } +@Test +public void testNewTopicConfigs() throws Exception { +Map filterConfig = new HashMap<>(); + filterConfig.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "follower\\.replication\\.throttled\\.replicas, " ++ "leader\\.replication\\.throttled\\.replicas, " ++ "message\\.timestamp\\.difference\\.max\\.ms, " ++ "message\\.timestamp\\.type, " ++ "unclean\\.leader\\.election\\.enable, " ++ "min\\.insync\\.replicas," ++ "exclude_param.*"); +DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter(); +filter.configure(filterConfig); + +MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), +new DefaultReplicationPolicy(), x -> true, filter)); + +final String topic = "testtopic"; +List entries = new ArrayList<>(); +entries.add(new ConfigEntry("name-1", "value-1")); +entries.add(new ConfigEntry("exclude_param.param1", "value-param1")); +entries.add(new ConfigEntry("min.insync.replicas", "2")); +Config config = new Config(entries); +doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); +doAnswer(invocation -> { +Map newTopics = invocation.getArgument(0); +assertNotNull(newTopics.get("source." + topic)); +Map targetConfig = newTopics.get("source." + topic).configs(); + +// property 'name-1' isn't defined in the exclude filter -> should be replicated +assertNotNull(targetConfig.get("name-1"), "should replicate properties"); + +// this property is in default list, just double check it: +String prop1 = "min.insync.replicas"; +assertNull(targetConfig.get(prop1), "should not replicate excluded properties " + prop1); +// this property is only in exclude filter custom parameter, also tests regex on the way: +String prop2 = "exclude_param.param1"; +assertNull(targetConfig.get(prop2), "should not replicate excluded properties " + prop2); +return null; +}).when(connector).createNewTopics(any()); +connector.createNewTopics(Collections.singleton(topic), Collections.singletonMap(topic, 1L)); +} Review comment: agree. with a small correction of `createNewTopics(any(), any())`. since the `targetConfig()` is being called from overloaded method with two params. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
dajac commented on pull request #11331: URL: https://github.com/apache/kafka/pull/11331#issuecomment-961259072 @jolshan It seems that there are a few compilation errors, at least for `JDK 8 and Scala 2.12`. Could you check? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
dajac commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743046927 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ## @@ -66,16 +69,28 @@ public PartitionData( int maxBytes, Optional currentLeaderEpoch ) { -this(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, Optional.empty()); +this(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, Optional.empty()); Review comment: Yeah, that's a good question. I guess that that constructor is convenient for tests but might be bug prone in the regular code. I am tempted to remove it entirely What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
dajac commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743046003 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel, None } -val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]() -val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() -val sessionTopicIds = mutable.Map[String, Uuid]() +val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]() +val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]() if (fetchRequest.isFromFollower) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data. if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { -fetchContext.foreachPartition { (topicPartition, topicId, data) => - sessionTopicIds.put(topicPartition.topic(), topicId) - if (!metadataCache.contains(topicPartition)) -erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) +fetchContext.foreachPartition { (topicIdPartition, data) => + if (topicIdPartition.topicPartition.topic == null ) +erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID) + else if (!metadataCache.contains(topicIdPartition.topicPartition)) +erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION) else -interesting += (topicPartition -> data) +interesting += (topicIdPartition -> data) } } else { -fetchContext.foreachPartition { (part, topicId, _) => - sessionTopicIds.put(part.topic(), topicId) - erroneous += part -> FetchResponse.partitionResponse(part.partition, Errors.TOPIC_AUTHORIZATION_FAILED) +fetchContext.foreachPartition { (topicIdPartition, _) => + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED) Review comment: I guess that it does not change much in the end. I was considering this in order to be consistent with how we handle this for the consumer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
dajac commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743044970 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -163,18 +173,35 @@ class CachedPartition(val topic: String, mustRespond } - override def hashCode: Int = Objects.hash(new TopicPartition(topic, partition), topicId) + /** + * We have different equality checks depending on whether topic IDs are used. + * This means we need a different hash function as well. We use name to calculate the hash if the ID is zero and unused. + * Otherwise, we use the topic ID in the hash calculation. + * + * @return the hash code for the CachedPartition depending on what request version we are using. + */ + override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition) + topicId.hashCode else +(31 * partition) + topic.hashCode def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition] + /** + * We have different equality checks depending on whether topic IDs are used. + * + * This is because when we use topic IDs, a partition with a given ID and an unknown name is the same as a partition with that + * ID and a known name. This means we can only use topic ID and partition when determining equality. + * + * On the other hand, if we are using topic names, all IDs are zero. This means we can only use topic name and partition + * when determining equality. + */ override def equals(that: Any): Boolean = that match { case that: CachedPartition => this.eq(that) || (that.canEqual(this) && Review comment: Right. It seems to be that the `canEqual(this)` does not make any sense here. Could you double check? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
dajac commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743044369 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String, } } } catch { -case ime@( _: CorruptRecordException | _: InvalidRecordException) => +case ime@(_: CorruptRecordException | _: InvalidRecordException) => Review comment: I think that would for instance append when the controller fails over to an older IBP during an upgrade. This should remove the topic ids which means that v12 will be used for the next fetch request and trigger a FETCH_SESSION_TOPIC_ID_ERROR. In this particular case, re-trying directly would be the optimal way to proceed for a follower. I wonder if they are other cases to consider here. For the consumer, it is definitely different. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
dajac commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743041695 ## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ## @@ -285,52 +268,57 @@ public FetchRequestData build() { if (nextMetadata.isFull()) { if (log.isDebugEnabled()) { log.debug("Built full fetch {} for node {} with {}.", - nextMetadata, node, partitionsToLogString(next.keySet())); +nextMetadata, node, topicPartitionsToLogString(next.keySet())); } sessionPartitions = next; next = null; +Map toSend = +Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)); // Only add topic IDs to the session if we are using topic IDs. if (canUseTopicIds) { -sessionTopicIds = topicIds; -sessionTopicNames = new HashMap<>(topicIds.size()); -topicIds.forEach((name, id) -> sessionTopicNames.put(id, name)); +Map> newTopicNames = sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry -> entry.getValue().topicId, +Collectors.mapping(entry -> entry.getKey().topic(), Collectors.toSet(; Review comment: I think that the grouping is slower because it has to allocate another Map, Sets for each Uuid, etc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
dajac commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743011890 ## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ## @@ -1361,102 +1542,113 @@ class FetchSessionTest { val resp4 = context2.updateAndGenerateResponseData(respData) assertEquals(Errors.NONE, resp4.error) assertEquals(resp1.sessionId, resp4.sessionId) -assertEquals(Utils.mkSet(tp1, tp2), resp4.responseData(topicNames, request2.version).keySet) +assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition), resp4.responseData(topicNames, request2.version).keySet) } @Test def testDeprioritizesPartitionsWithRecordsOnly(): Unit = { val time = new MockTime() val cache = new FetchSessionCache(10, 1000) val fetchManager = new FetchManager(time, cache) -val tp1 = new TopicPartition("foo", 1) -val tp2 = new TopicPartition("bar", 2) -val tp3 = new TopicPartition("zar", 3) val topicIds = Map("foo" -> Uuid.randomUuid(), "bar" -> Uuid.randomUuid(), "zar" -> Uuid.randomUuid()).asJava val topicNames = topicIds.asScala.map(_.swap).asJava +val tp1 = new TopicIdPartition(topicIds.get("foo"), new TopicPartition("foo", 1)) +val tp2 = new TopicIdPartition(topicIds.get("bar"), new TopicPartition("bar", 2)) +val tp3 = new TopicIdPartition(topicIds.get("zar"), new TopicPartition("zar", 3)) -val reqData = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] -reqData.put(tp1, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4))) -reqData.put(tp2, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4))) -reqData.put(tp3, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4))) +val reqData = new util.LinkedHashMap[TopicIdPartition, FetchRequest.PartitionData] +reqData.put(tp1, new FetchRequest.PartitionData(topicIds.get("foo"), 100, 0, 1000, Optional.of(5), Optional.of(4))) +reqData.put(tp2, new FetchRequest.PartitionData(topicIds.get("bar"), 100, 0, 1000, Optional.of(5), Optional.of(4))) +reqData.put(tp3, new FetchRequest.PartitionData(topicIds.get("zar"), 100, 0, 1000, Optional.of(5), Optional.of(4))) // Full fetch context returns all partitions in the response val context1 = fetchManager.newContext(ApiKeys.FETCH.latestVersion(), JFetchMetadata.INITIAL, false, - reqData, Collections.emptyList(), topicIds) + reqData, Collections.emptyList(), topicNames) assertEquals(classOf[FullFetchContext], context1.getClass) -val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] +val respData1 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] respData1.put(tp1, new FetchResponseData.PartitionData() - .setPartitionIndex(tp1.partition) + .setPartitionIndex(tp1.topicPartition.partition) .setHighWatermark(50) .setLastStableOffset(50) .setLogStartOffset(0)) respData1.put(tp2, new FetchResponseData.PartitionData() - .setPartitionIndex(tp2.partition) + .setPartitionIndex(tp2.topicPartition.partition) .setHighWatermark(50) .setLastStableOffset(50) .setLogStartOffset(0)) respData1.put(tp3, new FetchResponseData.PartitionData() - .setPartitionIndex(tp3.partition) + .setPartitionIndex(tp3.topicPartition.partition) .setHighWatermark(50) .setLastStableOffset(50) .setLogStartOffset(0)) val resp1 = context1.updateAndGenerateResponseData(respData1) assertEquals(Errors.NONE, resp1.error) assertNotEquals(INVALID_SESSION_ID, resp1.sessionId) -assertEquals(Utils.mkSet(tp1, tp2, tp3), resp1.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet()) +assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition, tp3.topicPartition), resp1.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet()) // Incremental fetch context returns partitions with changes but only deprioritizes // the partitions with records val context2 = fetchManager.newContext(ApiKeys.FETCH.latestVersion(), new JFetchMetadata(resp1.sessionId, 1), false, - reqData, Collections.emptyList(), topicIds) + reqData, Collections.emptyList(), topicNames) assertEquals(classOf[IncrementalFetchContext], context2.getClass) // Partitions are ordered in the session as per last response assertPartitionsOrder(context2, Seq(tp1, tp2, tp3)) // Response is empty -val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] +val respData2 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] val resp2 = context2.updateAndGenerateResponseData(respData2) assertEquals(Errors.NONE, resp2.error) assertEquals(resp1.sessionId, resp2.sessionId)
[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
dajac commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r742933244 ## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? "updating a partition" : "adding a new partition"; -Map topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); +Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); -builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), -new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); +builder.add(new TopicPartition("foo", 0), +new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); -assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), +assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, -respMap(new RespEntry("foo", 0, 10, 20)), topicIds); +respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); -builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, -new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); +builder2.add(new TopicPartition("foo", partition), +new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); -// Should have the same session ID and next epoch, but can no longer use topic IDs. -// The receiving broker will close the session if we were previously using topic IDs. +// Should have the same session ID, and next epoch and can no longer use topic IDs. +// The receiving broker will handle closing the session. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } +@ParameterizedTest +@ValueSource(booleans = {true, false}) +public void testTopicIdReplaced(boolean fetchRequestUsesIds) { +TopicPartition tp = new TopicPartition("foo", 0); +FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); +FetchSessionHandler.Builder builder = handler.newBuilder(); +Uuid topicId1 = Uuid.randomUuid(); +builder.add(tp, +new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); +FetchSessionHandler.FetchRequestData data = builder.build(); +assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), +data.toSend(), data.sessionPartitions()); +assertTrue(data.metadata().isFull()); +assertTrue(data.canUseTopicIds()); + +FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, +respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); +handler.handleResponse(resp, (short) 12); + +// Try to add a new topic ID. +FetchSessionHandler.Builder builder2 = handler.newBuilder(); +Uuid topicId2 = Uuid.randomUuid(); +// Use the same data besides the topic ID. +FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty()); +builder2.add(tp, partitionData); +FetchSessionHandler.FetchRequestData data2 = builder2.build(); +// The old topic ID partition should be in toReplace, and the new one should be in toSend. +assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)), +data2.toSend(), data2.sessionPartitions()); +assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace()); +// Should have the same session ID, and next epoch and can use topic IDs. +assertEquals(123, data2.metadata().sessionId(), "Did not use same
[GitHub] [kafka] ijuma merged pull request #11429: KAFKA-13396: allow create topic without partition/replicaFactor
ijuma merged pull request #11429: URL: https://github.com/apache/kafka/pull/11429 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #11429: KAFKA-13396: allow create topic without partition/replicaFactor
showuon commented on pull request #11429: URL: https://github.com/apache/kafka/pull/11429#issuecomment-960951629 Yes, cherry-pick back to v3.0.1, too, please. Thank you! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #11429: KAFKA-13396: allow create topic without partition/replicaFactor
ijuma commented on pull request #11429: URL: https://github.com/apache/kafka/pull/11429#issuecomment-960945328 @showuon we should have this in 3.0.1 too, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #11467: MINOR: fix java doc in kafkaProducer
showuon commented on pull request #11467: URL: https://github.com/apache/kafka/pull/11467#issuecomment-960867632 @dajac , please help review this java doc update, and I think this fix should put into V3.1.0. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11467: MINOR: fix java doc in kafkaProducer
showuon commented on a change in pull request #11467: URL: https://github.com/apache/kafka/pull/11467#discussion_r742808430 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -100,8 +100,6 @@ * {@code * Properties props = new Properties(); * props.put("bootstrap.servers", "localhost:9092"); - * props.put("acks", "all"); - * props.put("retries", 0); Review comment: In V3.0, `acks` is default to `all`, and `retires` is not recommended to set to 0 after [KIP-91](https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer) . -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11467: MINOR: fix java doc in kafkaProducer
showuon commented on a change in pull request #11467: URL: https://github.com/apache/kafka/pull/11467#discussion_r742808430 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -100,8 +100,6 @@ * {@code * Properties props = new Properties(); * props.put("bootstrap.servers", "localhost:9092"); - * props.put("acks", "all"); - * props.put("retries", 0); Review comment: In V3.0, `acks` is default to `all`, and `retires` is not recommended to set to 0 after KIP-91(https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11467: MINOR: fix java doc in kafkaProducer
showuon commented on a change in pull request #11467: URL: https://github.com/apache/kafka/pull/11467#discussion_r742807104 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -117,14 +115,15 @@ * as well as a background I/O thread that is responsible for turning these records into requests and transmitting them * to the cluster. Failure to close the producer after use will leak these resources. * - * The {@link #send(ProducerRecord) send()} method is asynchronous. When called it adds the record to a buffer of pending record sends + * The {@link #send(ProducerRecord) send()} method is asynchronous. When called, it adds the record to a buffer of pending sending record * and immediately returns. This allows the producer to batch together individual records for efficiency. * * The acks config controls the criteria under which requests are considered complete. The "all" setting * we have specified will result in blocking on the full commit of the record, the slowest but most durable setting. * - * If the request fails, the producer can automatically retry, though since we have specified retries - * as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on + * If the request fails, the producer can automatically retry. We have specified retries default as Integer.MAX_VALUE. + * It's recommended to use delivery.timeout.ms to control retry behavior, instead of retries. + * Enabling retries also opens up the possibility of duplicates (see the documentation on Review comment: After [KIP-91](https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer), we set `retries` as max value, and it's recommended to use `delivery.timeout.ms` to control retry behavior. Update it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request #11467: MINOR: fix java doc in kafkaProducer
showuon opened a new pull request #11467: URL: https://github.com/apache/kafka/pull/11467 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Gerrrr commented on pull request #11447: KAFKA-13024: Use not-null filter only in optimizable repartitions
Ge commented on pull request #11447: URL: https://github.com/apache/kafka/pull/11447#issuecomment-960765623 Thanks for the review! I agree that solving this issue by removing the not-null optimization completely is very much suboptimal. 8f176cb reverts the first commit. 7c6e97f introduces another approach where `UnoptimizableRepartitionNode` adds the not-null filter iff all downstream operations drop the null keys. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.
[ https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438556#comment-17438556 ] Aiden Gong edited comment on KAFKA-13433 at 11/4/21, 9:38 AM: -- Class org.apache.kafka.connect.json.JsonConverter line 704, should use 'struct.getWithoutDefault(field)' instead of 'struct.get(field.name())' . was (Author: aiden gong): Class org.apache.kafka.connect.json.JsonConverter line 704, should use 'struct.getWithoutDefault(field)' instead of 'struct.get(field)' . > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. > > > Key: KAFKA-13433 > URL: https://issues.apache.org/jira/browse/KAFKA-13433 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Aiden Gong >Priority: Major > Labels: connect > Attachments: image-2021-11-04-16-25-18-890.png, > image-2021-11-04-16-25-18-975.png > > > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. Please refer attachments for > more detail. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mimaison commented on a change in pull request #11401: KAFKA-13255: use exclude filter for new topics
mimaison commented on a change in pull request #11401: URL: https://github.com/apache/kafka/pull/11401#discussion_r742653123 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java ## @@ -152,6 +155,48 @@ public void testConfigPropertyFiltering() { .anyMatch(x -> x.name().equals("min.insync.replicas")), "should not replicate excluded properties"); } +@Test +public void testNewTopicConfigs() throws Exception { +Map filterConfig = new HashMap<>(); + filterConfig.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "follower\\.replication\\.throttled\\.replicas, " ++ "leader\\.replication\\.throttled\\.replicas, " ++ "message\\.timestamp\\.difference\\.max\\.ms, " ++ "message\\.timestamp\\.type, " ++ "unclean\\.leader\\.election\\.enable, " ++ "min\\.insync\\.replicas," ++ "exclude_param.*"); +DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter(); +filter.configure(filterConfig); + +MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), +new DefaultReplicationPolicy(), x -> true, filter)); + +final String topic = "testtopic"; +List entries = new ArrayList<>(); +entries.add(new ConfigEntry("name-1", "value-1")); +entries.add(new ConfigEntry("exclude_param.param1", "value-param1")); +entries.add(new ConfigEntry("min.insync.replicas", "2")); +Config config = new Config(entries); +doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); +doAnswer(invocation -> { +Map newTopics = invocation.getArgument(0); +assertNotNull(newTopics.get("source." + topic)); +Map targetConfig = newTopics.get("source." + topic).configs(); + +// property 'name-1' isn't defined in the exclude filter -> should be replicated +assertNotNull(targetConfig.get("name-1"), "should replicate properties"); + +// this property is in default list, just double check it: +String prop1 = "min.insync.replicas"; +assertNull(targetConfig.get(prop1), "should not replicate excluded properties " + prop1); +// this property is only in exclude filter custom parameter, also tests regex on the way: +String prop2 = "exclude_param.param1"; +assertNull(targetConfig.get(prop2), "should not replicate excluded properties " + prop2); +return null; +}).when(connector).createNewTopics(any()); +connector.createNewTopics(Collections.singleton(topic), Collections.singletonMap(topic, 1L)); +} Review comment: I agree that calling verify on `targetConfig()` is not the best. But I think we should call verify on `createNewTopics(any())` to ensure our mocked method, and the assertions, actually run. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12487) Sink connectors do not work with the cooperative consumer rebalance protocol
[ https://issues.apache.org/jira/browse/KAFKA-12487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438579#comment-17438579 ] David Jacot commented on KAFKA-12487: - Hi [~kkonstantine]. My understanding of the issue is that we are fixing a bug here. Therefore, merging it to 3.1 and 3.0 is fine for me. Please correct if it is not a bug. > Sink connectors do not work with the cooperative consumer rebalance protocol > > > Key: KAFKA-12487 > URL: https://issues.apache.org/jira/browse/KAFKA-12487 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2, 3.0.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Blocker > Fix For: 3.1.0 > > > The {{ConsumerRebalanceListener}} used by the framework to respond to > rebalance events in consumer groups for sink tasks is hard-coded with the > assumption that the consumer performs rebalances eagerly. In other words, it > assumes that whenever {{onPartitionsRevoked}} is called, all partitions have > been revoked from that consumer, and whenever {{onPartitionsAssigned}} is > called, the partitions passed in to that method comprise the complete set of > topic partitions assigned to that consumer. > See the [WorkerSinkTask.HandleRebalance > class|https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L669-L730] > for the specifics. > > One issue this can cause is silently ignoring to-be-committed offsets > provided by sink tasks, since the framework ignores offsets provided by tasks > in their {{preCommit}} method if it does not believe that the consumer for > that task is currently assigned the topic partition for that offset. See > these lines in the [WorkerSinkTask::commitOffsets > method|https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L429-L430] > for reference. > > This may not be the only issue caused by configuring a sink connector's > consumer to use cooperative rebalancing. Rigorous unit and integration > testing should be added before claiming that the Connect framework supports > the use of cooperative consumers with sink connectors. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] showuon commented on pull request #11429: KAFKA-13396: allow create topic without partition/replicaFactor
showuon commented on pull request #11429: URL: https://github.com/apache/kafka/pull/11429#issuecomment-960575313 @ijuma , please help check it when available. This is a regression issue, so I think we should put it v3.1.0. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13430) Remove broker-wide quota properties from the documentation
[ https://issues.apache.org/jira/browse/KAFKA-13430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-13430. - Fix Version/s: 3.0.0 3.1.0 Reviewer: David Jacot Resolution: Fixed > Remove broker-wide quota properties from the documentation > -- > > Key: KAFKA-13430 > URL: https://issues.apache.org/jira/browse/KAFKA-13430 > Project: Kafka > Issue Type: Bug > Components: documentation >Affects Versions: 3.0.0 >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Major > Fix For: 3.1.0, 3.0.0 > > > I found this problem while working on > [KAFKA-13341|https://issues.apache.org/jira/browse/KAFKA-13341]. > Broker-wide quota properties ({{quota.producer.default}}, > {{quota.consumer.default}}) are [removed in > 3.0|https://issues.apache.org/jira/browse/KAFKA-12591], but it is not applied > to the documentation yet. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13430) Remove broker-wide quota properties from the documentation
[ https://issues.apache.org/jira/browse/KAFKA-13430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438574#comment-17438574 ] ASF GitHub Bot commented on KAFKA-13430: dajac merged pull request #380: URL: https://github.com/apache/kafka-site/pull/380 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove broker-wide quota properties from the documentation > -- > > Key: KAFKA-13430 > URL: https://issues.apache.org/jira/browse/KAFKA-13430 > Project: Kafka > Issue Type: Bug > Components: documentation >Affects Versions: 3.0.0 >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Major > > I found this problem while working on > [KAFKA-13341|https://issues.apache.org/jira/browse/KAFKA-13341]. > Broker-wide quota properties ({{quota.producer.default}}, > {{quota.consumer.default}}) are [removed in > 3.0|https://issues.apache.org/jira/browse/KAFKA-12591], but it is not applied > to the documentation yet. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13351) Add possibility to write kafka headers in Kafka Console Producer
[ https://issues.apache.org/jira/browse/KAFKA-13351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438573#comment-17438573 ] Seweryn Habdank-Wojewodzki commented on KAFKA-13351: Thanks a lot! > Add possibility to write kafka headers in Kafka Console Producer > > > Key: KAFKA-13351 > URL: https://issues.apache.org/jira/browse/KAFKA-13351 > Project: Kafka > Issue Type: Wish > Components: tools >Affects Versions: 2.8.1 >Reporter: Seweryn Habdank-Wojewodzki >Assignee: Florin Akermann >Priority: Major > > Dears, > Currently there is an asymetry between Kafka Console Consumer and Producer. > Kafka Consumer can display headers (KAFKA-6733), but Kafka Producer cannot > produce them. > It would be good to unify this and add possibility to Kafka Console Producer > to produce them. > Similar ticket is here: KAFKA-6574, but it is very old and does not > represents current state of the software. > Please consider this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dajac commented on pull request #11463: KAFKA-13430: Remove broker-wide quota properties from the documentation
dajac commented on pull request #11463: URL: https://github.com/apache/kafka/pull/11463#issuecomment-960571438 Merged to trunk and 3.1. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac merged pull request #11463: KAFKA-13430: Remove broker-wide quota properties from the documentation
dajac merged pull request #11463: URL: https://github.com/apache/kafka/pull/11463 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.
[ https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aiden Gong updated KAFKA-13433: --- Labels: connect (was: connect-api) > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. > > > Key: KAFKA-13433 > URL: https://issues.apache.org/jira/browse/KAFKA-13433 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Aiden Gong >Priority: Major > Labels: connect > Attachments: image-2021-11-04-16-25-18-890.png, > image-2021-11-04-16-25-18-975.png > > > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. Please refer attachments for > more detail. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.
[ https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aiden Gong updated KAFKA-13433: --- Labels: connect-api (was: ) > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. > > > Key: KAFKA-13433 > URL: https://issues.apache.org/jira/browse/KAFKA-13433 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Aiden Gong >Priority: Major > Labels: connect-api > Attachments: image-2021-11-04-16-25-18-890.png, > image-2021-11-04-16-25-18-975.png > > > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. Please refer attachments for > more detail. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.
[ https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aiden Gong updated KAFKA-13433: --- Priority: Major (was: Minor) > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. > > > Key: KAFKA-13433 > URL: https://issues.apache.org/jira/browse/KAFKA-13433 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Aiden Gong >Priority: Major > Attachments: image-2021-11-04-16-25-18-890.png, > image-2021-11-04-16-25-18-975.png > > > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. Please refer attachments for > more detail. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.
[ https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aiden Gong updated KAFKA-13433: --- Priority: Minor (was: Blocker) > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. > > > Key: KAFKA-13433 > URL: https://issues.apache.org/jira/browse/KAFKA-13433 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Aiden Gong >Priority: Minor > Attachments: image-2021-11-04-16-25-18-890.png, > image-2021-11-04-16-25-18-975.png > > > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. Please refer attachments for > more detail. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.
[ https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438556#comment-17438556 ] Aiden Gong commented on KAFKA-13433: Class org.apache.kafka.connect.json.JsonConverter line 704, should use 'struct.getWithoutDefault(field)' instead of 'struct.get(field)' . > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. > > > Key: KAFKA-13433 > URL: https://issues.apache.org/jira/browse/KAFKA-13433 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Aiden Gong >Priority: Blocker > Attachments: image-2021-11-04-16-25-18-890.png, > image-2021-11-04-16-25-18-975.png > > > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. Please refer attachments for > more detail. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.
[ https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438553#comment-17438553 ] Aiden Gong commented on KAFKA-13433: I will submit a pr to fix this issue. Please assignee to me. Thanks! > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. > > > Key: KAFKA-13433 > URL: https://issues.apache.org/jira/browse/KAFKA-13433 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Aiden Gong >Priority: Blocker > Attachments: image-2021-11-04-16-25-18-890.png, > image-2021-11-04-16-25-18-975.png > > > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. Please refer attachments for > more detail. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.
[ https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aiden Gong updated KAFKA-13433: --- Description: JsonConverter's method convertToJson when field is optional with default value and value is null, return default value. Please refer attachments for more detail. (was: JsonConverter's method convertToJson when field is optional with default value and value is null, return default value. ) > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. > > > Key: KAFKA-13433 > URL: https://issues.apache.org/jira/browse/KAFKA-13433 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Aiden Gong >Priority: Blocker > Attachments: image-2021-11-04-16-25-18-890.png, > image-2021-11-04-16-25-18-975.png > > > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. Please refer attachments for > more detail. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.
[ https://issues.apache.org/jira/browse/KAFKA-13433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aiden Gong updated KAFKA-13433: --- Description: JsonConverter's method convertToJson when field is optional with default value and value is null, return default value. (was: JsonConverter's method convertToJson when field is optional with default value and value is null, return default value. !image-2021-11-04-16-25-18-975.png!) > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. > > > Key: KAFKA-13433 > URL: https://issues.apache.org/jira/browse/KAFKA-13433 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Aiden Gong >Priority: Blocker > Attachments: image-2021-11-04-16-25-18-890.png, > image-2021-11-04-16-25-18-975.png > > > JsonConverter's method convertToJson when field is optional with default > value and value is null, return default value. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13433) JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.
Aiden Gong created KAFKA-13433: -- Summary: JsonConverter's method convertToJson when field is optional with default value and value is null, return default value. Key: KAFKA-13433 URL: https://issues.apache.org/jira/browse/KAFKA-13433 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 3.0.0 Reporter: Aiden Gong Attachments: image-2021-11-04-16-25-18-890.png, image-2021-11-04-16-25-18-975.png JsonConverter's method convertToJson when field is optional with default value and value is null, return default value. !image-2021-11-04-16-25-18-975.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] RivenSun2 commented on a change in pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics
RivenSun2 commented on a change in pull request #11460: URL: https://github.com/apache/kafka/pull/11460#discussion_r742601310 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -326,6 +330,10 @@ private Exception invokePartitionsRevoked(final Set revokedParti private Exception invokePartitionsLost(final Set lostPartitions) { log.info("Lost previously assigned partitions {}", Utils.join(lostPartitions, ", ")); +Set lostPausedPartitions = subscriptions.pausedPartitions(); +lostPausedPartitions.retainAll(lostPartitions); +if (!lostPausedPartitions.isEmpty()) +log.info("The pause flag in partitions [{}] will be removed due to lost.", Utils.join(lostPausedPartitions, ", ")); Review comment: Thank you for your rigorous attitude. Forgive my bad English. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
RivenSun2 commented on pull request #11340: URL: https://github.com/apache/kafka/pull/11340#issuecomment-960522880 @guozhangwang @showuon Thanks for your helpful review. I just commit a new change If you have time, please browse it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics
showuon commented on a change in pull request #11460: URL: https://github.com/apache/kafka/pull/11460#discussion_r742596144 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -326,6 +330,10 @@ private Exception invokePartitionsRevoked(final Set revokedParti private Exception invokePartitionsLost(final Set lostPartitions) { log.info("Lost previously assigned partitions {}", Utils.join(lostPartitions, ", ")); +Set lostPausedPartitions = subscriptions.pausedPartitions(); +lostPausedPartitions.retainAll(lostPartitions); +if (!lostPausedPartitions.isEmpty()) +log.info("The pause flag in partitions [{}] will be removed due to lost.", Utils.join(lostPausedPartitions, ", ")); Review comment: nit: due to **partition** lost -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics
RivenSun2 commented on pull request #11460: URL: https://github.com/apache/kafka/pull/11460#issuecomment-960521298 @showuon Thanks for your review I just commited a new change -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on a change in pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics
RivenSun2 commented on a change in pull request #11460: URL: https://github.com/apache/kafka/pull/11460#discussion_r742589702 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -307,6 +307,10 @@ private Exception invokePartitionsAssigned(final Set assignedPar private Exception invokePartitionsRevoked(final Set revokedPartitions) { log.info("Revoke previously assigned partitions {}", Utils.join(revokedPartitions, ", ")); +Set revokePausedPartitions = subscriptions.pausedPartitions(); +revokePausedPartitions.retainAll(revokedPartitions); +if (!revokePausedPartitions.isEmpty()) +log.info("Revoke previously paused partitions {}", Utils.join(revokePausedPartitions, ", ")); Review comment: I agree with you -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on a change in pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics
RivenSun2 commented on a change in pull request #11460: URL: https://github.com/apache/kafka/pull/11460#discussion_r742589553 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ## @@ -2007,7 +2007,7 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time * Suspend fetching from the requested partitions. Future calls to {@link #poll(Duration)} will not return * any records from these partitions until they have been resumed using {@link #resume(Collection)}. * Note that this method does not affect partition subscription. In particular, it does not cause a group - * rebalance when automatic assignment is used. + * rebalance when automatic assignment is used. And groupRebalance does not preserve pause state. Review comment: Sure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics
showuon commented on a change in pull request #11460: URL: https://github.com/apache/kafka/pull/11460#discussion_r742575556 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ## @@ -2007,7 +2007,7 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time * Suspend fetching from the requested partitions. Future calls to {@link #poll(Duration)} will not return * any records from these partitions until they have been resumed using {@link #resume(Collection)}. * Note that this method does not affect partition subscription. In particular, it does not cause a group - * rebalance when automatic assignment is used. + * rebalance when automatic assignment is used. And groupRebalance does not preserve pause state. Review comment: How about this: ``` * ... * rebalance when automatic assignment is used. * * Note: Rebalance will not preserve the pause/resume state. ``` ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -307,6 +307,10 @@ private Exception invokePartitionsAssigned(final Set assignedPar private Exception invokePartitionsRevoked(final Set revokedPartitions) { log.info("Revoke previously assigned partitions {}", Utils.join(revokedPartitions, ", ")); +Set revokePausedPartitions = subscriptions.pausedPartitions(); +revokePausedPartitions.retainAll(revokedPartitions); +if (!revokePausedPartitions.isEmpty()) +log.info("Revoke previously paused partitions {}", Utils.join(revokePausedPartitions, ", ")); Review comment: I'm afraid that users might get confused, because it looks like we revoke twice: ``` Revoke previously assigned partitions tp1, tp2. Revoke previously paused partitions tp1 ``` How about it: `log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.join(revokePausedPartitions, ", "));` WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
RivenSun2 commented on a change in pull request #11340: URL: https://github.com/apache/kafka/pull/11340#discussion_r742575938 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() { }); } +private boolean maybeAutoCommitOffsetsAsync() { Review comment: In fact, only onJoinPrepare called the maybeAutoCommitOffsetsAsync method before. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
RivenSun2 commented on a change in pull request #11340: URL: https://github.com/apache/kafka/pull/11340#discussion_r742574738 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -420,9 +421,14 @@ boolean joinGroupIfNeeded(final Timer timer) { // need to set the flag before calling onJoinPrepare since the user callback may throw // exception, in which case upon retry we should not retry onJoinPrepare either. needsJoinPrepare = false; -onJoinPrepare(generation.generationId, generation.memberId); +if (!onJoinPrepare(generation.generationId, generation.memberId)) Review comment: I think it can be simplified like this ``` if (needsJoinPrepare) { // need to set the flag before calling onJoinPrepare since the user callback may throw // exception, in which case upon retry we should not retry onJoinPrepare either. needsJoinPrepare = false; if (!onJoinPrepare(generation.generationId, generation.memberId)) { needsJoinPrepare = true; //should not initiateJoinGroup if needsJoinPrepare still is true return false; } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
RivenSun2 commented on a change in pull request #11340: URL: https://github.com/apache/kafka/pull/11340#discussion_r742569946 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ## @@ -1053,6 +1058,49 @@ public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() { assertFalse(coordinator.rejoinNeededOrPending()); } +@Test +public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() { Review comment: In fact, only onJoinPrepare called the maybeAutoCommitOffsetsAsync method before. Follow guozhang's suggestion , I will delete the maybeAutoCommitOffsetsSync(timer) method, close() also calls maybeAutoCommitOffsetsAsync -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
RivenSun2 commented on a change in pull request #11340: URL: https://github.com/apache/kafka/pull/11340#discussion_r742568417 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ## @@ -151,6 +151,11 @@ put(topic2, 1); } }); +private MetadataResponse deletedMetadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap() { Review comment: Sure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
RivenSun2 commented on a change in pull request #11340: URL: https://github.com/apache/kafka/pull/11340#discussion_r742568236 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -692,10 +693,18 @@ private void validateCooperativeAssignment(final Map
[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
RivenSun2 commented on a change in pull request #11340: URL: https://github.com/apache/kafka/pull/11340#discussion_r742567404 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -403,7 +404,7 @@ private void closeHeartbeatThread() { * * @param timer Timer bounding how long this method can block * @throws KafkaException if the callback throws exception - * @return true iff the operation succeeded + * @return true if the operation succeeded Review comment: my fault, I thought it was a misspelling of a word, I will restore it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
showuon commented on a change in pull request #11340: URL: https://github.com/apache/kafka/pull/11340#discussion_r742567298 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -420,9 +421,14 @@ boolean joinGroupIfNeeded(final Timer timer) { // need to set the flag before calling onJoinPrepare since the user callback may throw // exception, in which case upon retry we should not retry onJoinPrepare either. needsJoinPrepare = false; -onJoinPrepare(generation.generationId, generation.memberId); +if (!onJoinPrepare(generation.generationId, generation.memberId)) Review comment: You're right. How about this: ```java if (needsJoinPrepare) { try { if (onJoinPrepare(generation.generationId, generation.memberId)) needsJoinPrepare = false; else return false; } catch (KafkaException e) { // if onJoinPrepare throws an exception, it would be from the rebalance listener. // next time we would then not retry {@code onJoinPrepare} any more but proceed the join-group procedure. needsJoinPrepare = false; throw e; } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
showuon commented on a change in pull request #11340: URL: https://github.com/apache/kafka/pull/11340#discussion_r742541638 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -692,10 +693,18 @@ private void validateCooperativeAssignment(final Map() { Review comment: Since this variable only used in the new added test, could we put it into the test? ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() { }); } +private boolean maybeAutoCommitOffsetsAsync() { +if (autoCommitEnabled) { +invokeCompletedOffsetCommitCallbacks(); + +if (onJoinPrepareAsyncCommitFuture == null) +onJoinPrepareAsyncCommitFuture = doAutoCommitOffsetsAsync(); +if (onJoinPrepareAsyncCommitFuture == null) +return true; + +client.pollNoWakeup(); +invokeCompletedOffsetCommitCallbacks(); + +if (!onJoinPrepareAsyncCommitFuture.isDone()) +return false; +if (onJoinPrepareAsyncCommitFuture.succeeded()) { +onJoinPrepareAsyncCommitFuture = null; +return true; +} +if (onJoinPrepareAsyncCommitFuture.failed() && !onJoinPrepareAsyncCommitFuture.isRetriable()) Review comment: +1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
RivenSun2 commented on a change in pull request #11340: URL: https://github.com/apache/kafka/pull/11340#discussion_r742564833 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() { }); } +private boolean maybeAutoCommitOffsetsAsync() { +if (autoCommitEnabled) { +invokeCompletedOffsetCommitCallbacks(); Review comment: my fault, already fix it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
RivenSun2 commented on a change in pull request #11340: URL: https://github.com/apache/kafka/pull/11340#discussion_r742560824 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -420,9 +421,14 @@ boolean joinGroupIfNeeded(final Timer timer) { // need to set the flag before calling onJoinPrepare since the user callback may throw // exception, in which case upon retry we should not retry onJoinPrepare either. needsJoinPrepare = false; -onJoinPrepare(generation.generationId, generation.memberId); +if (!onJoinPrepare(generation.generationId, generation.memberId)) Review comment: If you do This goal should not be achieved: ``` if (!onJoinPrepare(generation.generationId, generation.memberId)) needsJoinPrepare = true; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
RivenSun2 commented on a change in pull request #11340: URL: https://github.com/apache/kafka/pull/11340#discussion_r742560824 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -420,9 +421,14 @@ boolean joinGroupIfNeeded(final Timer timer) { // need to set the flag before calling onJoinPrepare since the user callback may throw // exception, in which case upon retry we should not retry onJoinPrepare either. needsJoinPrepare = false; -onJoinPrepare(generation.generationId, generation.memberId); +if (!onJoinPrepare(generation.generationId, generation.memberId)) Review comment: If you do This goal should not be achieved: ``` if (!onJoinPrepare(generation.generationId, generation.memberId)) needsJoinPrepare = true; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
RivenSun2 commented on a change in pull request #11340: URL: https://github.com/apache/kafka/pull/11340#discussion_r742560295 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() { }); } +private boolean maybeAutoCommitOffsetsAsync() { +if (autoCommitEnabled) { +invokeCompletedOffsetCommitCallbacks(); + +if (onJoinPrepareAsyncCommitFuture == null) +onJoinPrepareAsyncCommitFuture = doAutoCommitOffsetsAsync(); +if (onJoinPrepareAsyncCommitFuture == null) +return true; + +client.pollNoWakeup(); +invokeCompletedOffsetCommitCallbacks(); + +if (!onJoinPrepareAsyncCommitFuture.isDone()) +return false; +if (onJoinPrepareAsyncCommitFuture.succeeded()) { +onJoinPrepareAsyncCommitFuture = null; +return true; +} +if (onJoinPrepareAsyncCommitFuture.failed() && !onJoinPrepareAsyncCommitFuture.isRetriable()) Review comment: I agree with your suggestion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
RivenSun2 commented on a change in pull request #11340: URL: https://github.com/apache/kafka/pull/11340#discussion_r742560139 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() { }); } +private boolean maybeAutoCommitOffsetsAsync() { Review comment: I agree with your suggestion. ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -1054,11 +1067,11 @@ public void maybeAutoCommitOffsetsAsync(long now) { } } -private void doAutoCommitOffsetsAsync() { Review comment: I agree with your suggestion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
RivenSun2 commented on a change in pull request #11340: URL: https://github.com/apache/kafka/pull/11340#discussion_r742560028 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -692,10 +693,18 @@ private void validateCooperativeAssignment(final Map
[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
RivenSun2 commented on a change in pull request #11340: URL: https://github.com/apache/kafka/pull/11340#discussion_r742559955 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() { }); } +private boolean maybeAutoCommitOffsetsAsync() { +if (autoCommitEnabled) { +invokeCompletedOffsetCommitCallbacks(); + +if (onJoinPrepareAsyncCommitFuture == null) Review comment: Because we use asynchronous commitOffsets, and at the same time, in order not to block the Consumer#poll method We cannot call ConsumerNetworkClient#poll(future) The onJoinPrepareAsyncCommitFuture variable is introduced to deal with the "onJoinPrepareAsyncCommitFuture.isDone() == false" situation. Maybe my consideration is complicated, this situation can be retryed by Consumer#poll next time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org