[jira] [Assigned] (KAFKA-15955) Migrating ZK brokers send dir assignments
[ https://issues.apache.org/jira/browse/KAFKA-15955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Proven Provenzano reassigned KAFKA-15955:
Assignee: Proven Provenzano

> Migrating ZK brokers send dir assignments
>
> Key: KAFKA-15955
> URL: https://issues.apache.org/jira/browse/KAFKA-15955
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Igor Soarez
> Assignee: Proven Provenzano
> Priority: Major
>
> A broker in ZooKeeper mode, while in migration mode, should start sending
> directory assignments to the KRaft Controller using AssignmentsManager.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on PR #14879: URL: https://github.com/apache/kafka/pull/14879#issuecomment-1836362310 > Can you elaborate on the direction to remove the background queue from the 'test builder' instead of using the one it constructed? I had issues with tests using the spy on the `AsyncKafkaConsumer`. More precisely, a test failed with the spy but did not fail with a consumer that was not wrapped in a spy. BTW, IMO, spies (or partial mocks) should be used very carefully. Actually, good code does not need spies (with a few exceptions). Spies let you avoid structuring the code well; they do not force you to loosely couple objects. Even the [Mockito documentation](https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#spy) warns against its own spies. Additionally, the code under test, i.e., the async Kafka consumer, should not be wrapped in a spy. We should test that code directly to avoid possible side effects from the wrapping or from mistakes in specifying stubs on the spy. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
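cadonna's argument is that code which receives its collaborators from the outside can be tested directly, without wrapping the object under test in a spy. A minimal sketch of that style, assuming a hypothetical `EventDrainer` that takes its `BlockingQueue` through the constructor (this is not the real `AsyncKafkaConsumer` or test-builder API): the test creates the queue itself, so it can seed and inspect it without Mockito.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical class: the queue is injected rather than created internally,
// so a test can supply and inspect it without spying on EventDrainer itself.
class EventDrainer {
    private final BlockingQueue<String> queue;

    EventDrainer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    // Removes every queued event and returns them in FIFO order.
    List<String> drain() {
        List<String> events = new ArrayList<>();
        queue.drainTo(events);
        return events;
    }
}

class EventDrainerDemo {
    public static void main(String[] args) {
        // The test constructs the collaborator itself and keeps a reference to it.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");

        EventDrainer drainer = new EventDrainer(queue);
        System.out.println(drainer.drain()); // prints [a, b]
        System.out.println(queue.isEmpty()); // prints true
    }
}
```

Because the test holds the real queue, there are no stubs to get wrong and no wrapper whose behaviour can diverge from the production object.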
Re: [PR] KAFKA-14516: [2/N] New Group Coordinator IT for Static Member Replacement [kafka]
vamossagar12 commented on code in PR #14882:
URL: https://github.com/apache/kafka/pull/14882#discussion_r1412299630

## core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala: ##
@@ -137,6 +137,98 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
     assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
   }
 
+  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
+  ))
+  def testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): Unit = {
+    val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+    val admin = cluster.createAdminClient()
+    val instanceId = "instanceId"
+
+    // Creates the __consumer_offsets topics because it won't be created automatically
+    // in this test because it does not use FindCoordinator API.
+    TestUtils.createOffsetsTopicWithAdmin(
+      admin = admin,
+      brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+      controllers = raftCluster.controllerServers().asScala.toSeq
+    )
+
+    // Create the topic.
+    val topicId = TestUtils.createTopicWithAdminRaw(
+      admin = admin,
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    // Heartbeat request so that a static member joins the group
+    var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setInstanceId(instanceId)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
+
+    // This is the expected assignment.
+    val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+    // Send the request until receiving a successful response. There is a delay
+    // here because the group coordinator is loaded in the background.
+    var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+    }, msg = s"Could not join the group successfully and get partitions assigned. Last response $consumerGroupHeartbeatResponse.")

Review Comment:
Got it, yeah that makes sense.
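The test above retries the heartbeat because the group coordinator loads in the background. The following is a minimal stdlib sketch of that poll-until-true pattern; `Polling.waitUntilTrue` is a hypothetical Java stand-in written for illustration, not the implementation or signature of Kafka's Scala `TestUtils.waitUntilTrue`.

```java
import java.util.function.BooleanSupplier;

final class Polling {
    private Polling() { }

    // Hypothetical helper: re-evaluates the condition every pauseMs until it
    // returns true, and throws AssertionError(msg) once timeoutMs has elapsed.
    static void waitUntilTrue(BooleanSupplier condition, String msg, long timeoutMs, long pauseMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError(msg);
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting: " + msg, e);
            }
        }
    }
}

class PollingDemo {
    public static void main(String[] args) {
        int[] attempts = {0};
        // Simulates a coordinator that only answers successfully on the third request.
        Polling.waitUntilTrue(() -> ++attempts[0] >= 3, "Could not join the group", 5_000, 10);
        System.out.println(attempts[0]); // prints 3
    }
}
```

Capturing the last response in a variable outside the lambda, as the Scala test does with `consumerGroupHeartbeatResponse`, is what lets the failure message report the most recent attempt.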
[PR] KAFKA-15953: Refactor polling delays [kafka]
AndrewJSchofield opened a new pull request, #14897:
URL: https://github.com/apache/kafka/pull/14897

Caches the maximum time to wait in the consumer network thread so the application thread is better isolated from the request managers.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
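One way to read "caches the maximum time to wait" is sketched below, assuming a hypothetical `PollDelayCache` that the network thread writes and the application thread only reads through an `AtomicLong`. The PR itself may be structured differently; the point is that the application thread never calls into the request managers to compute its wait.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical: the network thread publishes how long the application thread may
// block, so the application thread never calls into the request managers itself.
final class PollDelayCache {
    private final AtomicLong maxWaitMs = new AtomicLong(Long.MAX_VALUE);

    // Called on the network thread after each run of the request managers.
    void update(long computedMaxWaitMs) {
        maxWaitMs.set(computedMaxWaitMs);
    }

    // Called on the application thread: never wait longer than the caller's
    // own timeout or the last value published by the network thread.
    long effectiveWaitMs(long userTimeoutMs) {
        return Math.min(userTimeoutMs, maxWaitMs.get());
    }
}

class PollDelayCacheDemo {
    public static void main(String[] args) {
        PollDelayCache cache = new PollDelayCache();
        System.out.println(cache.effectiveWaitMs(500));    // prints 500 (nothing published yet)
        cache.update(100);                                  // e.g. some request is due in 100 ms
        System.out.println(cache.effectiveWaitMs(30_000)); // prints 100
        System.out.println(cache.effectiveWaitMs(20));     // prints 20
    }
}
```

An atomic (or volatile) field is enough here because only the latest value matters; the reader never needs a consistent view of several fields at once.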
[PR] MINOR: Add @Timeout annotation to consumer integration tests [kafka]
dajac opened a new pull request, #14896:
URL: https://github.com/apache/kafka/pull/14896

In this [build](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14826/11/pipeline/12/), the following test hung forever:

```
Gradle Test Run :core:test > Gradle Test Executor 93 > PlaintextConsumerTest > testSeek(String, String) > testSeek(String, String).quorum=kraft+kip848.groupProtocol=consumer STARTED
```

Since the new consumer is not fully stable yet, we should add a `@Timeout` to all those integration tests to ensure that builds are not blocked unnecessarily.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
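JUnit 5's `@Timeout` annotation (`org.junit.jupiter.api.Timeout`) fails a test that runs past its limit instead of letting it hang. To keep the sketch runnable without JUnit, the block below shows the same idea with the standard library: a possibly-hanging call is bounded by `Future.get(timeout, unit)`. `Deadline.runWithTimeout` is a hypothetical helper, and the timeout values used in the PR are not shown here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class Deadline {
    private Deadline() { }

    // Runs the task on a separate daemon thread and throws TimeoutException
    // if it has not finished within timeoutMs.
    static <T> T runWithTimeout(Callable<T> task, long timeoutMs) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "deadline-runner");
            thread.setDaemon(true); // a hung task must not keep the JVM alive
            return thread;
        });
        try {
            Future<T> future = executor.submit(task);
            try {
                return future.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the hung task
                throw e;
            }
        } finally {
            executor.shutdownNow();
        }
    }
}

class DeadlineDemo {
    public static void main(String[] args) throws Exception {
        System.out.println(Deadline.runWithTimeout(() -> "finished", 1_000)); // prints finished
        try {
            Deadline.runWithTimeout(() -> {
                Thread.sleep(60_000); // simulates a test that hangs
                return "never";
            }, 100);
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

With JUnit itself, putting `@Timeout` on the test class applies the limit to every test method in it, which matches the intent of covering all the consumer integration tests at once.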
[jira] [Created] (KAFKA-15955) Migrating ZK brokers send dir assignments
Igor Soarez created KAFKA-15955:
Summary: Migrating ZK brokers send dir assignments
Key: KAFKA-15955
URL: https://issues.apache.org/jira/browse/KAFKA-15955
Project: Kafka
Issue Type: Sub-task
Reporter: Igor Soarez

A broker in ZooKeeper mode, while in migration mode, should start sending directory assignments to the KRaft Controller using AssignmentsManager.
Re: [PR] KAFKA-15574; [2/N]: Client state machine updates [kafka]
lianetm commented on PR #14857: URL: https://github.com/apache/kafka/pull/14857#issuecomment-1836284959 All tests pass locally; I will keep an eye on the build here.
Re: [PR] KAFKA-15574; [2/N]: Client state machine updates [kafka]
lianetm commented on PR #14857: URL: https://github.com/apache/kafka/pull/14857#issuecomment-1836283568 Done, latest changes merged @dajac, thanks!
Re: [PR] KAFKA-14509: [2/N] Implement server side logic for ConsumerGroupDescribe API [kafka]
riedelmax commented on PR #14544: URL: https://github.com/apache/kafka/pull/14544#issuecomment-1836279426 Hey @dajac, thanks for the review. I addressed all the points except for the ones we intend to handle in a follow-up.
Re: [PR] KAFKA-15574; [2/N]: Client state machine updates [kafka]
dajac commented on code in PR #14857:
URL: https://github.com/apache/kafka/pull/14857#discussion_r1412217786

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -365,6 +366,16 @@ private void replaceUnresolvedAssignmentWithNewAssignment(
      */
     @Override
     public void transitionToFenced() {
+        if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) {
+            log.debug("Member {} with epoch {} got fenced but it is already leaving the group " +
+                    "with state {}, so it won't attempt to rejoin.", memberId, memberEpoch, state);
+            return;
+        }
+        if (state == MemberState.UNSUBSCRIBED) {
+            log.debug("Member {} with epoch {} got fenced but it already left the group, so it " +
+                    "won't attempt to rejoin.", memberId, memberEpoch);
+            return;
+        }

Review Comment:
Thanks.
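The guard added to `transitionToFenced()` boils down to a state check: a member that is already leaving, or has already left, ignores the fencing instead of rejoining. A minimal sketch, assuming a hypothetical cut-down `MemberState` enum (the real enum has more states) and a boolean result standing in for the actual transition:

```java
// Hypothetical subset of the member states that appear in the diff.
enum MemberState { UNSUBSCRIBED, RECONCILING, STABLE, PREPARE_LEAVING, LEAVING }

final class FencingGuard {
    private FencingGuard() { }

    // Returns true if a member in this state should handle fencing (and later
    // rejoin); false if it is leaving or has already left the group.
    static boolean shouldHandleFencing(MemberState state) {
        switch (state) {
            case PREPARE_LEAVING:
            case LEAVING:
            case UNSUBSCRIBED:
                return false;
            default:
                return true;
        }
    }
}

class FencingGuardDemo {
    public static void main(String[] args) {
        System.out.println(FencingGuard.shouldHandleFencing(MemberState.STABLE));       // prints true
        System.out.println(FencingGuard.shouldHandleFencing(MemberState.LEAVING));      // prints false
        System.out.println(FencingGuard.shouldHandleFencing(MemberState.UNSUBSCRIBED)); // prints false
    }
}
```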
Re: [PR] KAFKA-15574; [2/N]: Client state machine updates [kafka]
dajac commented on PR #14857: URL: https://github.com/apache/kafka/pull/14857#issuecomment-1836265424 @lianetm There are conflicts. Could you please rebase or merge trunk?
Re: [PR] KAFKA-15574; [2/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14857:
URL: https://github.com/apache/kafka/pull/14857#discussion_r1412204434

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -365,6 +366,16 @@ private void replaceUnresolvedAssignmentWithNewAssignment(
      */
     @Override
     public void transitionToFenced() {
+        if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) {
+            log.debug("Member {} with epoch {} got fenced but it is already leaving the group " +
+                    "with state {}, so it won't attempt to rejoin.", memberId, memberEpoch, state);
+            return;
+        }
+        if (state == MemberState.UNSUBSCRIBED) {
+            log.debug("Member {} with epoch {} got fenced but it already left the group, so it " +
+                    "won't attempt to rejoin.", memberId, memberEpoch);
+            return;
+        }

Review Comment:
I see. I agree that makes total sense for close (a kind of "terminal" state for the consumer), but for unsubscribe it would probably make sense to put a little more effort into making sure that the member "properly" leaves the group (by sending the last HB). We would need to think a bit more about the implications, so I filed [KAFKA-15954](https://issues.apache.org/jira/browse/KAFKA-15954) to review that as a follow-up.
[jira] [Assigned] (KAFKA-15954) Review minimal effort approach on consumer last heartbeat on unsubscribe
[ https://issues.apache.org/jira/browse/KAFKA-15954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans reassigned KAFKA-15954:
Assignee: Lianet Magrans

> Review minimal effort approach on consumer last heartbeat on unsubscribe
>
> Key: KAFKA-15954
> URL: https://issues.apache.org/jira/browse/KAFKA-15954
> Project: Kafka
> Issue Type: Task
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Lianet Magrans
> Priority: Major
> Labels: kip-848-client-support
>
> Currently, both the legacy and the new consumer follow a minimal-effort approach when
> sending a leave group request (legacy) or the last heartbeat request (new consumer). The
> request is sent without waiting for or handling any response. This behaviour applies
> when the consumer is being closed or when it unsubscribes.
> When the consumer is being closed (which is a "terminal" state), it makes sense to
> just follow a minimal-effort approach for "properly" leaving the group. But for
> unsubscribe, it may make sense to put a little more effort into making sure that the
> last heartbeat is sent and received by the broker. Note that unsubscribe could be a
> temporary state, after which the consumer might want to rejoin the group at any time.
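The issue contrasts two ways of sending the last heartbeat. The sketch below shows both, assuming a hypothetical `sendLastHeartbeat` supplier that returns a `CompletableFuture` completed when the broker responds; it is not the consumer's real request path, and the bounded wait is only one possible reading of "a little more effort".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class LeaveGroup {
    private LeaveGroup() { }

    // Minimal effort: send the last heartbeat and return immediately,
    // ignoring whatever response (or error) eventually arrives.
    static void leaveMinimalEffort(Supplier<CompletableFuture<Void>> sendLastHeartbeat) {
        sendLastHeartbeat.get();
    }

    // More effort: wait up to timeoutMs for the broker to acknowledge the
    // last heartbeat. Returns true only if a successful response arrived.
    static boolean leaveAndAwait(Supplier<CompletableFuture<Void>> sendLastHeartbeat, long timeoutMs) {
        try {
            sendLastHeartbeat.get().get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e) {
            // ExecutionException (request failed) or TimeoutException (no response).
            return false;
        }
    }
}

class LeaveGroupDemo {
    public static void main(String[] args) {
        // A broker that acknowledges immediately.
        System.out.println(LeaveGroup.leaveAndAwait(() -> CompletableFuture.completedFuture(null), 1_000)); // prints true
        // A broker that never answers: the wait is bounded, so unsubscribe still returns.
        System.out.println(LeaveGroup.leaveAndAwait(CompletableFuture::new, 100)); // prints false
    }
}
```

Bounding the wait keeps unsubscribe from blocking indefinitely, which matters because, as the issue notes, the consumer may want to rejoin right away.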
[jira] [Created] (KAFKA-15953) Refactor polling delays
Andrew Schofield created KAFKA-15953:
Summary: Refactor polling delays
Key: KAFKA-15953
URL: https://issues.apache.org/jira/browse/KAFKA-15953
Project: Kafka
Issue Type: Sub-task
Components: clients
Affects Versions: 3.7.0
Reporter: Andrew Schofield
Assignee: Andrew Schofield
Fix For: 3.7.0

This is a follow-on task for https://issues.apache.org/jira/browse/KAFKA-15890. The idea is to reduce the interaction between the application thread and the request managers that was introduced by that earlier JIRA's patch.
[jira] [Created] (KAFKA-15954) Review minimal effort approach on consumer last heartbeat on unsubscribe
Lianet Magrans created KAFKA-15954:
Summary: Review minimal effort approach on consumer last heartbeat on unsubscribe
Key: KAFKA-15954
URL: https://issues.apache.org/jira/browse/KAFKA-15954
Project: Kafka
Issue Type: Task
Components: clients, consumer
Reporter: Lianet Magrans

Currently, both the legacy and the new consumer follow a minimal-effort approach when sending a leave group request (legacy) or the last heartbeat request (new consumer). The request is sent without waiting for or handling any response. This behaviour applies when the consumer is being closed or when it unsubscribes. When the consumer is being closed (which is a "terminal" state), it makes sense to just follow a minimal-effort approach for "properly" leaving the group. But for unsubscribe, it may make sense to put a little more effort into making sure that the last heartbeat is sent and received by the broker. Note that unsubscribe could be a temporary state, after which the consumer might want to rejoin the group at any time.
Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]
soarez commented on PR #14863: URL: https://github.com/apache/kafka/pull/14863#issuecomment-1836233799 Ready for review. Draft because it depends on https://github.com/apache/kafka/pull/14838 so ignore the first commit. @cmccabe @rondagostino @pprovenzano
[jira] [Resolved] (KAFKA-15890) Consumer.poll with long timeout unaware of assigned partitions
[ https://issues.apache.org/jira/browse/KAFKA-15890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Schofield resolved KAFKA-15890.
Resolution: Fixed

> Consumer.poll with long timeout unaware of assigned partitions
>
> Key: KAFKA-15890
> URL: https://issues.apache.org/jira/browse/KAFKA-15890
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Andrew Schofield
> Assignee: Andrew Schofield
> Priority: Major
> Labels: consumer-threading-refactor, kip-848-client-support, kip-848-preview
>
> Various problems found testing `kafka-console-consumer.sh` with the new
> consumer, including NPEs, never-ending reconciliation states and failure to
> fetch records.
Re: [PR] KAFKA-15890: Consumer.poll with long timeout unaware of assigned partitions [kafka]
lucasbru merged PR #14835: URL: https://github.com/apache/kafka/pull/14835
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
AndrewJSchofield commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412179127 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -666,12 +766,16 @@ public Map committed(final Set
Re: [PR] KAFKA-14133: Migrate ProcessorStateManagerTest and StreamThreadTest to Mockito [kafka]
clolov commented on PR #13932:
URL: https://github.com/apache/kafka/pull/13932#issuecomment-1836217374

Heya @cadonna! Many thanks for the review and apologies for the delay. Yes, it makes perfect sense. I also came across

```
I=0; while ./gradlew clients:test --tests RequestResponseTest --rerun --fail-fast; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done
```

in the Kafka README. Hopefully I have addressed everything!
Re: [PR] KAFKA-15574; [2/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14857:
URL: https://github.com/apache/kafka/pull/14857#discussion_r1412178458

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -322,16 +314,14 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
                 break;
 
             case FENCED_MEMBER_EPOCH:
-                message = String.format("GroupHeartbeatRequest failed because member ID %s with epoch %s is invalid. " +
-                        "Will abandon all partitions and rejoin the group",
+                message = String.format("GroupHeartbeatRequest failed because member ID %s with epoch %s is invalid.",

Review Comment:
Sounds good (I just added the member ID too, so the message is now `GroupHeartbeatRequest failed for member %s because epoch %s is fenced.`).
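For reference, a minimal sketch of how the revised format string renders with `String.format`; the wrapper method is hypothetical, and the member ID and epoch values are made up for illustration.

```java
final class FencedMessage {
    private FencedMessage() { }

    // Format string quoted from the review comment; arguments are illustrative.
    static String fencedEpochMessage(String memberId, int memberEpoch) {
        return String.format("GroupHeartbeatRequest failed for member %s because epoch %s is fenced.",
                memberId, memberEpoch);
    }

    public static void main(String[] args) {
        System.out.println(fencedEpochMessage("member-1", 5));
        // prints: GroupHeartbeatRequest failed for member member-1 because epoch 5 is fenced.
    }
}
```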
Re: [PR] KAFKA-14133: Migrate ProcessorStateManagerTest and StreamThreadTest to Mockito [kafka]
clolov commented on code in PR #13932:
URL: https://github.com/apache/kafka/pull/13932#discussion_r1412177600

## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ##
@@ -3494,32 +3323,27 @@ private StreamThread setUpThread(final Properties streamsConfigProps) {
     }
 
     private TaskManager mockTaskManager(final Task runningTask) {
-        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        final TaskManager taskManager = mock(TaskManager.class);
         final TaskId taskId = new TaskId(0, 0);
 
-        expect(runningTask.state()).andStubReturn(Task.State.RUNNING);
-        expect(runningTask.id()).andStubReturn(taskId);
-        expect(taskManager.allOwnedTasks()).andStubReturn(Collections.singletonMap(taskId, runningTask));
-        expect(taskManager.commit(Collections.singleton(runningTask))).andStubReturn(1);
+        when(runningTask.state()).thenReturn(Task.State.RUNNING);
+        when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, runningTask));
         return taskManager;
     }
 
     private TaskManager mockTaskManagerPurge(final int numberOfPurges) {
         final Task runningTask = mock(Task.class);
         final TaskManager taskManager = mockTaskManager(runningTask);
-        taskManager.maybePurgeCommittedRecords();
-        EasyMock.expectLastCall().times(numberOfPurges);

Review Comment:
Yes, sorry, I don't know why I left/changed it like this; I changed it everywhere else to what you suggested. This has been remedied in the latest version of the pull request.
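EasyMock's `expectLastCall().times(n)` declares the expected call count up front, while Mockito checks it afterwards, typically with `verify(taskManager, times(numberOfPurges)).maybePurgeCommittedRecords()`. To stay runnable without Mockito, the sketch below uses a hand-rolled counting fake; `PurgingTaskManager` and `CountingTaskManager` are hypothetical stand-ins, not the real `TaskManager`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical one-method interface standing in for TaskManager.
interface PurgingTaskManager {
    void maybePurgeCommittedRecords();
}

// Hand-rolled test double that records how often it was called, which is what
// Mockito's verify(mock, times(n)) checks after the fact.
final class CountingTaskManager implements PurgingTaskManager {
    private final AtomicInteger purges = new AtomicInteger();

    @Override
    public void maybePurgeCommittedRecords() {
        purges.incrementAndGet();
    }

    void verifyPurges(int expected) {
        int actual = purges.get();
        if (actual != expected) {
            throw new AssertionError("expected " + expected + " purges but saw " + actual);
        }
    }
}

class CountingTaskManagerDemo {
    public static void main(String[] args) {
        CountingTaskManager taskManager = new CountingTaskManager();
        for (int i = 0; i < 3; i++) {
            taskManager.maybePurgeCommittedRecords(); // the code under test would do this
        }
        taskManager.verifyPurges(3); // passes; verifyPurges(2) would throw
        System.out.println("verified");
    }
}
```

The verify-after style is why the EasyMock lines are simply removed from the helper in the diff: the count is checked at the end of each test instead of being recorded in the mock setup.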
[jira] [Updated] (KAFKA-15891) Flaky test: testResetSinkConnectorOffsetsOverriddenConsumerGroupId – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-15891:
Labels: flaky-test (was: )

> Flaky test: testResetSinkConnectorOffsetsOverriddenConsumerGroupId – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
>
> Key: KAFKA-15891
> URL: https://issues.apache.org/jira/browse/KAFKA-15891
> Project: Kafka
> Issue Type: Bug
> Reporter: Apoorv Mittal
> Priority: Major
> Labels: flaky-test
>
> h4. Error
> org.opentest4j.AssertionFailedError: Condition not met within timeout 3. Sink connector consumer group offsets should catch up to the topic end offsets ==> expected: <true> but was: <false>
> h4. Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 3. Sink connector consumer group offsets should catch up to the topic end offsets ==> expected: <true> but was: <false>
> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
> at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
> at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
> at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331)
> at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
> at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328)
> at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312)
> at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302)
> at app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:917)
> at app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.resetAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:725)
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412171456

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandlerTest.java: ##
@@ -51,91 +61,91 @@ public void tearDown() {
         testBuilder.close();
     }
 
-    @Test
-    public void testNoEvents() {
-        assertTrue(backgroundEventQueue.isEmpty());
-        backgroundEventProcessor.process((event, error) -> { });
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
-
-    @Test
-    public void testSingleEvent() {
-        BackgroundEvent event = new ErrorBackgroundEvent(new RuntimeException("A"));
-        backgroundEventQueue.add(event);
-        assertPeeked(event);
-        backgroundEventProcessor.process((e, error) -> { });
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
-
-    @Test
-    public void testSingleErrorEvent() {
-        KafkaException error = new KafkaException("error");
-        BackgroundEvent event = new ErrorBackgroundEvent(error);
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error));
-        assertPeeked(event);
-        assertProcessThrows(error);
-    }
-
-    @Test
-    public void testMultipleEvents() {
-        BackgroundEvent event1 = new ErrorBackgroundEvent(new RuntimeException("A"));
-        backgroundEventQueue.add(event1);
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B")));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C")));
-
-        assertPeeked(event1);
-        backgroundEventProcessor.process((event, error) -> { });
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
-
-    @Test
-    public void testMultipleErrorEvents() {
-        Throwable error1 = new Throwable("error1");
-        KafkaException error2 = new KafkaException("error2");
-        KafkaException error3 = new KafkaException("error3");
-
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error1));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error2));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error3));
-
-        assertProcessThrows(new KafkaException(error1));
-    }
-
-    @Test
-    public void testMixedEventsWithErrorEvents() {
-        Throwable error1 = new Throwable("error1");
-        KafkaException error2 = new KafkaException("error2");
-        KafkaException error3 = new KafkaException("error3");
-
-        RuntimeException errorToCheck = new RuntimeException("A");
-        backgroundEventQueue.add(new ErrorBackgroundEvent(errorToCheck));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error1));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B")));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error2));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C")));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error3));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("D")));
-
-        assertProcessThrows(new KafkaException(errorToCheck));
-    }
-
-    private void assertPeeked(BackgroundEvent event) {
-        BackgroundEvent peekEvent = backgroundEventQueue.peek();
-        assertNotNull(peekEvent);
-        assertEquals(event, peekEvent);
-    }
-
-    private void assertProcessThrows(Throwable error) {
-        assertFalse(backgroundEventQueue.isEmpty());
-
-        try {
-            backgroundEventProcessor.process();
-            fail("Should have thrown error: " + error);
-        } catch (Throwable t) {
-            assertEquals(error.getClass(), t.getClass());
-            assertEquals(error.getMessage(), t.getMessage());
-        }
-
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
+//@Test

Review Comment:
You need to have that commented-out code in a draft PR; otherwise, GitHub doesn't let you mark it as a draft. Jokes aside, yes, they will be removed.
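The removed tests pin down how `process()` (the variant without a callback) behaves: the whole queue is drained, and if any event carried an error, the first error is thrown afterwards, with a `KafkaException` rethrown as-is and anything else wrapped in one. A minimal sketch of that behaviour, assuming a simplified queue of `Optional<Throwable>` (empty for a non-error event) and a hypothetical `StandInKafkaException` so the block has no Kafka dependency:

```java
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Stand-in for org.apache.kafka.common.KafkaException so the sketch has no dependencies.
class StandInKafkaException extends RuntimeException {
    StandInKafkaException(String message) { super(message); }
    StandInKafkaException(Throwable cause) { super(cause); }
}

// Hypothetical processor: each event is either a plain event (empty) or carries an error.
final class ErrorDrainingProcessor {
    private final Queue<Optional<Throwable>> queue;

    ErrorDrainingProcessor(Queue<Optional<Throwable>> queue) {
        this.queue = queue;
    }

    // Drains every queued event; if any carried an error, rethrows the first one
    // once the queue is empty, wrapping anything that is not already a StandInKafkaException.
    void process() {
        Throwable first = null;
        Optional<Throwable> event;
        while ((event = queue.poll()) != null) {
            if (first == null && event.isPresent()) {
                first = event.get();
            }
        }
        if (first instanceof StandInKafkaException) {
            throw (StandInKafkaException) first;
        }
        if (first != null) {
            throw new StandInKafkaException(first);
        }
    }
}

class ErrorDrainingDemo {
    public static void main(String[] args) {
        Queue<Optional<Throwable>> queue = new ConcurrentLinkedQueue<>();
        ErrorDrainingProcessor processor = new ErrorDrainingProcessor(queue);
        queue.add(Optional.empty());                              // a non-error event
        queue.add(Optional.of(new RuntimeException("A")));        // the first error wins
        queue.add(Optional.of(new StandInKafkaException("B")));
        try {
            processor.process();
        } catch (StandInKafkaException e) {
            System.out.println(e.getCause().getMessage()); // prints A
        }
        System.out.println(queue.isEmpty()); // prints true
    }
}
```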
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412169763 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1365,7 +1482,11 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() { private void maybeThrowFencedInstanceException() { if (isFenced) { throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + Review Comment: Touché
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412168452 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -666,12 +766,16 @@ public Map committed(final Sethttps://github.com/apache/kafka/pull/14872
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
AndrewJSchofield commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412158807 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1365,7 +1482,11 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() { private void maybeThrowFencedInstanceException() { if (isFenced) { throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + Review Comment: Please use the `ConsumerConfig` constant for the `"group.instance.id"`. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } } +public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) { +this.groupMetadata = Optional.of(groupMetadata); +} + @Override public ConsumerGroupMetadata groupMetadata() { -throw new KafkaException("method not implemented"); +acquireAndEnsureOpen(); +try { +maybeThrowInvalidGroupIdException(); +backgroundEventProcessor.process(); +return groupMetadata.orElseThrow( +() -> new IllegalStateException("No group metadata found although a valid group ID exists. This is a bug!") Review Comment: You *know* that groupMetadata is present because of the earlier `maybeThrowInvalidGroupIdException`. I suppose one pattern would be to return an unwrapped `ConsumerGroupMetadata` from `maybeThrowInvalidGroupIdException` so that you've eliminated the possibility of the `Optional` being empty.
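AndrewJSchofield's suggestion can be sketched as follows: fold the validation and the unwrapping into one method, so the caller receives the metadata itself rather than an `Optional` it already knows is non-empty. All class and method names below are hypothetical stand-ins, not the `AsyncKafkaConsumer` code from the PR.

```java
import java.util.Optional;

// Hypothetical stand-in for ConsumerGroupMetadata; only the group ID is modeled.
final class GroupMetadataSketch {
    final String groupId;

    GroupMetadataSketch(String groupId) {
        this.groupId = groupId;
    }
}

// Hypothetical stand-in for Kafka's InvalidGroupIdException.
final class InvalidGroupIdSketchException extends RuntimeException {
    InvalidGroupIdSketchException(String message) {
        super(message);
    }
}

final class GroupMetadataConsumerSketch {
    // Empty when no group ID was configured.
    private final Optional<GroupMetadataSketch> groupMetadata;

    GroupMetadataConsumerSketch(Optional<GroupMetadataSketch> groupMetadata) {
        this.groupMetadata = groupMetadata;
    }

    // Validates and unwraps in one step: either returns the metadata or throws,
    // so no caller has to handle an "impossible" empty Optional afterwards.
    private GroupMetadataSketch groupMetadataOrThrowInvalidGroupId() {
        return groupMetadata.orElseThrow(() ->
            new InvalidGroupIdSketchException("A valid group ID is required for this operation"));
    }

    GroupMetadataSketch groupMetadata() {
        return groupMetadataOrThrowInvalidGroupId();
    }
}
```

With this shape, the `IllegalStateException("... This is a bug!")` branch in the diff has no reason to exist, because the empty case is handled exactly once.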
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -666,12 +766,16 @@ public Map committed(final Set { }); -assertTrue(backgroundEventQueue.isEmpty()); -} - -@Test -public void testSingleEvent() { -BackgroundEvent event = new ErrorBackgroundEvent(new RuntimeException("A")); -backgroundEventQueue.add(event); -assertPeeked(event); -backgroundEventProcessor.process((e, error) -> { }); -assertTrue(backgroundEventQueue.isEmpty()); -} - -@Test -public void testSingleErrorEvent() { -KafkaException error = new KafkaException("error"); -BackgroundEvent event = new ErrorBackgroundEvent(error); -backgroundEventHandler.add(new ErrorBackgroundEvent(error)); -assertPeeked(event); -assertProcessThrows(error); -} - -@Test -public void testMultipleEvents() { -BackgroundEvent event1 = new ErrorBackgroundEvent(new RuntimeException("A")); -backgroundEventQueue.add(event1); -backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B"))); -backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C"))); - -assertPeeked(event1); -backgroundEventProcessor.process((event, error) -> { }); -assertTrue(backgroundEventQueue.isEmpty()); -} - -@Test -public void testMultipleErrorEvents() { -Throwable error1 = new Throwable("error1"); -KafkaException error2 = new KafkaException("error2"); -KafkaException error3 = new KafkaException("error3"); - -backgroundEventHandler.add(new ErrorBackgroundEvent(error1)); -backgroundEventHandler.add(new ErrorBackgroundEvent(error2)); -backgroundEventHandler.add(new ErrorBackgroundEvent(error3)); - -assertProcessThrows(new KafkaException(error1)); -} - -@Test -public void testMixedEventsWithErrorEvents() { -Throwable error1 = new Throwable("error1"); -KafkaException error2 = new KafkaException("error2"); -KafkaException error3 = new KafkaException("error3"); - -RuntimeException errorToCheck = new RuntimeException("A"); -backgroundEventQueue.add(new ErrorBackgroundEvent(errorToCheck)); 
-backgroundEventHandler.add(new ErrorBackgroundEvent(error1)); -backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B"))); -backgroundEventHandler.add(new ErrorBackgroundEvent(error2)); -backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C"))); -backgroundEventHandler.add(new ErrorBackgroundEvent(error3)); -backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("D"))); - -assertProcessThrows(new KafkaException(errorToCheck)); -} - -private void assertPeeked(BackgroundEvent event) { -BackgroundEvent peekEvent = backgroundEventQueue.peek(); -assertNotNull(peekEvent); -assertEquals(event, peekEvent); -} - -private void assertProcessThrows(Throwable error) { -assertFalse(backgroundEventQueue.isEmpty()); - -try { -backgroundEventProcessor.process(); -fail("Should have
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412156215 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java: ## @@ -49,4 +49,9 @@ public void add(BackgroundEvent event) { log.trace("Enqueued event: {}", event); backgroundEventQueue.add(event); } + +// Visible for testing +public Queue backgroundEventQueue() { +return backgroundEventQueue; +} Review Comment: I had to take painkillers to stand the pain when I added that method because I totally agree with you but I did not find another way. I will reconsider.
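One common way to avoid a "visible for testing" getter like this is to inject the queue through the constructor, so the test keeps its own reference to it. The sketch below assumes the test is allowed to construct the handler itself; the types are hypothetical simplifications, not `BackgroundEventHandler`.

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified event type.
final class EventSketch {
    final String name;

    EventSketch(String name) {
        this.name = name;
    }
}

// The queue is supplied by whoever constructs the handler, so a test can keep
// its own reference to it and the handler needs no accessor for the queue.
final class EventHandlerSketch {
    private final Queue<EventSketch> queue;

    EventHandlerSketch(Queue<EventSketch> queue) {
        this.queue = queue;
    }

    void add(EventSketch event) {
        queue.add(event);
    }
}
```

In a test, the queue passed in is the same object the assertions inspect, which also avoids wrapping the class under test in a spy.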
[PR] HOTFIX: fix ConsistencyVectorIntegrationTest failure [kafka]
lucasbru opened a new pull request, #14895: URL: https://github.com/apache/kafka/pull/14895 #14735 changed the result for `KeyQuery` from `ValueAndTimestamp<V>` to `V`, but forgot to update `ConsistencyVectorIntegrationTest` accordingly. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412153908 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } } +public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) { +this.groupMetadata = Optional.of(groupMetadata); +} + @Override public ConsumerGroupMetadata groupMetadata() { -throw new KafkaException("method not implemented"); +acquireAndEnsureOpen(); +try { +maybeThrowInvalidGroupIdException(); +backgroundEventProcessor.process(); Review Comment: OK, should `backgroundEventProcessor.process()` then only be called in `AsyncKafkaConsumer#poll()`?
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
eduwercamacaro commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1412148951 ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -580,6 +582,23 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState } } +/** + * Set the listener which is triggered whenever a standby task is updated + * + * @param standbyListener The listener triggered when a standby task is updated. + * @throws IllegalStateException if this {@code KafkaStreams} instance has already been started. + */ +public void setStandbyUpdateListener(final StandbyUpdateListener standbyListener) { Review Comment: IMO, in the context of this method, `standbyListener` makes more sense than `userStandbyListener`.
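The Javadoc above promises an `IllegalStateException` once the instance has been started. The following is a minimal sketch of such a guard, using the parameter name `standbyListener` proposed in the comment. The types are hypothetical and simplified; this is not the `KafkaStreams` implementation.

```java
// Hypothetical, simplified listener; the real KIP-988 StandbyUpdateListener has more callbacks and parameters.
interface StandbyListenerSetterSketch {
    void onBatchLoaded(String storeName, long batchEndOffset);
}

final class StreamsSetterSketch {
    private enum State { CREATED, RUNNING }

    private State state = State.CREATED;
    private StandbyListenerSetterSketch standbyListener;

    // Follows the documented contract: the listener can only be set before start().
    synchronized void setStandbyUpdateListener(StandbyListenerSetterSketch standbyListener) {
        if (state != State.CREATED) {
            throw new IllegalStateException(
                "Can only set the standby update listener before start(); current state is " + state);
        }
        this.standbyListener = standbyListener;
    }

    synchronized void start() {
        state = State.RUNNING;
    }

    synchronized boolean hasStandbyListener() {
        return standbyListener != null;
    }
}
```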
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412142819 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } } +public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) { Review Comment: That was a temporary change that I forgot to remove.
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412141829 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -398,6 +466,38 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { requestManagersSupplier); } +private Optional initializeGroupMetadata(final ConsumerConfig config, +final GroupRebalanceConfig groupRebalanceConfig) { +final Optional groupMetadata = initializeGroupMetadata( +groupRebalanceConfig.groupId, +groupRebalanceConfig.groupInstanceId +); +if (!groupMetadata.isPresent()) { +config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); +config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED); +} +return groupMetadata; +} + +private Optional initializeGroupMetadata(final String groupId, +final Optional groupInstanceId) { +if (groupId != null) { +if (groupId.isEmpty()) { +throwInInvalidGroupIdException(); Review Comment: Because the group ID cannot be empty if it is set.
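The three cases behind cadonna's answer are no group ID, an empty group ID, and a usable group ID. They can be sketched as below. The method name and the use of `IllegalArgumentException` are assumptions for illustration only; the PR throws its own invalid-group-ID exception via `throwInInvalidGroupIdException()`.

```java
import java.util.Optional;

final class GroupIdValidationSketch {
    // Hypothetical helper: empty when no group ID is configured, the ID itself
    // when it is usable, and an exception when it is set but empty.
    static Optional<String> validatedGroupId(String groupId) {
        if (groupId == null) {
            return Optional.empty(); // no group.id: no group metadata is created
        }
        if (groupId.isEmpty()) {
            // Set but empty is treated as a configuration error, not as "no group".
            throw new IllegalArgumentException("The group ID must not be empty if it is set");
        }
        return Optional.of(groupId);
    }
}
```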
Re: [PR] KAFKA-14412: Better Rocks column family management [kafka]
nicktelford commented on PR #14852: URL: https://github.com/apache/kafka/pull/14852#issuecomment-1836167995 Sorry about the compile error. It should now build. Regarding tests: agreed. When KIP-892 lands, it adds an extra CF, but at that point RocksDBStoreTest and RocksDBTimestampedStoreTest will implicitly test that too, so I don't think additional tests are required.
Re: [PR] KAFKA-14412: Better Rocks column family management [kafka]
nicktelford commented on code in PR #14852: URL: https://github.com/apache/kafka/pull/14852#discussion_r1412137956 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java: ## @@ -64,45 +62,27 @@ public RocksDBTimestampedStore(final String name, @Override void openRocksDB(final DBOptions dbOptions, final ColumnFamilyOptions columnFamilyOptions) { -final List columnFamilyDescriptors = asList( -new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions), -new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions)); -final List columnFamilies = new ArrayList<>(columnFamilyDescriptors.size()); - -try { -db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilies); -setDbAccessor(columnFamilies.get(0), columnFamilies.get(1)); -} catch (final RocksDBException e) { -if ("Column family not found: keyValueWithTimestamp".equals(e.getMessage())) { -try { -db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors.subList(0, 1), columnFamilies); - columnFamilies.add(db.createColumnFamily(columnFamilyDescriptors.get(1))); -} catch (final RocksDBException fatal) { -throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), fatal); -} -setDbAccessor(columnFamilies.get(0), columnFamilies.get(1)); -} else { -throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e); -} -} -} - -private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily, - final ColumnFamilyHandle withTimestampColumnFamily) { -final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily); +final List columnFamilies = openRocksDB( +dbOptions, +new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions), +new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions) +); +final 
ColumnFamilyHandle noTimestampCF = columnFamilies.get(0); +final ColumnFamilyHandle withTimestampCF = columnFamilies.get(1); Review Comment: Done.
Re: [PR] KAFKA-14412: Better Rocks column family management [kafka]
cadonna commented on code in PR #14852: URL: https://github.com/apache/kafka/pull/14852#discussion_r1412123554 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java: ## @@ -64,45 +62,27 @@ public RocksDBTimestampedStore(final String name, @Override void openRocksDB(final DBOptions dbOptions, final ColumnFamilyOptions columnFamilyOptions) { -final List columnFamilyDescriptors = asList( -new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions), -new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions)); -final List columnFamilies = new ArrayList<>(columnFamilyDescriptors.size()); - -try { -db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilies); -setDbAccessor(columnFamilies.get(0), columnFamilies.get(1)); -} catch (final RocksDBException e) { -if ("Column family not found: keyValueWithTimestamp".equals(e.getMessage())) { -try { -db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors.subList(0, 1), columnFamilies); - columnFamilies.add(db.createColumnFamily(columnFamilyDescriptors.get(1))); -} catch (final RocksDBException fatal) { -throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), fatal); -} -setDbAccessor(columnFamilies.get(0), columnFamilies.get(1)); -} else { -throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e); -} -} -} - -private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily, - final ColumnFamilyHandle withTimestampColumnFamily) { -final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily); +final List columnFamilies = openRocksDB( +dbOptions, +new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions), +new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions) +); +final 
ColumnFamilyHandle noTimestampCF = columnFamilies.get(0); +final ColumnFamilyHandle withTimestampCF = columnFamilies.get(1); Review Comment: nit: Could you please spell out `ColumnFamily`?
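The removed code detects a missing column family by matching the text of the `RocksDBException` (`"Column family not found: keyValueWithTimestamp"`). A generic helper could instead compare the requested families with the ones already on disk; RocksJava provides `RocksDB.listColumnFamilies` to list them. The sketch below models only that bookkeeping, with plain strings standing in for column family names. It does not call RocksDB and is not the helper the PR adds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper: decides which requested column families can be opened
// together with the database and which must be created afterwards.
final class ColumnFamilyPlanSketch {
    final List<String> toOpen = new ArrayList<>();
    final List<String> toCreate = new ArrayList<>();

    static ColumnFamilyPlanSketch plan(List<String> requested, Set<String> existing) {
        ColumnFamilyPlanSketch plan = new ColumnFamilyPlanSketch();
        for (String name : requested) {
            // Requested order is preserved within each list.
            if (existing.contains(name)) {
                plan.toOpen.add(name);
            } else {
                plan.toCreate.add(name);
            }
        }
        return plan;
    }
}
```

A real helper would also have to return the `ColumnFamilyHandle`s in the requested order, because callers such as `RocksDBTimestampedStore` index into the returned list by position (`columnFamilies.get(0)`, `columnFamilies.get(1)`).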
[jira] [Resolved] (KAFKA-15842) Correct handling of KafkaConsumer.committed for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Schofield resolved KAFKA-15842. -- Resolution: Fixed > Correct handling of KafkaConsumer.committed for new consumer > > > Key: KAFKA-15842 > URL: https://issues.apache.org/jira/browse/KAFKA-15842 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Minor > Labels: kip-848-client-support, kip-848-e2e, kip-848-preview > > KafkaConsumer.committed throws TimeoutException when there is no response. > The new consumer currently returns null. This change makes the new consumer > behave like the old consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
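The behavior change described in the ticket can be sketched as follows: wait for the background result with a time bound, and turn an expired wait into an exception instead of a `null` return. The exception class and method below are hypothetical. The real consumer throws `org.apache.kafka.common.errors.TimeoutException`, and its event plumbing differs.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException (an unchecked exception).
final class CommittedTimeoutSketchException extends RuntimeException {
    CommittedTimeoutSketchException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class CommittedSketch {
    // Waits for the committed offsets fetched in the background. An expired wait is
    // reported as an exception; null is never returned for "no response".
    static Map<String, Long> committed(CompletableFuture<Map<String, Long>> pending, long timeoutMs) {
        try {
            return pending.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new CommittedTimeoutSketchException(
                "Timed out after " + timeoutMs + " ms waiting for committed offsets", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for committed offsets", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Fetching committed offsets failed", e.getCause());
        }
    }
}
```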
Re: [PR] KAFKA-14412: Better Rocks column family management [kafka]
nicktelford commented on PR #14852: URL: https://github.com/apache/kafka/pull/14852#issuecomment-1836136670 @cadonna > Do we have unit tests in place that test all this logic? We don't have any tests dedicated to this, but `RocksDBStoreTest` and `RocksDBTimestampedStoreTest` implicitly test both the case that we only have the default CF, and the case that we have an extra CF (for the timestamped values). These tests fail spectacularly when any of this logic has a bug in it. Also, most of the integration tests break pretty badly when any of this is wrong. Are there any additional tests you'd like me to add?
Re: [PR] KAFKA-15842: Correct handling of KafkaConsumer.committed for new consumer [kafka]
cadonna merged PR #14859: URL: https://github.com/apache/kafka/pull/14859
Re: [PR] KAFKA-15842: Correct handling of KafkaConsumer.committed for new consumer [kafka]
cadonna commented on code in PR #14859: URL: https://github.com/apache/kafka/pull/14859#discussion_r1412110677 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -663,6 +662,10 @@ public Map committed(final Set
Re: [PR] KAFKA-15280: Implement client support for KIP-848 server-side assignors [kafka]
lucasbru commented on PR #14878: URL: https://github.com/apache/kafka/pull/14878#issuecomment-1836125795 @cadonna Could you please review this? @lianetm This includes a small fix in the reconciliation logic. Please check it; I'm not sure it is 100% correct. I had trouble getting partitions revoked from my consumer in the integration tests.
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
eduwercamacaro commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1412102373 ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -175,13 +176,15 @@ public class KafkaStreams implements AutoCloseable { private final KafkaClientSupplier clientSupplier; protected final TopologyMetadata topologyMetadata; private final QueryableStoreProvider queryableStoreProvider; +private final StandbyUpdateListener delegatingStandbyUpdateListener; GlobalStreamThread globalStreamThread; private KafkaStreams.StateListener stateListener; private StateRestoreListener globalStateRestoreListener; private boolean oldHandler; private BiConsumer streamsUncaughtExceptionHandler; private final Object changeThreadCount = new Object(); +private StandbyUpdateListener globalStandbyListener; Review Comment: I agree. Your proposal makes things easier to understand. Thanks! :)
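The `final` field `delegatingStandbyUpdateListener` suggests a single listener object handed to internal components, with the user's listener plugged in behind it. Assuming that design (the PR's actual wiring is not shown in this diff), a minimal delegating listener with hypothetical, simplified types could look like this:

```java
// Hypothetical, simplified listener; the real KIP-988 StandbyUpdateListener has more callbacks and parameters.
interface StandbyCallbackSketch {
    void onBatchLoaded(String storeName, long batchEndOffset);
}

// One instance can be handed to internal components at construction time, while the
// user-supplied listener behind it is set later.
final class DelegatingStandbyCallbackSketch implements StandbyCallbackSketch {
    private volatile StandbyCallbackSketch userListener; // null until the user sets one

    void setUserListener(StandbyCallbackSketch userListener) {
        this.userListener = userListener;
    }

    @Override
    public void onBatchLoaded(String storeName, long batchEndOffset) {
        StandbyCallbackSketch listener = userListener; // read the volatile field once
        if (listener != null) {
            listener.onBatchLoaded(storeName, batchEndOffset);
        }
    }
}
```

Because the delegating object never changes, internal components can keep a `final` reference to it, while only the delegate field needs to be visible across threads.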
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
dajac commented on PR #14570: URL: https://github.com/apache/kafka/pull/14570#issuecomment-1836106444 Hey folks. It seems that this PR has introduced failures in trunk. From the last build:
```
Build / JDK 11 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy – org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest 13s
Build / JDK 11 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy – org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest 5s
Build / JDK 21 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy – org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest 47s
Build / JDK 21 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy – org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest 12s
Build / JDK 17 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy – org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest 17s
Build / JDK 17 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy – org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest 15s
Build / JDK 8 and Scala 2.12 / shouldHaveSamePositionBoundActiveAndStandBy – org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest 50s
Build / JDK 8 and Scala 2.12 / shouldHaveSamePositionBoundActiveAndStandBy – org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest
```
Could you please double check?
Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]
aliehsaeedii commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1412093650 ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; +import org.rocksdb.Snapshot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Optional; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class LogicalSegmentIterator implements VersionedRecordIterator { Review Comment: > I remember that we also introduced ManagedIterator interface for versioned state store -- would we need to implement it, too? This is `ManagedKeyValueIterator` which is implementing `KeyValueIterator`. I think for this KIP, Keyvalue is not needed. >Should we extend AbstractIterator to share some common code? 
It makes everything more complicated, I think.
Re: [PR] KAFKA-14516: [2/N] New Group Coordinator IT for Static Member Replacement [kafka]
dajac commented on code in PR #14882: URL: https://github.com/apache/kafka/pull/14882#discussion_r1412090130 ## core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala: ## @@ -137,6 +137,98 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch) } + @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array( +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): Unit = { +val raftCluster = cluster.asInstanceOf[RaftClusterInstance] +val admin = cluster.createAdminClient() +val instanceId = "instanceId" + +// Creates the __consumer_offsets topics because it won't be created automatically +// in this test because it does not use FindCoordinator API. +TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala, + controllers = raftCluster.controllerServers().asScala.toSeq +) + +// Create the topic. +val topicId = TestUtils.createTopicWithAdminRaw( + admin = admin, + topic = "foo", + numPartitions = 3 +) + +// Heartbeat request so that a static member joins the group +var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() +.setGroupId("grp") +.setInstanceId(instanceId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(5 * 60 * 1000) +.setSubscribedTopicNames(List("foo").asJava) +.setTopicPartitions(List.empty.asJava) +).build() + +// This is the expected assignment. 
+val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(topicId) +.setPartitions(List[Integer](0, 1, 2).asJava)).asJava) + +// Send the request until receiving a successful response. There is a delay +// here because the group coordinator is loaded in the background. +var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null +TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code +}, msg = s"Could not join the group successfully and get partitions assigned. Last response $consumerGroupHeartbeatResponse.") Review Comment: @vamossagar12 The issue is here. Basically, when the HB is processed by the group coordinator, the metadata for topic `foo` is not available yet, so the assignment is empty. I think that you should HB here until you get the desired assignment. Keep in mind that you have to update the memberId and memberEpoch.
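dajac's suggestion is to keep heartbeating until the expected assignment arrives, carrying the member ID and epoch forward from each response. It can be sketched as a plain loop. The request/response types and the `CoordinatorSketch` interface are hypothetical, not Kafka's `ConsumerGroupHeartbeatRequestData`. The actual Scala test would poll inside `TestUtils.waitUntilTrue` with a timeout rather than a fixed attempt count.

```java
import java.util.List;

// Hypothetical, simplified heartbeat messages; not Kafka's generated message classes.
final class HeartbeatRequestSketch {
    final String memberId;
    final int memberEpoch;

    HeartbeatRequestSketch(String memberId, int memberEpoch) {
        this.memberId = memberId;
        this.memberEpoch = memberEpoch;
    }
}

final class HeartbeatResponseSketch {
    final String memberId;
    final int memberEpoch;
    final List<Integer> assignedPartitions;

    HeartbeatResponseSketch(String memberId, int memberEpoch, List<Integer> assignedPartitions) {
        this.memberId = memberId;
        this.memberEpoch = memberEpoch;
        this.assignedPartitions = assignedPartitions;
    }
}

interface CoordinatorSketch {
    HeartbeatResponseSketch heartbeat(HeartbeatRequestSketch request);
}

final class HeartbeatUntilAssignedSketch {
    // Joins with epoch 0, then keeps heartbeating with the member ID and epoch from the
    // latest response until the expected partitions are assigned.
    static HeartbeatResponseSketch heartbeatUntil(
            CoordinatorSketch coordinator, List<Integer> expectedPartitions, int maxAttempts) {
        HeartbeatRequestSketch request = new HeartbeatRequestSketch("", 0); // no member ID yet
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            HeartbeatResponseSketch response = coordinator.heartbeat(request);
            if (response.assignedPartitions.equals(expectedPartitions)) {
                return response;
            }
            request = new HeartbeatRequestSketch(response.memberId, response.memberEpoch);
        }
        throw new IllegalStateException(
            "Expected assignment not received after " + maxAttempts + " heartbeats");
    }
}
```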
Re: [PR] KAFKA-14516: [2/N] New Group Coordinator IT for Static Member Replacement [kafka]
dajac commented on code in PR #14882: URL: https://github.com/apache/kafka/pull/14882#discussion_r1412090389 ## core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala: ## @@ -137,6 +137,98 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch) } + @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array( +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): Unit = { +val raftCluster = cluster.asInstanceOf[RaftClusterInstance] +val admin = cluster.createAdminClient() +val instanceId = "instanceId" + +// Creates the __consumer_offsets topics because it won't be created automatically +// in this test because it does not use FindCoordinator API. +TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala, + controllers = raftCluster.controllerServers().asScala.toSeq +) + +// Create the topic. +val topicId = TestUtils.createTopicWithAdminRaw( + admin = admin, + topic = "foo", + numPartitions = 3 +) + +// Heartbeat request so that a static member joins the group +var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() +.setGroupId("grp") +.setInstanceId(instanceId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(5 * 60 * 1000) +.setSubscribedTopicNames(List("foo").asJava) +.setTopicPartitions(List.empty.asJava) +).build() + +// This is the expected assignment. 
+val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(topicId) +.setPartitions(List[Integer](0, 1, 2).asJava)).asJava) + +// Send the request until receiving a successful response. There is a delay +// here because the group coordinator is loaded in the background. +var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null +TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code +}, msg = s"Could not join the group successfully and get partitions assigned. Last response $consumerGroupHeartbeatResponse.") + +// Verify the response. +assertNotNull(consumerGroupHeartbeatResponse.data.memberId) +assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch) +assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) + +val oldMemberId = consumerGroupHeartbeatResponse.data.memberId + +// Leave the group temporarily +consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() +.setGroupId("grp") Review Comment: The member id should be set here.
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 closed pull request #14724: KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) URL: https://github.com/apache/kafka/pull/14724
Re: [PR] KAFKA-14516: [2/N] New Group Coordinator IT for Static Member Replacement [kafka]
vamossagar12 commented on PR #14882: URL: https://github.com/apache/kafka/pull/14882#issuecomment-1836078586 Oh, I see. I didn't realise that, sorry. The fact that I also see this locally means it's flaky.
Re: [PR] KAFKA-14516: [2/N] New Group Coordinator IT for Static Member Replacement [kafka]
dajac commented on PR #14882: URL: https://github.com/apache/kafka/pull/14882#issuecomment-1836072757 @vamossagar12 Hum... `ERROR Unexpected error handling org.apache.kafka.server.AssignmentsManager$DispatchEvent@3909ac4e` is not related to the group coordinator.
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
dajac commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1412044990 ## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ## @@ -153,6 +154,15 @@ class CoordinatorLoaderImpl[T]( } currentOffset = batch.nextOffset + val currentHighWatermark = log.highWatermark + if (currentOffset >= currentHighWatermark) { +coordinator.updateLastWrittenOffset(currentOffset) + } + + if (currentHighWatermark > previousHighWatermark) { +coordinator.updateLastCommittedOffset(currentHighWatermark) +previousHighWatermark = currentHighWatermark + } Review Comment: I think that it works. However, it is worth pointing out that the last committed offset could be higher than the last written offset in the state machine until records past it are read. ## core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala: ## @@ -366,6 +370,110 @@ class CoordinatorLoaderImplTest { } } + @Test + def testUpdateLastWrittenOffsetOnBatchLoaded(): Unit = { +val tp = new TopicPartition("foo", 0) +val replicaManager = mock(classOf[ReplicaManager]) +val serde = new StringKeyValueDeserializer +val log = mock(classOf[UnifiedLog]) +val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) + +TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + time = Time.SYSTEM, + replicaManager = replicaManager, + deserializer = serde, + loadBufferSize = 1000 +)) { loader => + when(replicaManager.getLog(tp)).thenReturn(Some(log)) + when(log.logStartOffset).thenReturn(0L) + when(log.highWatermark).thenReturn(0L).thenReturn(0L).thenReturn(2L) + when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L)) + + val readResult1 = logReadResult(startOffset = 0, records = Seq( +new SimpleRecord("k1".getBytes, "v1".getBytes), +new SimpleRecord("k2".getBytes, "v2".getBytes) + )) + + when(log.read( +startOffset = 0L, +maxLength = 1000, +isolation = FetchIsolation.LOG_END, +minOneMessage = true + )).thenReturn(readResult1) + + val 
readResult2 = logReadResult(startOffset = 2, records = Seq( +new SimpleRecord("k3".getBytes, "v3".getBytes), +new SimpleRecord("k4".getBytes, "v4".getBytes), +new SimpleRecord("k5".getBytes, "v5".getBytes) + )) + + when(log.read( +startOffset = 2L, +maxLength = 1000, +isolation = FetchIsolation.LOG_END, +minOneMessage = true + )).thenReturn(readResult2) + + val readResult3 = logReadResult(startOffset = 5, records = Seq( +new SimpleRecord("k6".getBytes, "v6".getBytes), +new SimpleRecord("k7".getBytes, "v7".getBytes) + )) + + when(log.read( +startOffset = 5L, +maxLength = 1000, +isolation = FetchIsolation.LOG_END, +minOneMessage = true + )).thenReturn(readResult3) + + assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)) + + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7")) + verify(coordinator, times(0)).updateLastWrittenOffset(0) + verify(coordinator, times(1)).updateLastWrittenOffset(2) + verify(coordinator, times(1)).updateLastWrittenOffset(5) + verify(coordinator, times(1)).updateLastWrittenOffset(7) + verify(coordinator, times(1)).updateLastCommittedOffset(0) + verify(coordinator, times(1)).updateLastCommittedOffset(2) + verify(coordinator, times(0)).updateLastCommittedOffset(5) +} + } + + @Test + def testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead(): Unit = { +val tp = new 
TopicPartition("foo", 0) +val replicaManager = mock(classOf[ReplicaManager]) +val serde = new StringKeyValueDeserializer +val log = mock(classOf[UnifiedLog]) +val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) + +TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + time = Time.SYSTEM, + replicaManager = replicaManager, + deserializer = serde, +
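To make the ordering in the loader diff concrete, here is a small Java model of just the offset bookkeeping, with no Kafka types. Each loaded batch contributes its `nextOffset`, and the high watermark is sampled once per batch. The diff does not show the initial `previousHighWatermark`; the sketch assumes `-1`, which is consistent with the test expecting one `updateLastCommittedOffset(0)` call. Calls are recorded as strings instead of invoking `CoordinatorPlayback`.

```java
import java.util.ArrayList;
import java.util.List;

final class LoaderOffsetSketch {
    // batchNextOffsets[i] is the nextOffset of the i-th loaded batch; highWatermarks[i] is the
    // high watermark read right after that batch. Returns the callbacks in the order they are made.
    static List<String> replay(long[] batchNextOffsets, long[] highWatermarks) {
        final List<String> calls = new ArrayList<>();
        long previousHighWatermark = -1L; // assumption: initial value is not shown in the diff
        for (int i = 0; i < batchNextOffsets.length; i++) {
            final long currentOffset = batchNextOffsets[i];
            final long currentHighWatermark = highWatermarks[i];
            // Only advance the written offset once the loader has caught up to the high watermark.
            if (currentOffset >= currentHighWatermark) {
                calls.add("written:" + currentOffset);
            }
            // Advance the committed offset whenever the high watermark moves forward.
            if (currentHighWatermark > previousHighWatermark) {
                calls.add("committed:" + currentHighWatermark);
                previousHighWatermark = currentHighWatermark;
            }
        }
        return calls;
    }
}
```

With the test's inputs (batches ending at 2, 5 and 7, and the high watermark sampled as 0, 0, 2), `replay` yields `written:2, committed:0, written:5, written:7, committed:2`, which matches the verifications. If the high watermark is already 7 from the start, it yields `committed:7, written:7`. In that case the committed offset runs ahead of the written offset until the last batch is read, which is the situation the review comment points out.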
Re: [PR] KAFKA-14412: Better Rocks column family management [kafka]
cadonna commented on code in PR #14852: URL: https://github.com/apache/kafka/pull/14852#discussion_r1412057240 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ## @@ -278,18 +280,74 @@ private void addValueProvidersToMetricsRecorder() { void openRocksDB(final DBOptions dbOptions, final ColumnFamilyOptions columnFamilyOptions) { -final List columnFamilyDescriptors -= Collections.singletonList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions)); -final List columnFamilies = new ArrayList<>(columnFamilyDescriptors.size()); +final List columnFamilies = openRocksDB( +dbOptions, +new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions) +); + +dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0)); +} + +/** + * Open RocksDB while automatically creating any requested column families that don't yet exist. + */ +protected List openRocksDB(final DBOptions dbOptions, + final ColumnFamilyDescriptor defaultColumnFamilyDescriptor, + final ColumnFamilyDescriptor... 
columnFamilyDescriptors) { +final String absolutePath = dbDir.getAbsolutePath(); +final List<ColumnFamilyDescriptor> extraDescriptors = Arrays.asList(columnFamilyDescriptors); +final List<ColumnFamilyDescriptor> allDescriptors = new ArrayList<>(1 + columnFamilyDescriptors.length); +allDescriptors.add(defaultColumnFamilyDescriptor); +allDescriptors.addAll(extraDescriptors); try { -db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilies); -dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0)); +final Options options = new Options(dbOptions, defaultColumnFamilyDescriptor.getOptions()); +final List<byte[]> allExisting = RocksDB.listColumnFamilies(options, absolutePath); + +final List<ColumnFamilyDescriptor> existingDescriptors = allDescriptors.stream() +.filter(descriptor -> descriptor == defaultColumnFamilyDescriptor || allExisting.stream().anyMatch(existing -> Arrays.equals(existing, descriptor.getName()))) +.collect(Collectors.toList()); Review Comment: nit: Just a proposal for better readability:
```suggestion
final List<ColumnFamilyDescriptor> existingDescriptors = new LinkedList<>();
existingDescriptors.add(defaultColumnFamilyDescriptor);
existingDescriptors.addAll(extraDescriptors.stream()
        .filter(descriptor -> allExisting.stream().anyMatch(existing -> Arrays.equals(existing, descriptor.getName())))
        .collect(Collectors.toList())
);
```
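The intent behind both versions of the descriptor filtering can be sketched without RocksDB. The default family is always opened, an extra family is opened only if `listColumnFamilies` reported it, and the remaining extras must be created after opening. In this hypothetical model, families are plain `byte[]` names rather than `ColumnFamilyDescriptor`s. Filtering only the extra names, rather than a list that already contains the default, also keeps the default family from being added twice.

```java
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;

final class ColumnFamilySplit {
    // byte[] uses identity equals(), so names must be compared with Arrays.equals.
    static boolean existsOnDisk(List<byte[]> onDisk, byte[] name) {
        return onDisk.stream().anyMatch(existing -> Arrays.equals(existing, name));
    }

    // Families to pass when opening: the default first, then each extra family already on disk.
    static List<byte[]> toOpen(byte[] defaultName, List<byte[]> extraNames, List<byte[]> onDisk) {
        final List<byte[]> result = new LinkedList<>();
        result.add(defaultName);
        result.addAll(extraNames.stream()
                .filter(name -> existsOnDisk(onDisk, name))
                .collect(Collectors.toList()));
        return result;
    }

    // Extra families that are missing and would have to be created after the database is open.
    static List<byte[]> toCreate(List<byte[]> extraNames, List<byte[]> onDisk) {
        return extraNames.stream()
                .filter(name -> !existsOnDisk(onDisk, name))
                .collect(Collectors.toList());
    }
}
```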
Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]
aliehsaeedii commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1412048452 ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; +import org.rocksdb.Snapshot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Optional; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class LogicalSegmentIterator implements VersionedRecordIterator { +protected final ListIterator segmentIterator; +private final Bytes key; +private final Long fromTime; +private final Long toTime; +private final ResultOrder order; +protected ListIterator> iterator; + +// defined for creating/releasing the snapshot. 
+private LogicalKeyValueSegment snapshotOwner; +private Snapshot snapshot; + + + +public LogicalSegmentIterator(final ListIterator segmentIterator, + final Bytes key, + final Long fromTime, + final Long toTime, + final ResultOrder order) { + +this.segmentIterator = segmentIterator; +this.key = key; +this.fromTime = fromTime; +this.toTime = toTime; +this.iterator = Collections.emptyListIterator(); +this.order = order; +this.snapshot = null; +this.snapshotOwner = null; +} + +@Override +public void close() { +// user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet! +releaseSnapshot(); +} + +@Override +public boolean hasNext() { +return iterator.hasNext() || maybeFillIterator(); +} + +@Override +public Object next() { +if (hasNext()) { +return iterator.next(); +} +return null; +} +private boolean maybeFillIterator() { + +final List> queryResults = new ArrayList<>(); +while (segmentIterator.hasNext()) { +final LogicalKeyValueSegment segment = segmentIterator.next(); + +if (snapshot == null) { // create the snapshot (this will happen only one time). +this.snapshotOwner = segment; +// take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency) +final Lock lock = new ReentrantLock(); Review Comment: > because we can get a single snapshot when the iterator is creates You mean I get the snapshot in `RocksDBVersionedStore` and pass it to the `LogicalSegmentIterator`? This way, releasing the snapshot is problematic. How to release it? An object of `LogicalKeyValueSegment` must release it. That's why I introduced the `snapshotOwner` field. > , but use the same snaphot throughout the lifetime of the iterator? At the moment, I am using the same snapshot throughout the whole lifetime of the iterator. 
As I mentioned in the code comments, the snapshot is taken just once, and it is released only when the iterator is explicitly closed or when it is exhausted. > all of them have the same physical RocksDB instance under the hood? That's true. That's why a single segment (whichever one the iterator visits first, which can be the `latestValueStore` or, when the order is ascending, the oldest segment) creates and releases the one snapshot that is used throughout the whole lifetime of the iterator.
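The lifecycle described in this comment can be sketched as a plain Java iterator. The snapshot is taken lazily from the first segment visited, that same segment releases it, and release happens on either exhaustion or `close()`. `Segment` is a hypothetical interface standing in for `LogicalKeyValueSegment`, and the snapshot is an opaque `Object` rather than `org.rocksdb.Snapshot`; this models the idea and is not the PR's class. Unlike the diff's `next()`, which returns `null` when nothing is left, this sketch follows the `java.util.Iterator` contract and throws `NoSuchElementException`.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class SnapshotOnceIterator implements Iterator<String>, AutoCloseable {
    // Hypothetical stand-in for LogicalKeyValueSegment: all segments share one physical store.
    interface Segment {
        Object takeSnapshot();
        void releaseSnapshot(Object snapshot);
        List<String> read(Object snapshot);
    }

    private final Iterator<Segment> segments;
    private Iterator<String> current = Collections.emptyIterator();
    private Segment snapshotOwner; // the segment that took the snapshot also releases it
    private Object snapshot;
    private boolean closed;

    SnapshotOnceIterator(Iterator<Segment> segments) {
        this.segments = segments;
    }

    @Override
    public boolean hasNext() {
        while (!closed && !current.hasNext()) {
            if (!segments.hasNext()) {
                close(); // exhausted: release eagerly instead of waiting for the caller
                return false;
            }
            Segment segment = segments.next();
            if (snapshot == null) {
                // Taken only once, from whichever segment is visited first.
                snapshotOwner = segment;
                snapshot = segment.takeSnapshot();
            }
            current = segment.read(snapshot).iterator();
        }
        return !closed && current.hasNext();
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }

    @Override
    public void close() {
        // Idempotent: safe to call after exhaustion or more than once.
        if (snapshot != null) {
            snapshotOwner.releaseSnapshot(snapshot);
            snapshot = null;
            snapshotOwner = null;
        }
        closed = true;
    }
}
```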
Re: [PR] KAFKA-14412: Generalise over RocksDB WriteBatch [kafka]
nicktelford commented on PR #14853: URL: https://github.com/apache/kafka/pull/14853#issuecomment-1836027902 @mjsax @ableegoldman @lucasbru @wcarlson5 @bbejeck @vvcephei @guozhangwang
Re: [PR] MINOR: Add junit properties to display parameterized test names [kafka]
cadonna commented on code in PR #14687: URL: https://github.com/apache/kafka/pull/14687#discussion_r1412039372 ## streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java: ## @@ -239,10 +240,14 @@ public static String safeUniqueTestName(final Class testClass, final TestName * Used by tests migrated to JUnit 5. */ public static String safeUniqueTestName(final Class testClass, final TestInfo testInfo) { -final String displayName = testInfo.getDisplayName(); final String methodName = testInfo.getTestMethod().map(Method::getName).orElse("unknownMethodName"); -final String testName = displayName.contains(methodName) ? methodName : methodName + displayName; -return safeUniqueTestName(testClass, testName); +// Generate a random uuid without an `-`. The `-` is used in Streams' thread name as +// a separator and some tests rely on this. +String randomUuid; +do { +randomUuid = Uuid.randomUuid().toString(); +} while (randomUuid.contains("-")); Review Comment: Why not just replace `-` with a different character?
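The alternative the reviewer asks about, a single replacement pass instead of a retry loop, might look like the Java sketch below. To stay self-contained, it does not call Kafka's `Uuid`. Instead, it assumes `Uuid.toString()` produces URL-safe Base64 of 16 random bytes without padding (an alphabet that includes both `-` and `_`) and mimics that shape with `java.util.Base64`. The replacement character `_` is an arbitrary choice here.

```java
import java.security.SecureRandom;
import java.util.Base64;

final class DashFreeId {
    // Assumed to mirror the shape of Kafka's Uuid.toString(): 16 random bytes, URL-safe Base64, no padding.
    static String randomUrlSafeId(SecureRandom random) {
        byte[] bytes = new byte[16];
        random.nextBytes(bytes);
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
    }

    // One pass, no retries: '-' is reserved as a separator in Streams thread names.
    static String withoutDashes(String id) {
        return id.replace('-', '_');
    }

    static String uniqueSuffix(SecureRandom random) {
        return withoutDashes(randomUrlSafeId(random));
    }
}
```

Replacing `-` with `_` maps two Base64 symbols onto one, so it slightly reduces the id's entropy, which is unlikely to matter for naming tests. The retry loop keeps full entropy but needs a variable number of attempts.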
Re: [PR] KAFKA-14412: Better Rocks column family management [kafka]
nicktelford commented on PR #14852: URL: https://github.com/apache/kafka/pull/14852#issuecomment-1836027702 @mjsax @ableegoldman @lucasbru @wcarlson5 @bbejeck @vvcephei @guozhangwang I was advised to tag you all on these Kafka Streams PRs.
[jira] [Commented] (KAFKA-15140) Improve TopicCommandIntegrationTest to be less flaky
[ https://issues.apache.org/jira/browse/KAFKA-15140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17792025#comment-17792025 ] Owen C.H. Leung commented on KAFKA-15140: - I've created a PR to fix it: https://github.com/apache/kafka/pull/14891 > Improve TopicCommandIntegrationTest to be less flaky > > > Key: KAFKA-15140 > URL: https://issues.apache.org/jira/browse/KAFKA-15140 > Project: Kafka > Issue Type: Test > Components: unit tests >Reporter: Divij Vaidya >Assignee: Lan Ding >Priority: Minor > Labels: flaky-test, newbie > Fix For: 3.5.1, 3.7.0 > > > *This is a good Jira for folks who are new to contributing to Kafka.* > Tests in TopicCommandIntegrationTest get flaky from time to time. The > objective of the task is to make them more robust by doing the following: > 1. Replace the usage {-}createAndWaitTopic{-}() adminClient.createTopics() > method and other places where we are creating a topic (without waiting) > with > TestUtils.createTopicWithAdmin(). The latter method already contains the > functionality to create a topic and wait for metadata to sync up. > 2. Replace the number 6 at places such as > "adminClient.createTopics( > Collections.singletonList(new NewTopic("foo_bar", 1, 6.toShort)))" with a > meaningful constant. > 3. Add logs if an assertion fails, for example, lines such as " > assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"), output)" should > have a third argument which prints the actual output so that we can > observe in the test logs what the output was when the assertion failed. > 4. Replace occurrences of "\n" with System.lineSeparator(), which is platform > independent. > 5. We should wait for reassignment to complete whenever we are re-assigning > partitions using alterconfig before we call describe to validate it.
We could > use > TestUtils.waitForAllReassignmentsToComplete() > *Motivation of this task* > Try to fix the flaky test behaviour such as observed in > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13924/5/testReport/junit/kafka.admin/TopicCommandIntegrationTest/Build___JDK_11_and_Scala_2_13___testDescribeUnderMinIsrPartitionsMixed_String__quorum_zk/] > > {noformat} > org.opentest4j.AssertionFailedError: expected: but was: > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180) > at > app//kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(TopicCommandIntegrationTest.scala:794){noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
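Items 3 to 5 of the list above boil down to two habits that a small Java sketch can show. First, split command output on `System.lineSeparator()` and include the actual output in every failure message. Second, poll for an eventually-true condition (such as reassignments completing) instead of asserting immediately. `waitUntilTrue` below is a hypothetical helper modelled loosely on the Scala `TestUtils.waitUntilTrue` used elsewhere in these tests; it is not that method.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import java.util.regex.Pattern;

final class FlakyTestHelpers {
    // Item 4: split on the platform line separator instead of a hard-coded "\n".
    static String[] rows(String output) {
        return output.split(Pattern.quote(System.lineSeparator()));
    }

    // Item 3: the failure message carries the full output, so CI logs show what was actually printed.
    static void assertRowStartsWith(String output, int row, String prefix) {
        String[] rows = rows(output);
        if (row >= rows.length || !rows[row].startsWith(prefix)) {
            throw new AssertionError("Expected row " + row + " to start with '" + prefix
                    + "' but output was:" + System.lineSeparator() + output);
        }
    }

    // Item 5: poll until the condition holds (e.g. no reassignment in progress) or the timeout expires.
    static void waitUntilTrue(BooleanSupplier condition, Supplier<String> message, long timeoutMs, long pauseMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError(message.get());
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting: " + message.get(), e);
            }
        }
    }
}
```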
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1412011082 ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -1791,6 +1795,146 @@ protected int processStreamThread(final Consumer consumer) { return copy.size(); } +/** + * Returns the internal clients' assigned {@code client instance ids}. + * + * @return The internal clients' assigned instance ids used for metrics collection. + * + * @throws IllegalArgumentException If {@code timeout} is negative. Review Comment: Added this (in alignment with `consumer/producer/admin$clientInstanceId()`); the KIP needs to be updated accordingly. ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -1791,6 +1795,146 @@ protected int processStreamThread(final Consumer consumer) { return copy.size(); } +/** + * Returns the internal clients' assigned {@code client instance ids}. + * + * @return The internal clients' assigned instance ids used for metrics collection. + * + * @throws IllegalArgumentException If {@code timeout} is negative. + * @throws IllegalStateException If {@code KafkaStreams} is not running. + * @throws TimeoutException Indicates that a request timed out. + * @throws StreamsException For any other error that might occur.
+ */ +public ClientInstanceIds clientInstanceIds(final Duration timeout) { +if (timeout.isNegative()) { +throw new IllegalArgumentException("The timeout cannot be negative."); +} +if (state().hasNotStarted()) { +throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start()."); +} +if (state().isShuttingDown() || state.hasCompletedShutdown()) { +throw new IllegalStateException("KafkaStreams has been stopped (" + state + ")."); +} + +final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl(); + +// (1) fan-out calls to threads + +// StreamThread for main/restore consumers and producer(s) +final Map> consumerFutures = new HashMap<>(); +final Map>>> producerFutures = new HashMap<>(); +for (final StreamThread streamThread : threads) { + consumerFutures.putAll(streamThread.consumerClientInstanceIds(timeout)); +producerFutures.put(streamThread.getName(), streamThread.producersClientInstanceIds(timeout)); +} +// GlobalThread +KafkaFuture globalThreadFuture = null; +if (globalStreamThread != null) { +globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout); +} + +// (2) get admin client instance id in a blocking fashion, while Stream/GlobalThreads work in parallel +try { + clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout)); +} catch (final IllegalStateException telemetryDisabledError) { Review Comment: This is new, based on other PRs from KIP-714: `adminClient.clientInstanceId` throws if telemetry is disabled. For this case, we might not want to throw, but return a "partial" result... ## streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.internals; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.streams.ClientInstanceIds; + +import java.util.HashMap; +import java.util.Map; + +public class ClientInstanceIdsImpl implements ClientInstanceIds { +private final Map consumerInstanceIds = new HashMap<>(); +private final Map producerInstanceIds = new HashMap<>(); +private Uuid adminInstanceId; + +public void addConsumerInstanceId(final String key, final Uuid instanceId) { +consumerInstanceIds.put(key, instanceId); +} + +public void addProducerInstanceId(final String key, final Uuid instanceId) { +producerInstanceIds.put(key, instanceId); +} + +
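The fan-out/collect pattern in `clientInstanceIds()` can be sketched with plain `CompletableFuture`s: start all requests first, then wait on each result, and tolerate clients whose telemetry is disabled so that a partial result can be returned. The following are illustrative assumptions, not the PR's code:

- the map keys;
- one shared deadline for all waits, which keeps the total wait bounded by `timeout` instead of `timeout` per client;
- treating an `IllegalStateException` cause as "telemetry disabled";
- surfacing other failures as a `RuntimeException`, where the PR's Javadoc names `StreamsException`.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class InstanceIdCollector {
    // A TimeoutException from get() propagates to the caller unchanged.
    static Map<String, String> collect(Map<String, CompletableFuture<String>> futures, Duration timeout)
            throws TimeoutException {
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        // One deadline for all clients, so the total wait stays bounded by `timeout`.
        final long deadlineNs = System.nanoTime() + timeout.toNanos();
        final Map<String, String> ids = new LinkedHashMap<>();
        for (Map.Entry<String, CompletableFuture<String>> entry : futures.entrySet()) {
            final long remainingNs = Math.max(0L, deadlineNs - System.nanoTime());
            try {
                ids.put(entry.getKey(), entry.getValue().get(remainingNs, TimeUnit.NANOSECONDS));
            } catch (ExecutionException e) {
                if (e.getCause() instanceof IllegalStateException) {
                    continue; // telemetry disabled for this client: leave it out (partial result)
                }
                throw new RuntimeException("Failed to get client instance id for " + entry.getKey(), e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while collecting client instance ids", e);
            }
        }
        return ids;
    }
}
```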
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
soarez commented on PR #14838: URL: https://github.com/apache/kafka/pull/14838#issuecomment-1836006504 @cmccabe @rondagostino @pprovenzano please take a look
Re: [PR] KAFKA-14516: [2/N] New Group Coordinator IT for Static Member Replacement [kafka]
vamossagar12 commented on PR #14882: URL: https://github.com/apache/kafka/pull/14882#issuecomment-1836000366 @dajac, I ran the test a few times on my laptop. It passes most of the time but sometimes fails with ```[2023-12-01 17:21:23,530] WARN [QuorumController id=3000] Performing controller activation. The metadata log appears to be empty. Appending 1 bootstrap record(s) in metadata transaction at metadata.version 3.7-IV3 from bootstrap source 'testkit'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster. (org.apache.kafka.controller.QuorumController:108) [2023-12-01 17:21:24,488] ERROR [GroupCoordinator id=0] Execution of UpdateImage(tp=__consumer_offsets-0, offset=22) failed due to This is not the correct coordinator.. (org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime:1018) org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator. [2023-12-01 17:21:24,628] ERROR Unexpected error handling org.apache.kafka.server.AssignmentsManager$DispatchEvent@3909ac4e (org.apache.kafka.server.AssignmentsManager:117) java.util.concurrent.RejectedExecutionException: The event queue is shutting down at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:253) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) at java.lang.Thread.run(Thread.java:750) [2023-12-01 17:21:25,077] WARN [NodeToControllerChannelManager id=3000 name=registration] Attempting to close NetworkClient that has already been closed.
(org.apache.kafka.clients.NetworkClient:667) expected: but was: Expected :Assignment(topicPartitions=[TopicPartitions(topicId=x5K2iSzBSqmTlJQ8Qfq8sg, partitions=[0, 1, 2])]) Actual :Assignment(topicPartitions=[]) org.opentest4j.AssertionFailedError: expected: but was: at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141) at kafka.server.ConsumerGroupHeartbeatRequestTest.testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(ConsumerGroupHeartbeatRequestTest.scala:193) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45) at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94) at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218) at
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1412009752 ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -1791,6 +1794,74 @@ protected int processStreamThread(final Consumer consumer) { return copy.size(); } +/** + * Returns the internal clients' assigned {@code client instance ids}. + * + * @return the internal clients' assigned instance ids used for metrics collection. + * + * @throws IllegalStateException If {@code KafkaStreams} is not running. + * @throws TimeoutException Indicates that a request timed out. + */ +public ClientInstanceIds clientInstanceIds(final Duration timeout) { +if (state().hasNotStarted()) { +throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start()."); +} +if (state().isShuttingDown() || state.hasCompletedShutdown()) { +throw new IllegalStateException("KafkaStreams has been stopped (" + state + ")."); +} + +final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl(); + +final Map> streamThreadFutures = new HashMap<>(); +for (final StreamThread streamThread : threads) { + streamThreadFutures.putAll(streamThread.clientInstanceIds(timeout)); +} + +KafkaFuture globalThreadFuture = null; +if (globalStreamThread != null) { +globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout); +} + +try { + clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout)); +} catch (final TimeoutException timeoutException) { +log.warn("Could not get admin client-instance-id due to timeout."); +} + +for (final Map.Entry> streamThreadFuture : streamThreadFutures.entrySet()) { +try { +clientInstanceIds.addConsumerInstanceId( +streamThreadFuture.getKey(), +streamThreadFuture.getValue().get() +); +} catch (final ExecutionException exception) { +if (exception.getCause() instanceof TimeoutException) { +log.warn("Could not get global consumer client-instance-id due to timeout."); +} else { 
+log.error("Could not get global consumer client-instance-id", exception); +} +} catch (final InterruptedException error) { +log.error("Could not get global consumer client-instance-id", error); +} +} + +if (globalThreadFuture != null) { +try { + clientInstanceIds.addConsumerInstanceId(globalStreamThread.getName(), globalThreadFuture.get()); +} catch (final ExecutionException exception) { +if (exception.getCause() instanceof TimeoutException) { +log.warn("Could not get global consumer client-instance-id due to timeout."); +} else { +log.error("Could not get global consumer client-instance-id", exception); +} +} catch (final InterruptedException error) { +log.error("Could not get global consumer client-instance-id", error); +} +} + +return clientInstanceIds; Review Comment: Does it buy us much? I would actually prefer somewhat better error handling and better error messages, so I did a larger rewrite of this. Let me know what you think.
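The hunk above waits on one future per client and logs timeouts and other failures separately. Below is a minimal JDK-only sketch of that collect-then-report pattern, assuming each id arrives as a `java.util.concurrent.Future<String>`; `InstanceIdCollectorSketch`, `collect` and the `errors` map are hypothetical names, not the Kafka Streams API. Unlike the hunk, it names the affected client in every message (the stream-thread loop above reuses the "global consumer" wording), shares one deadline across all waits, and restores the interrupt flag. It uses the JDK's `TimeoutException` from a timed `get`, whereas the hunk inspects a Kafka `TimeoutException` wrapped in an `ExecutionException`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Kafka Streams.
class InstanceIdCollectorSketch {
    // Waits for each client's future under one shared deadline. Successful ids are
    // returned; failures are recorded per client in `errors` instead of being thrown,
    // so one slow or failed client does not hide the ids of the others.
    static Map<String, String> collect(Map<String, ? extends Future<String>> futures,
                                       long timeoutMs,
                                       Map<String, String> errors) {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        Map<String, String> ids = new LinkedHashMap<>();
        for (Map.Entry<String, ? extends Future<String>> entry : futures.entrySet()) {
            String client = entry.getKey();
            long remainingNs = Math.max(0L, deadlineNs - System.nanoTime());
            try {
                ids.put(client, entry.getValue().get(remainingNs, TimeUnit.NANOSECONDS));
            } catch (TimeoutException e) {
                errors.put(client, "Could not get client-instance-id of " + client + " due to timeout");
            } catch (ExecutionException e) {
                errors.put(client, "Could not get client-instance-id of " + client + ": " + e.getCause());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting on the remaining clients.
                Thread.currentThread().interrupt();
                errors.put(client, "Interrupted while getting client-instance-id of " + client);
                break;
            }
        }
        return ids;
    }
}
```

Returning partial results plus per-client errors lets the caller decide whether a missing id is fatal, which is one way to get the more specific error messages the review asks for.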
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on PR #14724: URL: https://github.com/apache/kafka/pull/14724#issuecomment-1835985666 > Hi @apoorvmittal10 thanks for writing this PR - also sorry for the delay but I made the first pass and left some stylistic comments. Hi @philipnee, thanks for the review. I have addressed the comments.
Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]
aliehsaeedii commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1411969845 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -266,75 +269,28 @@ public VersionedRecord get(final Bytes key, final long asOfTimestamp) { return null; } -public VersionedRecordIterator get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { - -Objects.requireNonNull(key, "key cannot be null"); +@SuppressWarnings("unchecked") +protected VersionedRecordIterator get(final Bytes key, final long fromTimestamp, final long toTimestamp, final ResultOrder order) { Review Comment: > Can we make it `private` (or package private)? Yes, `package-private` is possible. `private` is not, because we call it from `StoreQueryUtils`.
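In Java, a member declared without an access modifier is package-private: every class in the same package can call it, but nothing outside the package can, so the method can be narrowed from `protected` but cannot become `private` while `StoreQueryUtils` calls it. The self-contained sketch below illustrates this; `VersionedStoreSketch`, `StoreQueryUtilsSketch` and the `OrderSketch` enum (standing in for the `ResultOrder` parameter that replaces the `isAscending` boolean in the hunk) are hypothetical and are not the Kafka classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for an order parameter that replaces a boolean flag.
enum OrderSketch { ASCENDING, DESCENDING }

class VersionedStoreSketch {
    private final List<Long> timestamps = new ArrayList<>();

    void put(long timestamp) {
        timestamps.add(timestamp);
    }

    // No access modifier: package-private, callable only from this package.
    // Marking it `private` would break the call in StoreQueryUtilsSketch.
    List<Long> get(long fromTimestamp, long toTimestamp, OrderSketch order) {
        List<Long> result = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts >= fromTimestamp && ts <= toTimestamp) {
                result.add(ts);
            }
        }
        Collections.sort(result);
        if (order == OrderSketch.DESCENDING) {
            Collections.reverse(result);
        }
        return result;
    }
}

// Lives in the same package, so it may call the package-private get(...).
final class StoreQueryUtilsSketch {
    private StoreQueryUtilsSketch() { }

    static List<Long> query(VersionedStoreSketch store, long from, long to, OrderSketch order) {
        return store.get(from, to, order);
    }
}
```

An enum also reads more clearly at call sites than a bare `true`/`false`, and leaves room for more values later.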
Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]
aliehsaeedii commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1411961568 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -266,75 +269,28 @@ public VersionedRecord get(final Bytes key, final long asOfTimestamp) { return null; } -public VersionedRecordIterator get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { - -Objects.requireNonNull(key, "key cannot be null"); +@SuppressWarnings("unchecked") Review Comment: > Why do we need this suppression? Because of the following warning raised when creating a `LogicalSegmentIterator` object ([line# 280](https://github.com/aliehsaeedii/kafka/blob/30f8d197319359074924b25041f63387484dbc08/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java#L280) and [line# 293](https://github.com/aliehsaeedii/kafka/blob/30f8d197319359074924b25041f63387484dbc08/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java#L293)): `Unchecked assignment: 'org.apache.kafka.streams.state.internals.LogicalSegmentIterator' to 'org.apache.kafka.streams.state.VersionedRecordIterator'`
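An "unchecked assignment" warning typically appears when a value whose static type is raw (for example, a class that implements a generic interface without type arguments) is assigned to a parameterized type. The sketch below reproduces that situation with JDK types only. `RecordIterator` and `RawSegmentIterator` are hypothetical stand-ins for `VersionedRecordIterator` and `LogicalSegmentIterator`, and the raw `implements` clause is an assumption about how such a warning can arise, not a copy of the Kafka code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical generic interface, standing in for something like VersionedRecordIterator<V>.
interface RecordIterator<V> extends Iterator<V> { }

// Hypothetical iterator that implements the interface as a RAW type,
// so the compiler cannot know which element type it produces.
class RawSegmentIterator implements RecordIterator {
    private final Iterator<?> delegate;

    RawSegmentIterator(Iterator<?> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public Object next() {
        return delegate.next();
    }
}

class UncheckedDemo {
    // Without the annotation, javac -Xlint:unchecked reports an unchecked conversion
    // on the return statement (IntelliJ calls it "Unchecked assignment").
    @SuppressWarnings("unchecked")
    static RecordIterator<String> open(List<String> values) {
        return new RawSegmentIterator(values.iterator());
    }

    static List<String> drain(List<String> values) {
        List<String> out = new ArrayList<>();
        RecordIterator<String> it = open(values);
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }
}
```

Making the iterator class itself generic (for instance `class SegmentIterator<V> implements RecordIterator<V>`) and instantiating it with a type argument could remove the need for the suppression at its source.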
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1411951695 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ## @@ -0,0 +1,952 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils; +import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricKeyable; +import org.apache.kafka.common.telemetry.internals.MetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricsEmitter; +import org.apache.kafka.common.telemetry.internals.SinglePointMetric; +import 
org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +/** + * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle + * of the client telemetry collection process. The client telemetry reporter is responsible for + * collecting the client telemetry data and sending it to the broker. + * + * + * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is + * responsible for sending the client telemetry data to the broker. The client telemetry reporter + * will attempt to fetch the telemetry subscription information from the broker and send the + * telemetry data to the broker based on the subscription information. + * + * + * The full life-cycle of the metric collection process is defined by a state machine in + * {@link ClientTelemetryState}. Each state is associated with a different set of operations. + * For example, the client telemetry reporter will attempt to fetch the telemetry subscription + * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state. + * If the push operation fails, the client telemetry reporter will attempt to re-fetch the + * subscription information by setting the state back to {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}. + * + * + * In an unlikely scenario,
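The Javadoc above describes a state-driven loop: fetch the subscription while in `SUBSCRIPTION_NEEDED`, push metrics according to that subscription, and fall back to `SUBSCRIPTION_NEEDED` when a push fails. The following is a minimal sketch of that loop under simplified assumptions. `TelemetryStateSketch` and `TelemetryLifecycleSketch` are hypothetical; only `SUBSCRIPTION_NEEDED` is taken from the quoted Javadoc, the other states are illustrative, and the real `ClientTelemetryState` enum defines its own set. The request names match the `GetTelemetrySubscriptions` and `PushTelemetry` types imported above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified states; not the real ClientTelemetryState enum.
enum TelemetryStateSketch {
    SUBSCRIPTION_NEEDED,
    SUBSCRIPTION_IN_PROGRESS,
    PUSH_NEEDED,
    PUSH_IN_PROGRESS
}

class TelemetryLifecycleSketch {
    private TelemetryStateSketch state = TelemetryStateSketch.SUBSCRIPTION_NEEDED;
    // Records which request each step would send, so the sequence can be inspected.
    private final List<String> sentRequests = new ArrayList<>();

    TelemetryStateSketch state() {
        return state;
    }

    List<String> sentRequests() {
        return sentRequests;
    }

    // Called periodically: the current state decides which request, if any, to send.
    void step() {
        switch (state) {
            case SUBSCRIPTION_NEEDED:
                sentRequests.add("GetTelemetrySubscriptions");
                state = TelemetryStateSketch.SUBSCRIPTION_IN_PROGRESS;
                break;
            case PUSH_NEEDED:
                sentRequests.add("PushTelemetry");
                state = TelemetryStateSketch.PUSH_IN_PROGRESS;
                break;
            default:
                // A request is already in flight; wait for its response.
                break;
        }
    }

    void onSubscriptionResponse() {
        requireState(TelemetryStateSketch.SUBSCRIPTION_IN_PROGRESS);
        state = TelemetryStateSketch.PUSH_NEEDED;
    }

    // Success keeps pushing with the current subscription; failure re-fetches it.
    void onPushResponse(boolean success) {
        requireState(TelemetryStateSketch.PUSH_IN_PROGRESS);
        state = success ? TelemetryStateSketch.PUSH_NEEDED : TelemetryStateSketch.SUBSCRIPTION_NEEDED;
    }

    private void requireState(TelemetryStateSketch expected) {
        if (state != expected) {
            throw new IllegalStateException("Expected " + expected + " but was " + state);
        }
    }
}
```

Rejecting responses that arrive in an unexpected state keeps a late or duplicated response from silently corrupting the cycle.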
Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]
aliehsaeedii commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1411947974 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -136,11 +139,11 @@ public long put(final Bytes key, final byte[] value, final long timestamp) { observedStreamTime = Math.max(observedStreamTime, timestamp); final long foundTs = doPut( -versionedStoreClient, -observedStreamTime, -key, -value, -timestamp +versionedStoreClient, Review Comment: > nit: avoid unnecessary reformatting / indentation changes -- similar below. > > 80% of diff in this file is just adding/removing spaces... very noisy Sorry, resolving the conflicts made a total mess!
Re: [PR] MINOR: Fix flaky test RefreshingHttpsJwksTest.testBasicScheduleRefresh [kafka]
ashwinpankaj commented on code in PR #14888: URL: https://github.com/apache/kafka/pull/14888#discussion_r1411942462 ## clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java: ## @@ -62,14 +62,14 @@ public class RefreshingHttpsJwksTest extends OAuthBearerTest { @Test public void testBasicScheduleRefresh() throws Exception { String keyId = "abc123"; -Time time = new MockTime(); +MockTime time = new MockTime(); HttpsJwks httpsJwks = spyHttpsJwks(); -try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks)) { +try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) { Review Comment:
```suggestion
        // we use MockTime here to ensure that the scheduled refresh _doesn't_ run and update the invocation count
        // we expect httpsJwks.refresh() to be invoked twice, once each from init() and maybeExpediteRefresh()
        try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) {
```
[PR] KAFKA-15831: KIP-1000 protocol and admin client [kafka]
AndrewJSchofield opened a new pull request, #14894: URL: https://github.com/apache/kafka/pull/14894 This adds the new ListClientMetricsResources RPC to the Kafka protocol and adds support for it to the Kafka admin client. The broker-side implementation in this PR simply returns an empty list; a future PR will obtain the list from the config store. Includes a few unit tests for what is a very simple RPC. Additional tests are already written and waiting for the PR that delivers the kafka-client-metrics.sh tool, which builds on this PR. This PR supersedes https://github.com/apache/kafka/pull/14811.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
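The PR describes a two-step plan on the broker: answer the new RPC with an empty list now, and read the list from the config store later. Below is a minimal sketch of how that swap could be isolated behind one seam. The interface `ClientMetricsResourceLister`, both implementations, and the map-based "config store" are hypothetical illustrations, not Kafka's broker code or admin client API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical broker-side seam; not a Kafka interface.
interface ClientMetricsResourceLister {
    List<String> listResourceNames();
}

// First step, as described in the PR: always answer with an empty list.
class EmptyClientMetricsResourceLister implements ClientMetricsResourceLister {
    @Override
    public List<String> listResourceNames() {
        return Collections.emptyList();
    }
}

// Possible follow-up: derive the names from a config store, modelled here as a
// map from resource name to that resource's config entries.
class ConfigStoreClientMetricsResourceLister implements ClientMetricsResourceLister {
    private final Map<String, Map<String, String>> configsByResource;

    ConfigStoreClientMetricsResourceLister(Map<String, Map<String, String>> configsByResource) {
        // Copy into a TreeMap so the listing is stable and sorted by name.
        this.configsByResource = new TreeMap<>(configsByResource);
    }

    @Override
    public List<String> listResourceNames() {
        return new ArrayList<>(configsByResource.keySet());
    }
}
```

Keeping the request handler dependent only on the interface means the follow-up change could replace the implementation without touching the RPC plumbing.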
Re: [PR] MINOR: Fix flaky test RefreshingHttpsJwksTest.testBasicScheduleRefresh [kafka]
ashwinpankaj commented on code in PR #14888: URL: https://github.com/apache/kafka/pull/14888#discussion_r1411930564 ## clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java: ## @@ -62,14 +62,14 @@ public class RefreshingHttpsJwksTest extends OAuthBearerTest { @Test public void testBasicScheduleRefresh() throws Exception { String keyId = "abc123"; -Time time = new MockTime(); +MockTime time = new MockTime(); HttpsJwks httpsJwks = spyHttpsJwks(); -try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks)) { +try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) { refreshingHttpsJwks.init(); verify(httpsJwks, times(1)).refresh(); assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId)); -verify(httpsJwks, times(1)).refresh(); +verify(httpsJwks, times(2)).refresh(); Review Comment: Thanks @splett2, so the assertion would fail whenever maybeExpediteRefresh actually ran! nit: do you want to add a comment mentioning that MockTime is used to ensure that the scheduled refresh never runs, since that would change the invocation count? LGTM
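The fix makes the invocation count deterministic by driving the scheduler from `MockTime`, so the periodic refresh cannot fire between `init()` and the second `verify`. The sketch below reproduces that idea with the JDK only. `ManualClockScheduler` and `CountingRefresher` are hypothetical stand-ins for `mockExecutorService(time)` and `RefreshingHttpsJwks`, and it assumes, as a simplification, that an expedited refresh runs synchronously.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical scheduler driven by a manual clock: nothing runs until advance() is called.
class ManualClockScheduler {
    private static final class Task {
        final long dueMs;
        final Runnable action;

        Task(long dueMs, Runnable action) {
            this.dueMs = dueMs;
            this.action = action;
        }
    }

    private long nowMs = 0L;
    private final List<Task> pending = new ArrayList<>();

    void schedule(long delayMs, Runnable action) {
        pending.add(new Task(nowMs + delayMs, action));
    }

    // Moves the clock forward and runs every task that has become due.
    void advance(long ms) {
        nowMs += ms;
        List<Task> due = new ArrayList<>();
        for (Iterator<Task> it = pending.iterator(); it.hasNext(); ) {
            Task task = it.next();
            if (task.dueMs <= nowMs) {
                due.add(task);
                it.remove();
            }
        }
        for (Task task : due) {
            task.action.run();
        }
    }
}

// Hypothetical refresher that counts refreshes instead of fetching keys.
class CountingRefresher {
    private final ManualClockScheduler scheduler;
    private final long intervalMs;
    private int refreshCount = 0;

    CountingRefresher(ManualClockScheduler scheduler, long intervalMs) {
        this.scheduler = scheduler;
        this.intervalMs = intervalMs;
    }

    int refreshCount() {
        return refreshCount;
    }

    void init() {
        refresh();
        scheduler.schedule(intervalMs, this::scheduledRefresh);
    }

    // Simplification: an expedited refresh runs synchronously.
    boolean maybeExpediteRefresh(String keyId) {
        refresh();
        return true;
    }

    private void scheduledRefresh() {
        refresh();
        scheduler.schedule(intervalMs, this::scheduledRefresh);
    }

    private void refresh() {
        refreshCount++;
    }
}
```

Because the clock only moves when the test calls `advance`, the count after `init()` plus one expedited refresh is always 2, and the periodic refresh can be tested separately by advancing time explicitly.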
Re: [PR] MINOR: Fix flaky test RefreshingHttpsJwksTest.testBasicScheduleRefresh [kafka]
ashwinpankaj commented on code in PR #14888: URL: https://github.com/apache/kafka/pull/14888#discussion_r1411930564 ## clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java: ## @@ -62,14 +62,14 @@ public class RefreshingHttpsJwksTest extends OAuthBearerTest { @Test public void testBasicScheduleRefresh() throws Exception { String keyId = "abc123"; -Time time = new MockTime(); +MockTime time = new MockTime(); HttpsJwks httpsJwks = spyHttpsJwks(); -try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks)) { +try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) { refreshingHttpsJwks.init(); verify(httpsJwks, times(1)).refresh(); assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId)); -verify(httpsJwks, times(1)).refresh(); +verify(httpsJwks, times(2)).refresh(); Review Comment: Thanks @splett2, so the assertion would fail whenever maybeExpediteRefresh actually ran! LGTM
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1411905307 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ## @@ -0,0 +1,952 @@
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1411901649 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ## @@ -0,0 +1,952 @@
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1411900986 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ## @@ -0,0 +1,952 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils; +import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricKeyable; +import org.apache.kafka.common.telemetry.internals.MetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricsEmitter; +import org.apache.kafka.common.telemetry.internals.SinglePointMetric; +import 
org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +/** + * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle + * of the client telemetry collection process. The client telemetry reporter is responsible for + * collecting the client telemetry data and sending it to the broker. + * + * + * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is + * responsible for sending the client telemetry data to the broker. The client telemetry reporter + * will attempt to fetch the telemetry subscription information from the broker and send the + * telemetry data to the broker based on the subscription information. + * + * + * The full life-cycle of the metric collection process is defined by a state machine in + * {@link ClientTelemetryState}. Each state is associated with a different set of operations. + * For example, the client telemetry reporter will attempt to fetch the telemetry subscription + * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state. + * If the push operation fails, the client telemetry reporter will attempt to re-fetch the + * subscription information by setting the state back to {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}. + * + * + * In an unlikely scenario,
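The life-cycle described in the quoted javadoc can be pictured as a small transition table. The sketch below is a simplified, hypothetical model, not the `ClientTelemetryState` enum from the PR. Only `SUBSCRIPTION_NEEDED` and the "push failed, so re-fetch the subscription" edge come from the javadoc. The other state names, the allowed transitions, and the class `TelemetryStateSketch` are assumptions made for illustration.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified telemetry life-cycle. Only SUBSCRIPTION_NEEDED and the
// "push failed -> SUBSCRIPTION_NEEDED" edge are taken from the quoted javadoc.
final class TelemetryStateSketch {

    enum State { SUBSCRIPTION_NEEDED, SUBSCRIPTION_IN_PROGRESS, PUSH_NEEDED, PUSH_IN_PROGRESS, TERMINATED }

    // Allowed next states, keyed by the current state.
    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        ALLOWED.put(State.SUBSCRIPTION_NEEDED, EnumSet.of(State.SUBSCRIPTION_IN_PROGRESS, State.TERMINATED));
        ALLOWED.put(State.SUBSCRIPTION_IN_PROGRESS, EnumSet.of(State.PUSH_NEEDED, State.SUBSCRIPTION_NEEDED, State.TERMINATED));
        ALLOWED.put(State.PUSH_NEEDED, EnumSet.of(State.PUSH_IN_PROGRESS, State.SUBSCRIPTION_NEEDED, State.TERMINATED));
        ALLOWED.put(State.PUSH_IN_PROGRESS, EnumSet.of(State.PUSH_NEEDED, State.SUBSCRIPTION_NEEDED, State.TERMINATED));
        ALLOWED.put(State.TERMINATED, EnumSet.noneOf(State.class));
    }

    private State state = State.SUBSCRIPTION_NEEDED;

    State state() {
        return state;
    }

    // Moves to the next state, rejecting any transition the table does not list.
    void transitionTo(State next) {
        if (!ALLOWED.get(state).contains(next)) {
            throw new IllegalStateException("Invalid transition " + state + " -> " + next);
        }
        state = next;
    }

    // A failed push sends the client back to re-fetch its subscription.
    void onPushFailed() {
        transitionTo(State.SUBSCRIPTION_NEEDED);
    }

    public static void main(String[] args) {
        TelemetryStateSketch sm = new TelemetryStateSketch();
        sm.transitionTo(State.SUBSCRIPTION_IN_PROGRESS);
        sm.transitionTo(State.PUSH_NEEDED);
        sm.transitionTo(State.PUSH_IN_PROGRESS);
        sm.onPushFailed();
        System.out.println(sm.state()); // SUBSCRIPTION_NEEDED
    }
}
```

Keeping the transitions in one table makes illegal moves fail fast. It also gives a single place to read off the diagram, which is the kind of documentation tracked separately in KAFKA-15952.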
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1411884637 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ## @@ -0,0 +1,952 @@
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1411883131 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ## @@ -0,0 +1,952 @@
Re: [PR] KAFKA-15574; [2/N]: Client state machine updates [kafka]
dajac commented on code in PR #14857: URL: https://github.com/apache/kafka/pull/14857#discussion_r1411872217 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -322,16 +314,14 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response, break; case FENCED_MEMBER_EPOCH: -message = String.format("GroupHeartbeatRequest failed because member ID %s with epoch %s is invalid. " + -"Will abandon all partitions and rejoin the group", +message = String.format("GroupHeartbeatRequest failed because member ID %s with epoch %s is invalid.", membershipManager.memberId(), membershipManager.memberEpoch()); logInfo(message, response, currentTimeMs); membershipManager.transitionToFenced(); break; case UNKNOWN_MEMBER_ID: -message = String.format("GroupHeartbeatRequest failed because member of unknown ID %s with epoch %s is invalid. " + -"Will abandon all partitions and rejoin the group", +message = String.format("GroupHeartbeatRequest failed because member of unknown ID %s with epoch %s is invalid.", Review Comment: nit: How about `GroupHeartbeatRequest failed because member id {} is unknown.`? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -322,16 +314,14 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response, break; case FENCED_MEMBER_EPOCH: -message = String.format("GroupHeartbeatRequest failed because member ID %s with epoch %s is invalid. " + -"Will abandon all partitions and rejoin the group", +message = String.format("GroupHeartbeatRequest failed because member ID %s with epoch %s is invalid.", Review Comment: nit: While we are here, how about `GroupHeartbeatRequest failed because epoch {} is fenced.`?
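The two shorter messages suggested in the review can be sketched with `String.format`. Here `%s` fills the slot that the reviewer's SLF4J-style wording marks with `{}`. `HeartbeatErrorMessages`, `HeartbeatError` and `message(...)` are hypothetical names that stand in for the Kafka error codes and the `onErrorResponse` switch. This is not the code in `HeartbeatRequestManager`.

```java
// Hypothetical sketch of the reviewer's suggested wording; not HeartbeatRequestManager code.
final class HeartbeatErrorMessages {

    // Stand-in for the two Kafka error codes discussed in the review.
    enum HeartbeatError { FENCED_MEMBER_EPOCH, UNKNOWN_MEMBER_ID }

    // Builds the shorter messages proposed in the review. String.format's %s
    // plays the role of the SLF4J-style {} placeholder used in the suggestion.
    static String message(HeartbeatError error, String memberId, int memberEpoch) {
        switch (error) {
            case FENCED_MEMBER_EPOCH:
                return String.format("GroupHeartbeatRequest failed because epoch %s is fenced.", memberEpoch);
            case UNKNOWN_MEMBER_ID:
                return String.format("GroupHeartbeatRequest failed because member id %s is unknown.", memberId);
            default:
                throw new IllegalArgumentException("Unhandled error: " + error);
        }
    }

    public static void main(String[] args) {
        // prints "GroupHeartbeatRequest failed because epoch 5 is fenced."
        System.out.println(message(HeartbeatError.FENCED_MEMBER_EPOCH, "m-1", 5));
    }
}
```

Each message now names only the value that explains the failure: the epoch for a fence, the member id for an unknown member.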
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1411873720 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ## @@ -0,0 +1,952 @@
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1411866555 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ## @@ -0,0 +1,952 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils; +import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricKeyable; +import org.apache.kafka.common.telemetry.internals.MetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricsEmitter; +import org.apache.kafka.common.telemetry.internals.SinglePointMetric; +import 
org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +/** + * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle + * of the client telemetry collection process. The client telemetry reporter is responsible for + * collecting the client telemetry data and sending it to the broker. + * + * + * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is + * responsible for sending the client telemetry data to the broker. The client telemetry reporter + * will attempt to fetch the telemetry subscription information from the broker and send the + * telemetry data to the broker based on the subscription information. + * + * + * The full life-cycle of the metric collection process is defined by a state machine in + * {@link ClientTelemetryState}. Each state is associated with a different set of operations. Review Comment: Not yet; I'm planning to do it after the 3.7 code freeze. I think you mean the way the KIP-848 state transitions are publicly documented? I have created a Jira so we can track it: https://issues.apache.org/jira/browse/KAFKA-15952
[jira] [Created] (KAFKA-15952) Create public doc for telemetry state transition
Apoorv Mittal created KAFKA-15952: - Summary: Create public doc for telemetry state transition Key: KAFKA-15952 URL: https://issues.apache.org/jira/browse/KAFKA-15952 Project: Kafka Issue Type: Sub-task Reporter: Apoorv Mittal Assignee: Apoorv Mittal
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1411859695 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ## @@ -0,0 +1,952 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils; +import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricKeyable; +import org.apache.kafka.common.telemetry.internals.MetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricsEmitter; +import org.apache.kafka.common.telemetry.internals.SinglePointMetric; +import 
org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +/** + * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle + * of the client telemetry collection process. The client telemetry reporter is responsible for + * collecting the client telemetry data and sending it to the broker. + * + * + * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is + * responsible for sending the client telemetry data to the broker. The client telemetry reporter + * will attempt to fetch the telemetry subscription information from the broker and send the + * telemetry data to the broker based on the subscription information. Review Comment: I have added `i.e. push interval, temporality, compression type, etc.` to the comment to make it more specific.
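To make "based on the subscription information" concrete, here is a hypothetical holder for the fields named in the comment: push interval, temporality and accepted compression types. Two things are assumptions, based on my reading of KIP-714 rather than on the PR's code: that the broker lists accepted compression types in order of preference, and that the client picks the first one it also supports. `TelemetrySubscriptionSketch` and `preferredCompression` are invented names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical holder for the subscription fields named in the review comment;
// not the PR's actual data structure.
final class TelemetrySubscriptionSketch {

    final int pushIntervalMs;                    // how often the client should push metrics
    final boolean deltaTemporality;              // true = delta, false = cumulative
    final List<String> acceptedCompressionTypes; // assumed ordered by broker preference

    TelemetrySubscriptionSketch(int pushIntervalMs, boolean deltaTemporality,
                                List<String> acceptedCompressionTypes) {
        this.pushIntervalMs = pushIntervalMs;
        this.deltaTemporality = deltaTemporality;
        this.acceptedCompressionTypes =
            Collections.unmodifiableList(new ArrayList<>(acceptedCompressionTypes));
    }

    // Returns the first broker-accepted type the client also supports. In this
    // sketch an empty result is taken to mean "send the payload uncompressed".
    Optional<String> preferredCompression(Set<String> clientSupported) {
        return acceptedCompressionTypes.stream()
            .filter(clientSupported::contains)
            .findFirst();
    }

    public static void main(String[] args) {
        TelemetrySubscriptionSketch sub =
            new TelemetrySubscriptionSketch(30_000, true, Arrays.asList("zstd", "lz4", "gzip"));
        Set<String> supported = new HashSet<>(Arrays.asList("gzip", "lz4"));
        System.out.println(sub.preferredCompression(supported)); // Optional[lz4]
    }
}
```

Iterating over the broker's list rather than the client's set respects the broker's ordering and only uses the client set as a filter.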
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1411855963 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryProvider.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.resource.v1.Resource; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ClientTelemetryProvider implements Configurable { + +public static final String DOMAIN = "org.apache.kafka"; +// Client metrics tags +public static final String CLIENT_RACK = "client_rack"; +public static final String GROUP_ID = "group_id"; +public static final String GROUP_INSTANCE_ID = "group_instance_id"; +public static final String GROUP_MEMBER_ID = "group_member_id"; +public static final String TRANSACTIONAL_ID = "transactional_id"; + +private static final String PRODUCER_NAMESPACE = "kafka.producer"; +private static final String CONSUMER_NAMESPACE = "kafka.consumer"; + +private static final Map PRODUCER_CONFIG_MAPPING = new HashMap<>(); +private static final Map CONSUMER_CONFIG_MAPPING = new HashMap<>(); + +private volatile Resource resource = null; +private Map config = null; + +// Mapping of config keys to telemetry keys. Contains only keys which can be fetched from config. +// Config like group_member_id is not present here as it is not fetched from config. 
+static { +PRODUCER_CONFIG_MAPPING.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, ClientTelemetryProvider.TRANSACTIONAL_ID); + +CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_ID_CONFIG, ClientTelemetryProvider.GROUP_ID); +CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, ClientTelemetryProvider.GROUP_INSTANCE_ID); +} + +@Override +public synchronized void configure(Map<String, ?> configs) { +this.config = configs; +} + +/** + * Validate that all the data required for generating correct metrics is present. + * + * @param metricsContext {@link MetricsContext} + * @return false if all the data required for generating correct metrics is missing, true + * otherwise. + */ +public boolean validate(MetricsContext metricsContext) { +return ClientTelemetryUtils.validateRequiredResourceLabels(metricsContext.contextLabels()); +} + +/** + * Sets the metrics tags for the service or library exposing metrics. This will be called before + * {@link org.apache.kafka.common.metrics.MetricsReporter#init(List)} and may be called anytime + * after that. + * + * @param metricsContext {@link MetricsContext} + */ +public synchronized void contextChange(MetricsContext metricsContext) { +final Resource.Builder resourceBuilder = Resource.newBuilder(); + +final String namespace = metricsContext.contextLabels().get(MetricsContext.NAMESPACE); +if (PRODUCER_NAMESPACE.equals(namespace)) { +// Add producer resource labels. +PRODUCER_CONFIG_MAPPING.forEach((configKey, telemetryKey) -> { +if (config.containsKey(configKey)) { +addAttribute(resourceBuilder, telemetryKey, String.valueOf(config.get(configKey))); +} +}); +} else if (CONSUMER_NAMESPACE.equals(namespace)) { +// Add consumer resource labels. +CONSUMER_CONFIG_MAPPING.forEach((configKey, telemetryKey) -> { +if (config.containsKey(configKey)) { +addAttribute(resourceBuilder, telemetryKey, String.valueOf(config.get(configKey))); +} +}); +} + +// Add client rack label. +if
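The namespace-dependent label selection in `contextChange` can be sketched without the OpenTelemetry `Resource` builder. This is a minimal sketch, not the PR's code: it assumes the resource attributes can be modelled as a plain `Map<String, String>`, `TelemetryLabels` and `resourceLabels` are hypothetical names, and the string keys `transactional.id`, `group.id` and `group.instance.id` are the values of the `ProducerConfig`/`ConsumerConfig` constants used in the mapping.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class TelemetryLabels {
    static final String PRODUCER_NAMESPACE = "kafka.producer";
    static final String CONSUMER_NAMESPACE = "kafka.consumer";

    // Config key -> telemetry label key, mirroring PRODUCER_CONFIG_MAPPING.
    static final Map<String, String> PRODUCER_CONFIG_MAPPING =
        Collections.singletonMap("transactional.id", "transactional_id");

    // Config key -> telemetry label key, mirroring CONSUMER_CONFIG_MAPPING.
    static final Map<String, String> CONSUMER_CONFIG_MAPPING;
    static {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("group.id", "group_id");
        m.put("group.instance.id", "group_instance_id");
        CONSUMER_CONFIG_MAPPING = Collections.unmodifiableMap(m);
    }

    // Hypothetical helper: pick the mapping for the namespace, then copy only
    // the config keys that are actually present into the label map.
    static Map<String, String> resourceLabels(String namespace, Map<String, ?> config) {
        Map<String, String> mapping;
        if (PRODUCER_NAMESPACE.equals(namespace)) {
            mapping = PRODUCER_CONFIG_MAPPING;
        } else if (CONSUMER_NAMESPACE.equals(namespace)) {
            mapping = CONSUMER_CONFIG_MAPPING;
        } else {
            mapping = Collections.emptyMap();
        }
        Map<String, String> labels = new LinkedHashMap<>();
        mapping.forEach((configKey, telemetryKey) -> {
            if (config.containsKey(configKey)) {
                labels.put(telemetryKey, String.valueOf(config.get(configKey)));
            }
        });
        return labels;
    }

    public static void main(String[] args) {
        Map<String, Object> config = new LinkedHashMap<>();
        config.put("group.id", "orders");
        System.out.println(resourceLabels(CONSUMER_NAMESPACE, config));
    }
}
```

Absent config keys are simply skipped, so labels such as `group_member_id`, which are not derived from config, have to be added by a different path, as the comment in the diff notes.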
Re: [PR] KAFKA-14987 [2/2]; customize retry backoff for group/offsets expiration [kafka]
dajac commented on code in PR #14870: URL: https://github.com/apache/kafka/pull/14870#discussion_r1411843255 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -620,8 +621,9 @@ public boolean cleanupExpiredOffsets(String groupId, List records) { }); if (!expiredPartitions.isEmpty()) { -log.info("[GroupId {}] Expiring offsets of partitions (allOffsetsExpired={}): {}", -groupId, allOffsetsExpired, String.join(", ", expiredPartitions)); +log.info("[GroupId {}] Expiring {} offsets (allOffsetsExpired={}) in {} milliseconds.", +groupId, expiredPartitions.size(), allOffsetsExpired, time.milliseconds() - startMs); +log.debug("[GroupId {}] Expired partitions: {}", groupId, String.join(", ", expiredPartitions)); Review Comment: Instead of this one, I wonder if we should just add a debug message for every expired offset at L613. What do you think? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -620,8 +621,9 @@ public boolean cleanupExpiredOffsets(String groupId, List records) { }); if (!expiredPartitions.isEmpty()) { -log.info("[GroupId {}] Expiring offsets of partitions (allOffsetsExpired={}): {}", -groupId, allOffsetsExpired, String.join(", ", expiredPartitions)); +log.info("[GroupId {}] Expiring {} offsets (allOffsetsExpired={}) in {} milliseconds.", Review Comment: I don't really get the value of adding the time here. What's the reasoning behind it? However, I think that it would make sense to add a log in `cleanupGroupMetadata` which contains the time taken and the number of records generated. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorTimer.java: ## @@ -52,6 +52,20 @@ interface TimeoutOperation { */ void schedule(String key, long delay, TimeUnit unit, boolean retry, TimeoutOperation operation); +/** + * Add an operation to the timer. 
If an operation with the same key + * already exists, replace it with the new operation. + * + * @param key The key to identify this operation. + * @param delay The delay to wait before expiring. + * @param unit The delay unit. + * @param retry A boolean indicating whether the operation should + * be retried on failure. + * @param retryBackoff The delay when rescheduled on retry. + * @param operation The operation to perform upon expiration. + */ +void schedule(String key, long delay, TimeUnit unit, boolean retry, long retryBackoff, TimeoutOperation operation); Review Comment: Does it rely on the `unit`? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -281,6 +281,18 @@ public void schedule( TimeUnit unit, boolean retry, TimeoutOperation operation +) { +schedule(key, delay, unit, retry, 500, operation); +} + +@Override +public void schedule( +String key, +long delay, +TimeUnit unit, +boolean retry, +long retryBackoff, Review Comment: What's the unit of this? Does it rely on `unit`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
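The reviewer's question about the unit of `retryBackoff` can be answered in the signature itself. A minimal sketch, assuming the backoff is always in milliseconds (the existing overload passes a bare `500`); the `Ms` suffix, `DEFAULT_RETRY_BACKOFF_MS`, `SketchTimer`, `RecordingTimer` and the simplified `TimeoutOperation` are hypothetical and do not mirror the real `CoordinatorTimer` types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class TimerSketch {
    // Simplified, hypothetical stand-in for the coordinator's TimeoutOperation.
    interface TimeoutOperation {
        void run() throws Exception;
    }

    interface SketchTimer {
        // Mirrors the hard-coded 500 passed by the existing overload, assumed to be milliseconds.
        long DEFAULT_RETRY_BACKOFF_MS = 500L;

        // Existing five-argument form: delegates with the default backoff.
        default void schedule(String key, long delay, TimeUnit unit, boolean retry, TimeoutOperation operation) {
            schedule(key, delay, unit, retry, DEFAULT_RETRY_BACKOFF_MS, operation);
        }

        // The "Ms" suffix states that the backoff does not depend on `unit`.
        void schedule(String key, long delay, TimeUnit unit, boolean retry, long retryBackoffMs,
                      TimeoutOperation operation);
    }

    // Records each key as {delay in ms, retry backoff in ms}; it never runs the operation.
    static class RecordingTimer implements SketchTimer {
        final Map<String, long[]> scheduled = new HashMap<>();

        @Override
        public void schedule(String key, long delay, TimeUnit unit, boolean retry, long retryBackoffMs,
                             TimeoutOperation operation) {
            // Scheduling the same key again replaces the previous entry, as the Javadoc requires.
            scheduled.put(key, new long[] {unit.toMillis(delay), retryBackoffMs});
        }
    }

    public static void main(String[] args) {
        RecordingTimer timer = new RecordingTimer();
        timer.schedule("offsets-expiration", 10, TimeUnit.SECONDS, true, () -> { });
        long[] entry = timer.scheduled.get("offsets-expiration");
        System.out.println("delayMs=" + entry[0] + ", retryBackoffMs=" + entry[1]);
    }
}
```

Interpreting `retryBackoff` in `unit` would be an equally consistent choice; what matters is that the parameter name or the Javadoc states which one applies.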
Re: [PR] KAFKA-15890: Consumer.poll with long timeout unaware of assigned partitions [kafka]
AndrewJSchofield commented on code in PR #14835: URL: https://github.com/apache/kafka/pull/14835#discussion_r1411833816 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -205,6 +205,26 @@ public void wakeup() { networkClientDelegate.wakeup(); } +/** + * Returns the delay for which the application thread can safely wait before it should be responsive + * to results from the request managers. For example, the subscription state can change when heartbeats + * are sent, so blocking for longer than the heartbeat interval might mean the application thread is not + * responsive to changes. + * + * @return The maximum delay in milliseconds + */ +public long maximumTimeToWait() { +final long currentTimeMs = time.milliseconds(); +if (requestManagers == null) { +return MAX_POLL_TIMEOUT_MS; +} +return requestManagers.entries().stream() +.filter(Optional::isPresent) +.map(Optional::get) +.map(rm -> rm.maximumTimeToWait(currentTimeMs)) +.reduce(Long.MAX_VALUE, Math::min); +} + Review Comment: I'll take this concept and work on it in a follow-on PR.
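The `maximumTimeToWait` reduction can be exercised in isolation. A minimal sketch, assuming a hypothetical single-method `RequestManager` interface and a hypothetical `MAX_POLL_TIMEOUT_MS` of 5000; it keeps the diff's behaviour of returning `Long.MAX_VALUE` when every manager slot is empty.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class MaxTimeToWait {
    // Hypothetical stand-in for a request manager: given the current time, how long
    // the application thread may block before this manager may need attention.
    interface RequestManager {
        long maximumTimeToWait(long currentTimeMs);
    }

    static final long MAX_POLL_TIMEOUT_MS = 5000L; // hypothetical value

    static long maximumTimeToWait(List<Optional<RequestManager>> managers, long currentTimeMs) {
        if (managers == null) {
            return MAX_POLL_TIMEOUT_MS;
        }
        // Take the minimum over the present managers; empty slots are ignored.
        return managers.stream()
            .filter(Optional::isPresent)
            .map(Optional::get)
            .mapToLong(rm -> rm.maximumTimeToWait(currentTimeMs))
            .reduce(Long.MAX_VALUE, Math::min);
    }

    public static void main(String[] args) {
        RequestManager heartbeat = now -> Math.max(0L, 3000L - now); // next heartbeat due at t=3000
        RequestManager idle = now -> Long.MAX_VALUE;                 // nothing pending
        List<Optional<RequestManager>> managers =
            Arrays.<Optional<RequestManager>>asList(Optional.of(heartbeat), Optional.empty(), Optional.of(idle));
        System.out.println(maximumTimeToWait(managers, 1000L));
    }
}
```

Because the minimum is taken over all managers, a single manager with an imminent deadline, such as a heartbeat, bounds how long the application thread blocks.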
Re: [PR] KAFKA-14509: [2/N] Implement server side logic for ConsumerGroupDescribe API [kafka]
dajac commented on code in PR #14544: URL: https://github.com/apache/kafka/pull/14544#discussion_r1411829247 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -547,6 +549,69 @@ public CompletableFuture listGroups( return future; } +/** + * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, List)}. + */ +@Override +public CompletableFuture> consumerGroupDescribe( +RequestContext context, +List groupIds +) { +if (!isActive.get()) { +return CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList( +groupIds, +Errors.COORDINATOR_NOT_AVAILABLE +)); +} + +final List>> futures = +new ArrayList<>(groupIds.size()); +final Map> groupsByTopicPartition = new HashMap<>(); +groupIds.forEach(groupId -> { +// For backwards compatibility, we support DescribeGroups for the empty group id. +if (groupId == null) { Review Comment: Actually, we don't support the empty group id for this new API. We should use `isGroupIdNotEmpty` here. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -547,6 +549,69 @@ public CompletableFuture listGroups( return future; } +/** + * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, List)}. + */ +@Override +public CompletableFuture> consumerGroupDescribe( +RequestContext context, +List groupIds +) { +if (!isActive.get()) { +return CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList( +groupIds, +Errors.COORDINATOR_NOT_AVAILABLE +)); +} + +final List>> futures = +new ArrayList<>(groupIds.size()); +final Map> groupsByTopicPartition = new HashMap<>(); +groupIds.forEach(groupId -> { +// For backwards compatibility, we support DescribeGroups for the empty group id. +if (groupId == null) { Review Comment: Actually, we don't support the empty group id for this new API. We should use `isGroupIdNotEmpty` here and remove the comment. 
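Following the suggestion to reject empty group ids rather than only `null` ones, the routing step can be sketched as follows. This is a sketch under assumptions: `isGroupIdNotEmpty` is assumed to mean "non-null and non-empty", and `partitionFor`, `Routing` and `route` are hypothetical stand-ins for `topicPartitionFor` and the `groupsByTopicPartition` map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DescribeValidation {
    // Assumed semantics of isGroupIdNotEmpty: only non-null, non-empty ids are valid.
    static boolean isGroupIdNotEmpty(String groupId) {
        return groupId != null && !groupId.isEmpty();
    }

    // Hypothetical partitioner standing in for topicPartitionFor(groupId).
    static int partitionFor(String groupId, int numPartitions) {
        return Math.floorMod(groupId.hashCode(), numPartitions);
    }

    static final class Routing {
        // Ids that would be answered immediately with INVALID_GROUP_ID.
        final List<String> invalidGroupIds = new ArrayList<>();
        // Valid ids grouped by the partition whose coordinator owns them.
        final Map<Integer, List<String>> groupIdsByPartition = new LinkedHashMap<>();
    }

    static Routing route(List<String> groupIds, int numPartitions) {
        Routing routing = new Routing();
        for (String groupId : groupIds) {
            if (!isGroupIdNotEmpty(groupId)) {
                routing.invalidGroupIds.add(groupId);
            } else {
                routing.groupIdsByPartition
                    .computeIfAbsent(partitionFor(groupId, numPartitions), p -> new ArrayList<>())
                    .add(groupId);
            }
        }
        return routing;
    }

    public static void main(String[] args) {
        Routing routing = route(Arrays.asList("orders", "", null, "payments"), 50);
        System.out.println("invalid=" + routing.invalidGroupIds + ", routed=" + routing.groupIdsByPartition);
    }
}
```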
Re: [PR] KAFKA-15574; [2/N]: Client state machine updates [kafka]
dajac commented on code in PR #14857: URL: https://github.com/apache/kafka/pull/14857#discussion_r1411824345 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -365,6 +366,16 @@ private void replaceUnresolvedAssignmentWithNewAssignment( */ @Override public void transitionToFenced() { +if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) { +log.debug("Member {} with epoch {} got fenced but it is already leaving the group " + +"with state {}, so it won't attempt to rejoin.", memberId, memberEpoch, state); +return; +} +if (state == MemberState.UNSUBSCRIBED) { +log.debug("Member {} with epoch {} got fenced but it already left the group, so it " + +"won't attempt to rejoin.", memberId, memberEpoch); +return; +} Review Comment: You're absolutely right. This is the current behaviour in the legacy consumer. I was wondering if the current behaviour is actually correct in the legacy consumer as well. Anyway, we don't have to address this here as it is fine to replicate the current behaviour. However, we should think a little more about this case, I think.
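The guard added to `transitionToFenced` boils down to a state check before rejoining. A minimal sketch using a reduced, hypothetical subset of `MemberState` and a boolean return value in place of the real method's side effects; `FencingGuard` and `Member` are illustrative names only.

```java
public class FencingGuard {
    // Reduced, hypothetical subset of the consumer's member states.
    enum MemberState { UNSUBSCRIBED, JOINING, STABLE, FENCED, PREPARE_LEAVING, LEAVING }

    static final class Member {
        MemberState state;

        Member(MemberState initial) {
            this.state = initial;
        }

        // Returns true if the member moves to FENCED (and would later try to rejoin),
        // false if fencing is ignored because the member is leaving or has left.
        boolean transitionToFenced() {
            if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) {
                return false; // already leaving: stay on the leave path
            }
            if (state == MemberState.UNSUBSCRIBED) {
                return false; // already left: nothing to rejoin
            }
            state = MemberState.FENCED;
            return true;
        }
    }

    public static void main(String[] args) {
        for (MemberState s : MemberState.values()) {
            Member member = new Member(s);
            System.out.println(s + " -> rejoin=" + member.transitionToFenced());
        }
    }
}
```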
Re: [PR] KAFKA-14509: [2/2] Implement server side logic for ConsumerGroupDescribe API [kafka]
dajac commented on code in PR #14544: URL: https://github.com/apache/kafka/pull/14544#discussion_r1411799674 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -547,6 +549,69 @@ public CompletableFuture listGroups( return future; } +/** + * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, List)}. + */ +@Override +public CompletableFuture> consumerGroupDescribe( +RequestContext context, +List groupIds +) { +if (!isActive.get()) { +return CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList( +groupIds, +Errors.COORDINATOR_NOT_AVAILABLE +)); +} + +final List>> futures = +new ArrayList<>(groupIds.size()); +final Map> groupsByTopicPartition = new HashMap<>(); +groupIds.forEach(groupId -> { +// For backwards compatibility, we support DescribeGroups for the empty group id. +if (groupId == null) { + futures.add(CompletableFuture.completedFuture(Collections.singletonList( +new ConsumerGroupDescribeResponseData.DescribedGroup() +.setGroupId(null) +.setErrorCode(Errors.INVALID_GROUP_ID.code()) +.setErrorMessage(Errors.INVALID_GROUP_ID.message()) +))); +} else { +groupsByTopicPartition +.computeIfAbsent(topicPartitionFor(groupId), __ -> new ArrayList<>()) +.add(groupId); +} +}); + +groupsByTopicPartition.forEach((topicPartition, groupList) -> { + CompletableFuture> future = +runtime.scheduleReadOperation( +"consumer-group-describe", +topicPartition, +(coordinator, lastCommittedOffset) -> coordinator.consumerGroupDescribe(groupIds, lastCommittedOffset) +).exceptionally(exception -> { +if (!(exception instanceof KafkaException)) { +log.error("ConsumerGroupDescribe request {} hit an unexpected exception: {}.", +groupList, exception.getMessage()); +} + +return ConsumerGroupDescribeRequest.getErrorDescribedGroupList( +groupList, +Errors.forException(exception) +); +}); + +futures.add(future); +}); + +final CompletableFuture allFutures = 
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); +return allFutures.thenApply(v -> { +final List res = new ArrayList<>(); +futures.forEach(future -> res.addAll(future.join())); +return res; +}); Review Comment: I think that we have more or less the same code elsewhere in this class. If you are interested, we could try to refactor this into a helper method as a follow-up. We can keep it as-is in this pull request though. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -468,6 +468,33 @@ public List listGroups(List statesFi return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList()); } + +/** + * Handles a ConsumerGroupDescribe request. + * @param groupIds The IDs of the groups to describe. Review Comment: nit: We usually put an empty line between the description and the params. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -547,6 +549,69 @@ public CompletableFuture listGroups( return future; } +/** + * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, List)}. + */ +@Override +public CompletableFuture> consumerGroupDescribe( +RequestContext context, +List groupIds +) { +if (!isActive.get()) { +return CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList( +groupIds, +Errors.COORDINATOR_NOT_AVAILABLE +)); +} + +final List>> futures = +new ArrayList<>(groupIds.size()); +final Map> groupsByTopicPartition = new HashMap<>(); +groupIds.forEach(groupId -> { +// For backwards compatibility, we support DescribeGroups for the empty group id. +if (groupId == null) { + futures.add(CompletableFuture.completedFuture(Collections.singletonList( +new ConsumerGroupDescribeResponseData.DescribedGroup() +.setGroupId(null) +
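The helper suggested in the review could look roughly like the following. It is a sketch with a hypothetical name, `combineFutures`: it waits for all per-partition futures and concatenates their result lists in input order, which is what the `allOf`/`thenApply` block above does inline.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CombineFuturesSketch {
    // Waits for every future, then concatenates their result lists in input order.
    static <T> CompletableFuture<List<T>> combineFutures(List<CompletableFuture<List<T>>> futures) {
        CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        return all.thenApply(ignored -> {
            List<T> result = new ArrayList<>();
            // join() does not block here because allOf has already completed.
            futures.forEach(future -> result.addAll(future.join()));
            return result;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> partition0 = CompletableFuture.completedFuture(Arrays.asList("group-a"));
        CompletableFuture<List<String>> partition1 =
            CompletableFuture.supplyAsync(() -> Arrays.asList("group-b", "group-c"));
        System.out.println(combineFutures(Arrays.asList(partition0, partition1)).join());
    }
}
```

If any input future fails, the combined future fails too. In the PR each per-partition future already maps exceptions to error entries via `exceptionally`, so in that usage `allOf` is expected to complete normally.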
Re: [PR] KAFKA-15945 : Flaky test - testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest [kafka]
ashwinpankaj commented on code in PR #14893: URL: https://github.com/apache/kafka/pull/14893#discussion_r1411813212 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -770,6 +771,9 @@ public void testSyncTopicConfigs() throws InterruptedException { Map> configOps = Collections.singletonMap(configResource, ops); // alter configs on target cluster backup.kafka().incrementalAlterConfigs(configOps); +// wait until the configs are changed +waitForConfigValueChange(backup, backupTopic, "delete.retention.ms", "2000"); +waitForConfigValueChange(backup, backupTopic, "retention.bytes", "2000"); Review Comment: Thanks for looking into this, @atu-sharm! I wonder if waiting for `retention.bytes` to equal 2000 will always succeed. `waitForCondition` has a default poll interval of 100 ms. If the value is set to 2000 and then changed to 1000 by MirrorMaker while the thread sleeps (100 ms), we may miss the value change and time out. Can't we just wait on the futures returned by `incrementalAlterConfigs`?
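The alternative proposed in the review, blocking on the future returned by the alter call instead of polling for a value, is sketched below with an in-memory stand-in. With the real `Admin` client, `incrementalAlterConfigs(...)` returns an `AlterConfigsResult` whose `all()` method returns a `KafkaFuture<Void>`; whether the test's `backup.kafka()` helper exposes that future is not visible here. `AwaitAlterConfigs`, `alterConfigAsync` and `alterAndAwait` are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitAlterConfigs {
    // Hypothetical stand-in for an asynchronous config update: the returned future
    // completes only after the new value has been applied to the store.
    static CompletableFuture<Void> alterConfigAsync(Map<String, String> store, String key, String value,
                                                    ExecutorService executor) {
        return CompletableFuture.runAsync(() -> store.put(key, value), executor);
    }

    // Blocks on the future itself rather than polling the store for an expected value,
    // so the wait cannot miss a value that is overwritten between two polls.
    static void alterAndAwait(Map<String, String> store, String key, String value,
                              ExecutorService executor) throws Exception {
        alterConfigAsync(store, key, value, executor).get(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> topicConfig = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            alterAndAwait(topicConfig, "delete.retention.ms", "2000", executor);
            System.out.println(topicConfig.get("delete.retention.ms"));
        } finally {
            executor.shutdown();
        }
    }
}
```

Waiting on the future only establishes that the alteration was applied. It does not prevent a later overwrite, so any assertion on the value afterwards still depends on what MirrorMaker does next.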
Re: [PR] KAFKA-15945 : Flaky test - testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest [kafka]
atu-sharm commented on PR #14893: URL: https://github.com/apache/kafka/pull/14893#issuecomment-1835716040 Hi @mimaison @C0urante, could you have a look at this?
[jira] [Commented] (KAFKA-15945) Flaky test - testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
[ https://issues.apache.org/jira/browse/KAFKA-15945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791951#comment-17791951 ] Atul Sharma commented on KAFKA-15945: - Raised PR: https://github.com/apache/kafka/pull/14893 > Flaky test - testSyncTopicConfigs() – > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest > - > > Key: KAFKA-15945 > URL: https://issues.apache.org/jira/browse/KAFKA-15945 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Priority: Major > Labels: flaky-test > > Last seen: > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14811/7/tests > Error > org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, > because it's explicitly defined on the target topic! ==> expected: <2000> > but was: <8640> > Stacktrace > org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, > because it's explicitly defined on the target topic! 
==> expected: <2000> > but was: <8640> > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at > app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) > at > app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) > at > app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1152) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testSyncTopicConfigs$8(MirrorConnectorsIntegrationBaseTest.java:780) > at > app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331) > at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) > at > app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328) > at > app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312) > at > app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs(MirrorConnectorsIntegrationBaseTest.java:774) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at >
Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]
dajac commented on PR #14640: URL: https://github.com/apache/kafka/pull/14640#issuecomment-1835695981 @kirktrue Thanks for the PR. Could you please update the description? The current one looks pretty outdated.
Re: [PR] KAFKA-15695: Update the local log start offset of a log after rebuilding the auxiliary state [kafka]
satishd commented on PR #14649: URL: https://github.com/apache/kafka/pull/14649#issuecomment-1835672440 @nikramakrishnan Can you pull out the integration test from these changes and raise a separate PR resolving the intermittent failure so that we can unblock merging this change with the added UTs in this PR?
Re: [PR] MINOR: Disable FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor [kafka]
dajac merged PR #14876: URL: https://github.com/apache/kafka/pull/14876