[jira] [Assigned] (KAFKA-15955) Migrating ZK brokers send dir assignments

2023-12-01 Thread Proven Provenzano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Proven Provenzano reassigned KAFKA-15955:
-----------------------------------------

Assignee: Proven Provenzano

> Migrating ZK brokers send dir assignments
> -----------------------------------------
>
> Key: KAFKA-15955
> URL: https://issues.apache.org/jira/browse/KAFKA-15955
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Assignee: Proven Provenzano
>Priority: Major
>
> Brokers in ZooKeeper mode, while in migration mode, should start sending 
> directory assignments to the KRaft Controller using the AssignmentsManager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on PR #14879:
URL: https://github.com/apache/kafka/pull/14879#issuecomment-1836362310

   > Can you elaborate on the direction to remove the background queue from the 
'test builder' instead of using the one it constructed?
   
   I had issues with tests using the spy on the `AsyncKafkaConsumer`. More 
precisely, a test failed with the spy but did not fail with a consumer not 
wrapped into a spy. 
   BTW, IMO, spies (or partial mocks) should be used really carefully. 
Actually, good code does not need spies (with a few exceptions). Spies make it 
easy to avoid structuring the code well; they do not force one to loosely 
couple objects. Even the [Mockito 
documentation](https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#spy)
 warns against its own spies. Additionally, the code under test, i.e., the 
async Kafka consumer, should not be wrapped into a spy. We should test that 
code directly to avoid possible side effects coming from the wrapping or from 
mistakes in specifying stubs on the spy. 
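   To illustrate with a minimal sketch (the `Counter` class is hypothetical, 
not the actual consumer code): a stub on a spy silently replaces the real 
logic of the class under test, whereas testing the real object exercises the 
actual code path.
   
   ```java
   import static org.junit.jupiter.api.Assertions.assertEquals;
   import static org.mockito.Mockito.doReturn;
   import static org.mockito.Mockito.spy;
   
   import org.junit.jupiter.api.Test;
   
   class SpyPitfallExample {
   
       // Hypothetical class under test.
       public static class Counter {
           private int count = 0;
           public int increment() { return ++count; }
       }
   
       @Test
       void spyCanHideBugsInTheCodeUnderTest() {
           Counter spied = spy(new Counter());
           // The stub bypasses the real increment() logic, so this assertion
           // passes even if increment() is broken.
           doReturn(42).when(spied).increment();
           assertEquals(42, spied.increment());
       }
   
       @Test
       void directTestExercisesTheRealCodePath() {
           Counter real = new Counter();
           assertEquals(1, real.increment());
           assertEquals(2, real.increment());
       }
   }
   ```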


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [2/N] New Group Coordinator IT for Static Member Replacement [kafka]

2023-12-01 Thread via GitHub


vamossagar12 commented on code in PR #14882:
URL: https://github.com/apache/kafka/pull/14882#discussion_r1412299630


##
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##
@@ -137,6 +137,98 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
   }
 
+  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
+  ))
+  def testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): Unit = {
+    val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+    val admin = cluster.createAdminClient()
+    val instanceId = "instanceId"
+
+    // Create the __consumer_offsets topic, because it won't be created
+    // automatically in this test, which does not use the FindCoordinator API.
+    TestUtils.createOffsetsTopicWithAdmin(
+      admin = admin,
+      brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+      controllers = raftCluster.controllerServers().asScala.toSeq
+    )
+
+    // Create the topic.
+    val topicId = TestUtils.createTopicWithAdminRaw(
+      admin = admin,
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    // Heartbeat request so that a static member joins the group.
+    var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setInstanceId(instanceId)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
+
+    // This is the expected assignment.
+    val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+    // Send the request until receiving a successful response. There is a delay
+    // here because the group coordinator is loaded in the background.
+    var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+    }, msg = s"Could not join the group successfully and get partitions assigned. Last response $consumerGroupHeartbeatResponse.")

Review Comment:
   Got it, yeah that makes sense. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15953: Refactor polling delays [kafka]

2023-12-01 Thread via GitHub


AndrewJSchofield opened a new pull request, #14897:
URL: https://github.com/apache/kafka/pull/14897

   Caches the maximum time to wait in the consumer network thread so the 
application thread is better isolated from the request managers.
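   As a rough sketch of the idea (the holder class below is hypothetical, not 
the actual patch): the network thread recomputes and publishes the cached wait 
time, and the application thread only reads the cache instead of calling into 
the request managers directly.
   
   ```java
   import java.util.concurrent.atomic.AtomicLong;
   
   // Hypothetical holder: written by the consumer network thread,
   // read by the application thread.
   public final class CachedMaxTimeToWait {
       private final AtomicLong maxTimeToWaitMs = new AtomicLong(Long.MAX_VALUE);
   
       // Network thread: after polling the request managers, cache the
       // smallest time until one of them needs to run again.
       public void update(final long pollTimeMs) {
           maxTimeToWaitMs.set(pollTimeMs);
       }
   
       // Application thread: a cheap atomic read, no cross-thread call
       // into the request managers.
       public long get() {
           return maxTimeToWaitMs.get();
       }
   }
   ```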
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Add @Timeout annotation to consumer integration tests [kafka]

2023-12-01 Thread via GitHub


dajac opened a new pull request, #14896:
URL: https://github.com/apache/kafka/pull/14896

   In this 
[build](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14826/11/pipeline/12/),
 the following test hung forever.
   
   ```
   Gradle Test Run :core:test > Gradle Test Executor 93 > PlaintextConsumerTest 
> testSeek(String, String) > testSeek(String, 
String).quorum=kraft+kip848.groupProtocol=consumer STARTED
   ```
   
   As the new consumer is not extremely stable yet, we should add a `@Timeout` 
to all those integration tests to ensure that builds are not blocked 
unnecessarily.
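   For example, a minimal sketch with JUnit 5 (the class name and timeout 
value here are illustrative only):
   
   ```java
   import java.util.concurrent.TimeUnit;
   
   import org.junit.jupiter.api.Timeout;
   
   // Applied at class level, the timeout covers every test method in the
   // class, so a hanging test fails on its own instead of blocking the build.
   @Timeout(value = 15, unit = TimeUnit.MINUTES)
   public class SomeConsumerIntegrationTest {
       // ... existing test methods, unchanged ...
   }
   ```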
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15955) Migrating ZK brokers send dir assignments

2023-12-01 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-15955:
-------------------------------

 Summary: Migrating ZK brokers send dir assignments
 Key: KAFKA-15955
 URL: https://issues.apache.org/jira/browse/KAFKA-15955
 Project: Kafka
  Issue Type: Sub-task
Reporter: Igor Soarez


Brokers in ZooKeeper mode, while in migration mode, should start sending 
directory assignments to the KRaft Controller using the AssignmentsManager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15574; [2/N]: Client state machine updates [kafka]

2023-12-01 Thread via GitHub


lianetm commented on PR #14857:
URL: https://github.com/apache/kafka/pull/14857#issuecomment-1836284959

   All tests pass locally; I will keep an eye on the build here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [2/N]: Client state machine updates [kafka]

2023-12-01 Thread via GitHub


lianetm commented on PR #14857:
URL: https://github.com/apache/kafka/pull/14857#issuecomment-1836283568

   Done, latest changes merged, @dajac. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14509: [2/N] Implement server side logic for ConsumerGroupDescribe API [kafka]

2023-12-01 Thread via GitHub


riedelmax commented on PR #14544:
URL: https://github.com/apache/kafka/pull/14544#issuecomment-1836279426

   Hey @dajac, 
   thanks for the review. I addressed all the points except for the ones we 
intend to address in a follow-up.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [2/N]: Client state machine updates [kafka]

2023-12-01 Thread via GitHub


dajac commented on code in PR #14857:
URL: https://github.com/apache/kafka/pull/14857#discussion_r1412217786


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -365,6 +366,16 @@ private void replaceUnresolvedAssignmentWithNewAssignment(
      */
     @Override
     public void transitionToFenced() {
+        if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) {
+            log.debug("Member {} with epoch {} got fenced but it is already leaving the group " +
+                "with state {}, so it won't attempt to rejoin.", memberId, memberEpoch, state);
+            return;
+        }
+        if (state == MemberState.UNSUBSCRIBED) {
+            log.debug("Member {} with epoch {} got fenced but it already left the group, so it " +
+                "won't attempt to rejoin.", memberId, memberEpoch);
+            return;
+        }

Review Comment:
   Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [2/N]: Client state machine updates [kafka]

2023-12-01 Thread via GitHub


dajac commented on PR #14857:
URL: https://github.com/apache/kafka/pull/14857#issuecomment-1836265424

   @lianetm There are conflicts. Could you please rebase or merge trunk?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [2/N]: Client state machine updates [kafka]

2023-12-01 Thread via GitHub


lianetm commented on code in PR #14857:
URL: https://github.com/apache/kafka/pull/14857#discussion_r1412204434


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -365,6 +366,16 @@ private void replaceUnresolvedAssignmentWithNewAssignment(
      */
     @Override
     public void transitionToFenced() {
+        if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) {
+            log.debug("Member {} with epoch {} got fenced but it is already leaving the group " +
+                "with state {}, so it won't attempt to rejoin.", memberId, memberEpoch, state);
+            return;
+        }
+        if (state == MemberState.UNSUBSCRIBED) {
+            log.debug("Member {} with epoch {} got fenced but it already left the group, so it " +
+                "won't attempt to rejoin.", memberId, memberEpoch);
+            return;
+        }

Review Comment:
   I see; agreed that this makes total sense for close (a kind of "terminal" 
state for the consumer), but for unsubscribe it would probably make sense to 
put a little more effort into making sure that the member "properly" leaves 
the group (sending the last HB). We would need to think a bit more about the 
implications, so I filed 
[KAFKA-15954](https://issues.apache.org/jira/browse/KAFKA-15954) to review 
that as a follow-up. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org






[jira] [Assigned] (KAFKA-15954) Review minimal effort approach on consumer last heartbeat on unsubscribe

2023-12-01 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-15954:
--------------------------------------

Assignee: Lianet Magrans

> Review minimal effort approach on consumer last heartbeat on unsubscribe
> -------------------------------------------------------------------------
>
> Key: KAFKA-15954
> URL: https://issues.apache.org/jira/browse/KAFKA-15954
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> Currently, the legacy and new consumers follow a minimal-effort approach when 
> sending a leave group (legacy) or last heartbeat request (new consumer): the 
> request is sent without waiting for or handling any response. This behaviour 
> applies when the consumer is being closed or when it unsubscribes.
> For the case when the consumer is being closed (which is a "terminal" 
> state), it makes sense to just follow a minimal-effort approach to 
> "properly" leaving the group. But for the case of unsubscribe, it might make 
> sense to put a little more effort into making sure that the last heartbeat 
> is sent and received by the broker. Note that unsubscribe could be a 
> temporary state, where the consumer might want to re-join the group at any 
> time. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15953) Refactor polling delays

2023-12-01 Thread Andrew Schofield (Jira)
Andrew Schofield created KAFKA-15953:


 Summary: Refactor polling delays
 Key: KAFKA-15953
 URL: https://issues.apache.org/jira/browse/KAFKA-15953
 Project: Kafka
  Issue Type: Sub-task
  Components: clients
Affects Versions: 3.7.0
Reporter: Andrew Schofield
Assignee: Andrew Schofield
 Fix For: 3.7.0


This is a follow-on task for 
https://issues.apache.org/jira/browse/KAFKA-15890. The idea is to reduce the 
interaction between the application thread and the request managers that was 
introduced in that earlier JIRA's patch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15954) Review minimal effort approach on consumer last heartbeat on unsubscribe

2023-12-01 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-15954:
--------------------------------------

 Summary: Review minimal effort approach on consumer last heartbeat 
on unsubscribe
 Key: KAFKA-15954
 URL: https://issues.apache.org/jira/browse/KAFKA-15954
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans


Currently, the legacy and new consumers follow a minimal-effort approach when 
sending a leave group (legacy) or last heartbeat request (new consumer): the 
request is sent without waiting for or handling any response. This behaviour 
applies when the consumer is being closed or when it unsubscribes.

For the case when the consumer is being closed (which is a "terminal" state), 
it makes sense to just follow a minimal-effort approach to "properly" leaving 
the group. But for the case of unsubscribe, it might make sense to put a 
little more effort into making sure that the last heartbeat is sent and 
received by the broker. Note that unsubscribe could be a temporary state, 
where the consumer might want to re-join the group at any time. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]

2023-12-01 Thread via GitHub


soarez commented on PR #14863:
URL: https://github.com/apache/kafka/pull/14863#issuecomment-1836233799

   Ready for review. This is a draft because it depends on 
https://github.com/apache/kafka/pull/14838, so ignore the first commit.
   
   @cmccabe @rondagostino @pprovenzano 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15890) Consumer.poll with long timeout unaware of assigned partitions

2023-12-01 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield resolved KAFKA-15890.
--------------------------------------
Resolution: Fixed

> Consumer.poll with long timeout unaware of assigned partitions
> --------------------------------------------------------------
>
> Key: KAFKA-15890
> URL: https://issues.apache.org/jira/browse/KAFKA-15890
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> Various problems found testing `kafka-console-consumer.sh` with the new 
> consumer, including NPEs, never-ending reconciliation states, and failure to 
> fetch records.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15890: Consumer.poll with long timeout unaware of assigned partitions [kafka]

2023-12-01 Thread via GitHub


lucasbru merged PR #14835:
URL: https://github.com/apache/kafka/pull/14835


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


AndrewJSchofield commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412179127


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -666,12 +766,16 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition>

Re: [PR] KAFKA-14133: Migrate ProcessorStateManagerTest and StreamThreadTest to Mockito [kafka]

2023-12-01 Thread via GitHub


clolov commented on PR #13932:
URL: https://github.com/apache/kafka/pull/13932#issuecomment-1836217374

   Heya @cadonna! Many thanks for the review and apologies for the delay. Yes, 
it makes perfect sense; I also came across
   ```
   I=0; while ./gradlew clients:test --tests RequestResponseTest --rerun --fail-fast; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done
   ```
   from the Kafka README.
   
   Hopefully I have addressed everything!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [2/N]: Client state machine updates [kafka]

2023-12-01 Thread via GitHub


lianetm commented on code in PR #14857:
URL: https://github.com/apache/kafka/pull/14857#discussion_r1412178458


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -322,16 +314,14 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
                 break;
 
             case FENCED_MEMBER_EPOCH:
-                message = String.format("GroupHeartbeatRequest failed because member ID %s with epoch %s is invalid. " +
-                        "Will abandon all partitions and rejoin the group",
+                message = String.format("GroupHeartbeatRequest failed because member ID %s with epoch %s is invalid.",

Review Comment:
   Sounds good (I just added the member ID too, so it reads: 
`GroupHeartbeatRequest failed for member %s because epoch %s is fenced.`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Migrate ProcessorStateManagerTest and StreamThreadTest to Mockito [kafka]

2023-12-01 Thread via GitHub


clolov commented on code in PR #13932:
URL: https://github.com/apache/kafka/pull/13932#discussion_r1412177600


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##
@@ -3494,32 +3323,27 @@ private StreamThread setUpThread(final Properties streamsConfigProps) {
     }
 
     private TaskManager mockTaskManager(final Task runningTask) {
-        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        final TaskManager taskManager = mock(TaskManager.class);
         final TaskId taskId = new TaskId(0, 0);
 
-        expect(runningTask.state()).andStubReturn(Task.State.RUNNING);
-        expect(runningTask.id()).andStubReturn(taskId);
-        expect(taskManager.allOwnedTasks()).andStubReturn(Collections.singletonMap(taskId, runningTask));
-        expect(taskManager.commit(Collections.singleton(runningTask))).andStubReturn(1);
+        when(runningTask.state()).thenReturn(Task.State.RUNNING);
+        when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, runningTask));
         return taskManager;
     }
 
     private TaskManager mockTaskManagerPurge(final int numberOfPurges) {
         final Task runningTask = mock(Task.class);
         final TaskManager taskManager = mockTaskManager(runningTask);
 
-        taskManager.maybePurgeCommittedRecords();
-        EasyMock.expectLastCall().times(numberOfPurges);

Review Comment:
   Yes, sorry, I don't know why I left/changed it to this; I changed it 
everywhere else to what you suggested. This has been remedied in the latest 
version of the pull request.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15891) Flaky test: testResetSinkConnectorOffsetsOverriddenConsumerGroupId – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest

2023-12-01 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-15891:
-----------------------------------
Labels: flaky-test  (was: )

> Flaky test: testResetSinkConnectorOffsetsOverriddenConsumerGroupId – 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
> ---------------------------------------------------------------------
>
> Key: KAFKA-15891
> URL: https://issues.apache.org/jira/browse/KAFKA-15891
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apoorv Mittal
>Priority: Major
>  Labels: flaky-test
>
> h4. Error
> org.opentest4j.AssertionFailedError: Condition not met within timeout 3. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected: <true> but was: <false>
> h4. Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 3. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected: <true> but was: <false>
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>  at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>  at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>  at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
>  at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331)
>  at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:917)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.resetAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:725)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412171456


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandlerTest.java:
##
@@ -51,91 +61,91 @@ public void tearDown() {
         testBuilder.close();
     }
 
-    @Test
-    public void testNoEvents() {
-        assertTrue(backgroundEventQueue.isEmpty());
-        backgroundEventProcessor.process((event, error) -> { });
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
-
-    @Test
-    public void testSingleEvent() {
-        BackgroundEvent event = new ErrorBackgroundEvent(new RuntimeException("A"));
-        backgroundEventQueue.add(event);
-        assertPeeked(event);
-        backgroundEventProcessor.process((e, error) -> { });
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
-
-    @Test
-    public void testSingleErrorEvent() {
-        KafkaException error = new KafkaException("error");
-        BackgroundEvent event = new ErrorBackgroundEvent(error);
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error));
-        assertPeeked(event);
-        assertProcessThrows(error);
-    }
-
-    @Test
-    public void testMultipleEvents() {
-        BackgroundEvent event1 = new ErrorBackgroundEvent(new RuntimeException("A"));
-        backgroundEventQueue.add(event1);
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B")));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C")));
-
-        assertPeeked(event1);
-        backgroundEventProcessor.process((event, error) -> { });
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
-
-    @Test
-    public void testMultipleErrorEvents() {
-        Throwable error1 = new Throwable("error1");
-        KafkaException error2 = new KafkaException("error2");
-        KafkaException error3 = new KafkaException("error3");
-
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error1));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error2));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error3));
-
-        assertProcessThrows(new KafkaException(error1));
-    }
-
-    @Test
-    public void testMixedEventsWithErrorEvents() {
-        Throwable error1 = new Throwable("error1");
-        KafkaException error2 = new KafkaException("error2");
-        KafkaException error3 = new KafkaException("error3");
-
-        RuntimeException errorToCheck = new RuntimeException("A");
-        backgroundEventQueue.add(new ErrorBackgroundEvent(errorToCheck));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error1));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B")));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error2));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C")));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error3));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("D")));
-
-        assertProcessThrows(new KafkaException(errorToCheck));
-    }
-
-    private void assertPeeked(BackgroundEvent event) {
-        BackgroundEvent peekEvent = backgroundEventQueue.peek();
-        assertNotNull(peekEvent);
-        assertEquals(event, peekEvent);
-    }
-
-    private void assertProcessThrows(Throwable error) {
-        assertFalse(backgroundEventQueue.isEmpty());
-
-        try {
-            backgroundEventProcessor.process();
-            fail("Should have thrown error: " + error);
-        } catch (Throwable t) {
-            assertEquals(error.getClass(), t.getClass());
-            assertEquals(error.getMessage(), t.getMessage());
-        }
-
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
+    //@Test

Review Comment:
   You need to have that commented-out code in a draft PR, otherwise GitHub 
won't let you mark it as a draft!
   
   Jokes aside: yes, they will be removed.




Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412169763


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1365,7 +1482,11 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() {
     private void maybeThrowFencedInstanceException() {
         if (isFenced) {
             throw new FencedInstanceIdException("Get fenced exception for group.instance.id " +

Review Comment:
   Touché  






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412168452


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -666,12 +766,16 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition>

Review Comment:
   https://github.com/apache/kafka/pull/14872



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


AndrewJSchofield commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412158807


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1365,7 +1482,11 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() {
     private void maybeThrowFencedInstanceException() {
         if (isFenced) {
            throw new FencedInstanceIdException("Get fenced exception for group.instance.id " +

Review Comment:
   Please use the `ConsumerConfig` constant for `"group.instance.id"`.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
         }
     }
 
+    public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {
+        this.groupMetadata = Optional.of(groupMetadata);
+    }
+
     @Override
     public ConsumerGroupMetadata groupMetadata() {
-        throw new KafkaException("method not implemented");
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            backgroundEventProcessor.process();
+            return groupMetadata.orElseThrow(
+                () -> new IllegalStateException("No group metadata found although a valid group ID exists. This is a bug!")

Review Comment:
   You *know* that groupMetadata is present because of the earlier 
`maybeThrowInvalidGroupIdException`. I suppose one pattern would be to return 
an unwrapped `GroupMetadata` from maybeThrowInvalidGroupIdException so that 
you've eliminated the possibility of the `Optional` being empty.
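   A minimal sketch of that pattern (a fragment: it assumes the surrounding 
`AsyncKafkaConsumer` members such as `groupMetadata`, `acquireAndEnsureOpen()`, 
`release()` and `backgroundEventProcessor`, with imports omitted; the method 
name is hypothetical):
   
   ```java
   // Validate and unwrap in one step: the caller gets a non-optional value,
   // so the "empty Optional" case is impossible past this point.
   private ConsumerGroupMetadata maybeThrowInvalidGroupIdExceptionAndUnwrap() {
       return groupMetadata.orElseThrow(() ->
           new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
               "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration."));
   }
   
   @Override
   public ConsumerGroupMetadata groupMetadata() {
       acquireAndEnsureOpen();
       try {
           final ConsumerGroupMetadata metadata = maybeThrowInvalidGroupIdExceptionAndUnwrap();
           backgroundEventProcessor.process();
           return metadata;
       } finally {
           release();
       }
   }
   ```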




Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412156215


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java:
##
@@ -49,4 +49,9 @@ public void add(BackgroundEvent event) {
         log.trace("Enqueued event: {}", event);
         backgroundEventQueue.add(event);
     }
+
+    // Visible for testing
+    public Queue<BackgroundEvent> backgroundEventQueue() {
+        return backgroundEventQueue;
+    }

Review Comment:
   I had to take painkillers to stand the pain when I added that method, 
because I totally agree with you, but I did not find another way. I will 
reconsider. 
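   One possible alternative, as a sketch (the constructor shape shown here is 
an assumption, not the actual `BackgroundEventHandler` API): inject the queue 
from the test, so the production class never needs an accessor.
   
   ```java
   @Test
   void addEnqueuesEvent() {
       // Hypothetical: the test constructs the queue itself and keeps a
       // reference, so it can assert on the queue's contents directly.
       final BlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
       final BackgroundEventHandler backgroundEventHandler =
           new BackgroundEventHandler(new LogContext(), backgroundEventQueue);
   
       backgroundEventHandler.add(new ErrorBackgroundEvent(new RuntimeException("A")));
       assertEquals(1, backgroundEventQueue.size());
   }
   ```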



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] HOTFIX: fix ConsistencyVectorIntegrationTest failure [kafka]

2023-12-01 Thread via GitHub


lucasbru opened a new pull request, #14895:
URL: https://github.com/apache/kafka/pull/14895

   #14735 changed the result type for `KeyQuery` from `ValueAndTimestamp<V>` to
   `V`, but forgot to update `ConsistencyVectorIntegrationTest` accordingly.

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412153908


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
         }
     }
 
+    public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {
+        this.groupMetadata = Optional.of(groupMetadata);
+    }
+
     @Override
     public ConsumerGroupMetadata groupMetadata() {
-        throw new KafkaException("method not implemented");
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            backgroundEventProcessor.process();

Review Comment:
   OK, should `backgroundEventProcessor.process()` then only be called in 
`AsyncKafkaConsumer#poll()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-12-01 Thread via GitHub


eduwercamacaro commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1412148951


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -580,6 +582,23 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState
     }
 
+    /**
+     * Set the listener which is triggered whenever a standby task is updated
+     *
+     * @param standbyListener The listener triggered when a standby task is updated.
+     * @throws IllegalStateException if this {@code KafkaStreams} instance has already been started.
+     */
+    public void setStandbyUpdateListener(final StandbyUpdateListener standbyListener) {

Review Comment:
   IMO, in the context of this method, `standbyListener` makes more sense than 
`userStandbyListener`. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412142819


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
         }
     }
 
+    public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {

Review Comment:
   That was a temporary change that I forgot to remove.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412141829


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -398,6 +466,38 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         requestManagersSupplier);
     }
 
+    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final ConsumerConfig config,
+                                                                    final GroupRebalanceConfig groupRebalanceConfig) {
+        final Optional<ConsumerGroupMetadata> groupMetadata = initializeGroupMetadata(
+            groupRebalanceConfig.groupId,
+            groupRebalanceConfig.groupInstanceId
+        );
+        if (!groupMetadata.isPresent()) {
+            config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+            config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+        }
+        return groupMetadata;
+    }
+
+    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final String groupId,
+                                                                    final Optional<String> groupInstanceId) {
+        if (groupId != null) {
+            if (groupId.isEmpty()) {
+                throwInInvalidGroupIdException();

Review Comment:
   Because the group ID cannot be empty if it is set.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Better Rocks column family management [kafka]

2023-12-01 Thread via GitHub


nicktelford commented on PR #14852:
URL: https://github.com/apache/kafka/pull/14852#issuecomment-1836167995

   Sorry about the compile error. It should now build.
   
   Regarding tests: agreed. When KIP-892 lands, it adds an extra CF, but at 
that point RocksDBStoreTest and RocksDBTimestampedStoreTest will implicitly 
test that too, so I don't think additional tests are required.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Better Rocks column family management [kafka]

2023-12-01 Thread via GitHub


nicktelford commented on code in PR #14852:
URL: https://github.com/apache/kafka/pull/14852#discussion_r1412137956


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##
@@ -64,45 +62,27 @@ public RocksDBTimestampedStore(final String name,
     @Override
     void openRocksDB(final DBOptions dbOptions,
                      final ColumnFamilyOptions columnFamilyOptions) {
-        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
-            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
-            new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions));
-        final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());
-
-        try {
-            db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilies);
-            setDbAccessor(columnFamilies.get(0), columnFamilies.get(1));
-        } catch (final RocksDBException e) {
-            if ("Column family not found: keyValueWithTimestamp".equals(e.getMessage())) {
-                try {
-                    db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors.subList(0, 1), columnFamilies);
-                    columnFamilies.add(db.createColumnFamily(columnFamilyDescriptors.get(1)));
-                } catch (final RocksDBException fatal) {
-                    throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), fatal);
-                }
-                setDbAccessor(columnFamilies.get(0), columnFamilies.get(1));
-            } else {
-                throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
-            }
-        }
-    }
-
-    private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
-                               final ColumnFamilyHandle withTimestampColumnFamily) {
-        final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily);
+        final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+            dbOptions,
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+            new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions)
+        );
+        final ColumnFamilyHandle noTimestampCF = columnFamilies.get(0);
+        final ColumnFamilyHandle withTimestampCF = columnFamilies.get(1);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Better Rocks column family management [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14852:
URL: https://github.com/apache/kafka/pull/14852#discussion_r1412123554


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##
@@ -64,45 +62,27 @@ public RocksDBTimestampedStore(final String name,
     @Override
     void openRocksDB(final DBOptions dbOptions,
                      final ColumnFamilyOptions columnFamilyOptions) {
-        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
-            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
-            new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions));
-        final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());
-
-        try {
-            db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilies);
-            setDbAccessor(columnFamilies.get(0), columnFamilies.get(1));
-        } catch (final RocksDBException e) {
-            if ("Column family not found: keyValueWithTimestamp".equals(e.getMessage())) {
-                try {
-                    db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors.subList(0, 1), columnFamilies);
-                    columnFamilies.add(db.createColumnFamily(columnFamilyDescriptors.get(1)));
-                } catch (final RocksDBException fatal) {
-                    throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), fatal);
-                }
-                setDbAccessor(columnFamilies.get(0), columnFamilies.get(1));
-            } else {
-                throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
-            }
-        }
-    }
-
-    private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
-                               final ColumnFamilyHandle withTimestampColumnFamily) {
-        final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily);
+        final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+            dbOptions,
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+            new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions)
+        );
+        final ColumnFamilyHandle noTimestampCF = columnFamilies.get(0);
+        final ColumnFamilyHandle withTimestampCF = columnFamilies.get(1);

Review Comment:
   nit: Could you please spell out `ColumnFamily`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15842) Correct handling of KafkaConsumer.committed for new consumer

2023-12-01 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield resolved KAFKA-15842.
--------------------------------------
Resolution: Fixed

> Correct handling of KafkaConsumer.committed for new consumer
> ------------------------------------------------------------
>
> Key: KAFKA-15842
> URL: https://issues.apache.org/jira/browse/KAFKA-15842
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Minor
>  Labels: kip-848-client-support, kip-848-e2e, kip-848-preview
>
> KafkaConsumer.committed throws a TimeoutException when there is no response. 
> The new consumer currently returns null. This change makes the new consumer 
> behave like the old consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14412: Better Rocks column family management [kafka]

2023-12-01 Thread via GitHub


nicktelford commented on PR #14852:
URL: https://github.com/apache/kafka/pull/14852#issuecomment-1836136670

   @cadonna 
   > Do we have unit tests in place that test all this logic?
   
   We don't have any tests dedicated to this, but `RocksDBStoreTest` and 
`RocksDBTimestampedStoreTest` implicitly test both the case that we only have 
the default CF, and the case that we have an extra CF (for the timestamped 
values). These tests fail spectacularly when any of this logic has a bug in it.
   
   Also, most of the integration tests break pretty badly when any of this is 
wrong.
   
   Are there any additional tests you'd like me to add?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15842: Correct handling of KafkaConsumer.committed for new consumer [kafka]

2023-12-01 Thread via GitHub


cadonna merged PR #14859:
URL: https://github.com/apache/kafka/pull/14859


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15842: Correct handling of KafkaConsumer.committed for new consumer [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14859:
URL: https://github.com/apache/kafka/pull/14859#discussion_r1412110677


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -663,6 +662,10 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition>

Re: [PR] KAFKA-15280: Implement client support for KIP-848 server-side assignors [kafka]

2023-12-01 Thread via GitHub


lucasbru commented on PR #14878:
URL: https://github.com/apache/kafka/pull/14878#issuecomment-1836125795

   @cadonna Could you please review this?
   
   @lianetm This includes a small fix in the reconciliation logic. Please check 
it, not sure if this is 100% correct. I had trouble getting partitions revoked 
from my consumer in the integration tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-12-01 Thread via GitHub


eduwercamacaro commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1412102373


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -175,13 +176,15 @@ public class KafkaStreams implements AutoCloseable {
 private final KafkaClientSupplier clientSupplier;
 protected final TopologyMetadata topologyMetadata;
 private final QueryableStoreProvider queryableStoreProvider;
+private final StandbyUpdateListener delegatingStandbyUpdateListener;
 
 GlobalStreamThread globalStreamThread;
 private KafkaStreams.StateListener stateListener;
 private StateRestoreListener globalStateRestoreListener;
 private boolean oldHandler;
 private BiConsumer streamsUncaughtExceptionHandler;
 private final Object changeThreadCount = new Object();
+private StandbyUpdateListener globalStandbyListener;

Review Comment:
   I agree. Your proposal makes things easier to understand. Thanks! :) 
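   For readers following along, a sketch of the delegating pattern being 
discussed (a fragment inside `KafkaStreams`, imports omitted; callback 
signatures as defined in KIP-988, the wiring itself is assumed rather than 
taken from the actual patch):
   
   ```java
   // A fixed delegate registered with the internals once; the user-supplied
   // listener can be swapped in later via setStandbyUpdateListener().
   private final StandbyUpdateListener delegatingStandbyUpdateListener = new StandbyUpdateListener() {
       @Override
       public void onUpdateStart(final TopicPartition topicPartition,
                                 final String storeName,
                                 final long startingOffset) {
           final StandbyUpdateListener userListener = globalStandbyListener;
           if (userListener != null) {
               userListener.onUpdateStart(topicPartition, storeName, startingOffset);
           }
       }
   
       @Override
       public void onBatchLoaded(final TopicPartition topicPartition,
                                 final String storeName,
                                 final TaskId taskId,
                                 final long batchEndOffset,
                                 final long batchSize,
                                 final long currentEndOffset) {
           final StandbyUpdateListener userListener = globalStandbyListener;
           if (userListener != null) {
               userListener.onBatchLoaded(topicPartition, storeName, taskId, batchEndOffset, batchSize, currentEndOffset);
           }
       }
   
       @Override
       public void onUpdateSuspended(final TopicPartition topicPartition,
                                     final String storeName,
                                     final long storeOffset,
                                     final long currentEndOffset,
                                     final StandbyUpdateListener.SuspendReason reason) {
           final StandbyUpdateListener userListener = globalStandbyListener;
           if (userListener != null) {
               userListener.onUpdateSuspended(topicPartition, storeName, storeOffset, currentEndOffset, reason);
           }
       }
   };
   ```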



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-12-01 Thread via GitHub


dajac commented on PR #14570:
URL: https://github.com/apache/kafka/pull/14570#issuecomment-1836106444

   Hey folks. It seems that this PR has introduced failures in trunk. From the 
last build:
   
   ```
   Build / JDK 11 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy 
– org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest
   13s
   Build / JDK 11 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy 
– org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest
   5s
   Build / JDK 21 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy 
– org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest
   47s
   Build / JDK 21 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy 
– org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest
   12s
   Build / JDK 17 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy 
– org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest
   17s
   Build / JDK 17 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy 
– org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest
   15s
   Build / JDK 8 and Scala 2.12 / shouldHaveSamePositionBoundActiveAndStandBy – 
org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest
   50s
   Build / JDK 8 and Scala 2.12 / shouldHaveSamePositionBoundActiveAndStandBy – 
org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest
   ```
   
   Could you please double check?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]

2023-12-01 Thread via GitHub


aliehsaeedii commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1412093650


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Optional;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {

Review Comment:
   > I remember that we also introduced ManagedIterator interface for versioned 
state store -- would we need to implement it, too?
   
   This is `ManagedKeyValueIterator`, which implements `KeyValueIterator`. I 
think for this KIP, a key-value iterator is not needed.
   
   > Should we extend AbstractIterator to share some common code?
   
   That would make everything more complicated, I think.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [2/N] New Group Coordinator IT for Static Member Replacement [kafka]

2023-12-01 Thread via GitHub


dajac commented on code in PR #14882:
URL: https://github.com/apache/kafka/pull/14882#discussion_r1412090130


##
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##
@@ -137,6 +137,98 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
   }
 
+  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def 
testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): 
Unit = {
+val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+val admin = cluster.createAdminClient()
+val instanceId = "instanceId"
+
+// Creates the __consumer_offsets topics because it won't be created 
automatically
+// in this test because it does not use FindCoordinator API.
+TestUtils.createOffsetsTopicWithAdmin(
+  admin = admin,
+  brokers = 
raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+  controllers = raftCluster.controllerServers().asScala.toSeq
+)
+
+// Create the topic.
+val topicId = TestUtils.createTopicWithAdminRaw(
+  admin = admin,
+  topic = "foo",
+  numPartitions = 3
+)
+
+// Heartbeat request so that a static member joins the group
+var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+  new ConsumerGroupHeartbeatRequestData()
+.setGroupId("grp")
+.setInstanceId(instanceId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(5 * 60 * 1000)
+.setSubscribedTopicNames(List("foo").asJava)
+.setTopicPartitions(List.empty.asJava)
+).build()
+
+// This is the expected assignment.
+val expectedAssignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
+  .setTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(topicId)
+.setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+// Send the request until receiving a successful response. There is a delay
+// here because the group coordinator is loaded in the background.
+var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+TestUtils.waitUntilTrue(() => {
+  consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
+  consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+}, msg = s"Could not join the group successfully and get partitions 
assigned. Last response $consumerGroupHeartbeatResponse.")

Review Comment:
   @vamossagar12 The issue is here. Basically, when the HB is processed by the 
group coordinator, the metadata of topic `foo` are not there yet so the 
assignment is empty. I think that you should HB here until you get the desired 
assignment. Keep in mind that you have to update the memberId and memberEpoch.
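   For illustration, a minimal sketch of that polling pattern (hypothetical: 
`send` stands in for the test's own `connectAndReceive` helper, and the bound 
of 100 attempts and the re-sent subscription are illustrative choices):
   ```java
   import java.util.Collections;
   import java.util.function.Function;

   import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
   import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
   import org.apache.kafka.common.protocol.Errors;
   import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
   import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;

   final class HeartbeatUntilAssigned {
       // Heartbeat until the coordinator returns the expected assignment,
       // carrying the memberId/memberEpoch from each successful response
       // into the next request.
       static String pollUntilAssigned(
               ConsumerGroupHeartbeatResponseData.Assignment expected,
               Function<ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse> send) {
           String memberId = "";  // empty until the coordinator assigns one
           int memberEpoch = 0;   // 0 signals "joining"
           ConsumerGroupHeartbeatResponse response;
           int attempts = 0;
           do {
               ConsumerGroupHeartbeatRequest request = new ConsumerGroupHeartbeatRequest.Builder(
                   new ConsumerGroupHeartbeatRequestData()
                       .setGroupId("grp")
                       .setInstanceId("instanceId")
                       .setMemberId(memberId)
                       .setMemberEpoch(memberEpoch)
                       .setRebalanceTimeoutMs(5 * 60 * 1000)
                       .setSubscribedTopicNames(Collections.singletonList("foo"))
               ).build();
               response = send.apply(request);
               if (response.data().errorCode() == Errors.NONE.code()) {
                   memberId = response.data().memberId();
                   memberEpoch = response.data().memberEpoch();
               }
           } while (!expected.equals(response.data().assignment()) && ++attempts < 100);
           return memberId;
       }
   }
   ```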



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [2/N] New Group Coordinator IT for Static Member Replacement [kafka]

2023-12-01 Thread via GitHub


dajac commented on code in PR #14882:
URL: https://github.com/apache/kafka/pull/14882#discussion_r1412090389


##
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##
@@ -137,6 +137,98 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
   }
 
+  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def 
testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): 
Unit = {
+val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+val admin = cluster.createAdminClient()
+val instanceId = "instanceId"
+
+// Creates the __consumer_offsets topics because it won't be created 
automatically
+// in this test because it does not use FindCoordinator API.
+TestUtils.createOffsetsTopicWithAdmin(
+  admin = admin,
+  brokers = 
raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+  controllers = raftCluster.controllerServers().asScala.toSeq
+)
+
+// Create the topic.
+val topicId = TestUtils.createTopicWithAdminRaw(
+  admin = admin,
+  topic = "foo",
+  numPartitions = 3
+)
+
+// Heartbeat request so that a static member joins the group
+var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+  new ConsumerGroupHeartbeatRequestData()
+.setGroupId("grp")
+.setInstanceId(instanceId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(5 * 60 * 1000)
+.setSubscribedTopicNames(List("foo").asJava)
+.setTopicPartitions(List.empty.asJava)
+).build()
+
+// This is the expected assignment.
+val expectedAssignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
+  .setTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(topicId)
+.setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+// Send the request until receiving a successful response. There is a delay
+// here because the group coordinator is loaded in the background.
+var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+TestUtils.waitUntilTrue(() => {
+  consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
+  consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+}, msg = s"Could not join the group successfully and get partitions 
assigned. Last response $consumerGroupHeartbeatResponse.")
+
+// Verify the response.
+assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
+
+val oldMemberId = consumerGroupHeartbeatResponse.data.memberId
+
+// Leave the group temporarily
+consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+  new ConsumerGroupHeartbeatRequestData()
+.setGroupId("grp")

Review Comment:
   The member id should be set here.
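   Roughly like the sketch below (if I read KIP-848 correctly, member epoch -2 
is the temporary static-member leave signal; `oldMemberId` is the id captured 
from the earlier join response):
   ```java
   import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
   import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;

   final class StaticLeaveRequest {
       static ConsumerGroupHeartbeatRequest build(String oldMemberId) {
           return new ConsumerGroupHeartbeatRequest.Builder(
               new ConsumerGroupHeartbeatRequestData()
                   .setGroupId("grp")
                   .setInstanceId("instanceId")
                   .setMemberId(oldMemberId) // echo back the id from the join response
                   .setMemberEpoch(-2)       // KIP-848: temporary static-member leave
           ).build();
       }
   }
   ```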



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 closed pull request #14724: KAFKA-15663, KAFKA-15794: Telemetry 
reporter and request handling (KIP-714)
URL: https://github.com/apache/kafka/pull/14724


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [2/N] New Group Coordinator IT for Static Member Replacement [kafka]

2023-12-01 Thread via GitHub


vamossagar12 commented on PR #14882:
URL: https://github.com/apache/kafka/pull/14882#issuecomment-1836078586

   oh I see. Didn't realise that, sorry. The fact that I see this locally as 
well means it's flaky.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [2/N] New Group Coordinator IT for Static Member Replacement [kafka]

2023-12-01 Thread via GitHub


dajac commented on PR #14882:
URL: https://github.com/apache/kafka/pull/14882#issuecomment-1836072757

   @vamossagar12 Hum... `ERROR Unexpected error handling 
org.apache.kafka.server.AssignmentsManager$DispatchEvent@3909ac4e` is not 
related to the group coordinator.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-12-01 Thread via GitHub


dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1412044990


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -153,6 +154,15 @@ class CoordinatorLoaderImpl[T](
   }
 
   currentOffset = batch.nextOffset
+  val currentHighWatermark = log.highWatermark
+  if (currentOffset >= currentHighWatermark) {
+coordinator.updateLastWrittenOffset(currentOffset)
+  }
+
+  if (currentHighWatermark > previousHighWatermark) {
+coordinator.updateLastCommittedOffset(currentHighWatermark)
+previousHighWatermark = currentHighWatermark
+  }

Review Comment:
   I think that it works. However, it is worth pointing out that the last 
committed offset could be higher than the last written offset in the state 
machine until records past it are read.
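   To make the transient state concrete, a tiny self-contained simulation of 
the patch's update rules (all values invented):
   ```java
   // Simulates the update rules above with an invented log:
   // highWatermark fixed at 5, batches ending at offsets 2, 5 and 7.
   public class LoaderOffsetIllustration {
       public static void main(String[] args) {
           long highWatermark = 5L;
           long previousHighWatermark = -1L;
           for (long currentOffset : new long[] {2L, 5L, 7L}) {
               if (currentOffset >= highWatermark) {
                   System.out.println("updateLastWrittenOffset(" + currentOffset + ")");
               }
               if (highWatermark > previousHighWatermark) {
                   System.out.println("updateLastCommittedOffset(" + highWatermark + ")");
                   previousHighWatermark = highWatermark;
               }
           }
           // Prints updateLastCommittedOffset(5) before any
           // updateLastWrittenOffset(...): committed > written until the
           // batch ending at offset 5 has been read.
       }
   }
   ```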



##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##
@@ -366,6 +370,110 @@ class CoordinatorLoaderImplTest {
 }
   }
 
+  @Test
+  def testUpdateLastWrittenOffsetOnBatchLoaded(): Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val serde = new StringKeyValueDeserializer
+val log = mock(classOf[UnifiedLog])
+val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+  time = Time.SYSTEM,
+  replicaManager = replicaManager,
+  deserializer = serde,
+  loadBufferSize = 1000
+)) { loader =>
+  when(replicaManager.getLog(tp)).thenReturn(Some(log))
+  when(log.logStartOffset).thenReturn(0L)
+  when(log.highWatermark).thenReturn(0L).thenReturn(0L).thenReturn(2L)
+  when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
+
+  val readResult1 = logReadResult(startOffset = 0, records = Seq(
+new SimpleRecord("k1".getBytes, "v1".getBytes),
+new SimpleRecord("k2".getBytes, "v2".getBytes)
+  ))
+
+  when(log.read(
+startOffset = 0L,
+maxLength = 1000,
+isolation = FetchIsolation.LOG_END,
+minOneMessage = true
+  )).thenReturn(readResult1)
+
+  val readResult2 = logReadResult(startOffset = 2, records = Seq(
+new SimpleRecord("k3".getBytes, "v3".getBytes),
+new SimpleRecord("k4".getBytes, "v4".getBytes),
+new SimpleRecord("k5".getBytes, "v5".getBytes)
+  ))
+
+  when(log.read(
+startOffset = 2L,
+maxLength = 1000,
+isolation = FetchIsolation.LOG_END,
+minOneMessage = true
+  )).thenReturn(readResult2)
+
+  val readResult3 = logReadResult(startOffset = 5, records = Seq(
+new SimpleRecord("k6".getBytes, "v6".getBytes),
+new SimpleRecord("k7".getBytes, "v7".getBytes)
+  ))
+
+  when(log.read(
+startOffset = 5L,
+maxLength = 1000,
+isolation = FetchIsolation.LOG_END,
+minOneMessage = true
+  )).thenReturn(readResult3)
+
+  assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+
+  verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
+  verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
+  verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
+  verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
+  verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
+  verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
+  verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
+  verify(coordinator, times(0)).updateLastWrittenOffset(0)
+  verify(coordinator, times(1)).updateLastWrittenOffset(2)
+  verify(coordinator, times(1)).updateLastWrittenOffset(5)
+  verify(coordinator, times(1)).updateLastWrittenOffset(7)
+  verify(coordinator, times(1)).updateLastCommittedOffset(0)
+  verify(coordinator, times(1)).updateLastCommittedOffset(2)
+  verify(coordinator, times(0)).updateLastCommittedOffset(5)
+}
+  }
+
+  @Test
+  def testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead(): 
Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val serde = new StringKeyValueDeserializer
+val log = mock(classOf[UnifiedLog])
+val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+  time = Time.SYSTEM,
+  replicaManager = replicaManager,
+  deserializer = serde,
+  loadBufferSize = 1000

Re: [PR] KAFKA-14412: Better Rocks column family management [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14852:
URL: https://github.com/apache/kafka/pull/14852#discussion_r1412057240


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##
@@ -278,18 +280,74 @@ private void addValueProvidersToMetricsRecorder() {
 
 void openRocksDB(final DBOptions dbOptions,
  final ColumnFamilyOptions columnFamilyOptions) {
-final List columnFamilyDescriptors
-= Collections.singletonList(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions));
-final List columnFamilies = new 
ArrayList<>(columnFamilyDescriptors.size());
+final List columnFamilies = openRocksDB(
+dbOptions,
+new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions)
+);
+
+dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
+}
+
+/**
+ * Open RocksDB while automatically creating any requested column families 
that don't yet exist.
+ */
+protected List openRocksDB(final DBOptions dbOptions,
+   final 
ColumnFamilyDescriptor defaultColumnFamilyDescriptor,
+   final 
ColumnFamilyDescriptor... columnFamilyDescriptors) {
+final String absolutePath = dbDir.getAbsolutePath();
+final List extraDescriptors = 
Arrays.asList(columnFamilyDescriptors);
+final List allDescriptors = new ArrayList<>(1 
+ columnFamilyDescriptors.length);
+allDescriptors.add(defaultColumnFamilyDescriptor);
+allDescriptors.addAll(extraDescriptors);
 
 try {
-db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), 
columnFamilyDescriptors, columnFamilies);
-dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
+final Options options = new Options(dbOptions, 
defaultColumnFamilyDescriptor.getOptions());
+final List allExisting = 
RocksDB.listColumnFamilies(options, absolutePath);
+
+final List existingDescriptors = 
allDescriptors.stream()
+.filter(descriptor -> descriptor == 
defaultColumnFamilyDescriptor || allExisting.stream().anyMatch(existing -> 
Arrays.equals(existing, descriptor.getName(
+.collect(Collectors.toList());

Review Comment:
   nit:
   Just a proposal for better readability:
   ```suggestion
   final List existingDescriptors = new 
LinkedList<>();
   existingDescriptors.add(defaultColumnFamilyDescriptor);
   existingDescriptors.addAll(allDescriptors.stream()
   .filter(descriptor -> allExisting.stream().anyMatch(existing 
-> Arrays.equals(existing, descriptor.getName(
   .collect(Collectors.toList()
   );
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]

2023-12-01 Thread via GitHub


aliehsaeedii commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1412048452


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Optional;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {
+protected final ListIterator segmentIterator;
+private final Bytes key;
+private final Long fromTime;
+private final Long toTime;
+private final ResultOrder order;
+protected ListIterator> iterator;
+
+// defined for creating/releasing the snapshot. 
+private LogicalKeyValueSegment snapshotOwner;
+private Snapshot snapshot;
+
+
+
+public LogicalSegmentIterator(final ListIterator 
segmentIterator,
+  final Bytes key,
+  final Long fromTime,
+  final Long toTime,
+  final ResultOrder order) {
+
+this.segmentIterator = segmentIterator;
+this.key = key;
+this.fromTime = fromTime;
+this.toTime = toTime;
+this.iterator = Collections.emptyListIterator();
+this.order = order;
+this.snapshot = null;
+this.snapshotOwner = null;
+}
+
+@Override
+public void close() {
+// user may refuse consuming all returned records, so release the 
snapshot when closing the iterator if it is not released yet!
+releaseSnapshot();
+}
+
+@Override
+public boolean hasNext() {
+return iterator.hasNext() || maybeFillIterator();
+}
+
+@Override
+public Object next() {
+if (hasNext()) {
+return iterator.next();
+}
+return null;
+}
+private boolean maybeFillIterator() {
+
+final List> queryResults = new ArrayList<>();
+while (segmentIterator.hasNext()) {
+final LogicalKeyValueSegment segment = segmentIterator.next();
+
+if (snapshot == null) { // create the snapshot (this will happen 
only one time).
+this.snapshotOwner = segment;
+// take a RocksDB snapshot to return the segments content at 
the query time (in order to guarantee consistency)
+final Lock lock = new ReentrantLock();

Review Comment:
   > because we can get a single snapshot when the iterator is creates
   
   You mean I get the snapshot in `RocksDBVersionedStore` and pass it to the 
`LogicalSegmentIterator`? This way, releasing the snapshot is problematic. How 
to release it? An object of `LogicalKeyValueSegment` must release it. That's 
why I introduced the `snapshotOwner` field.  
   
   > , but use the same snaphot throughout the lifetime of the iterator?
   
   At the moment, I am using the same snapshot throughout the whole lifetime of 
the iterator. As I mentioned in the code comments, taking the snapshot is done 
just once, and it will be released only with explicit closing of the iterator 
or when the iterator is empty.
   
   > all of them have the same physical RocksDB instance under the hood?/
   
   That's true. That's why a random segment, which can be the 
`latestValueStore` or the oldest segment (when order is ascending), creates and 
releases the single snapshot for being used through the whole lifetime of the 
iterator.
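   To summarize the lifecycle being discussed, a minimal sketch of the 
take-once/release-once guard (names are mine; `db` stands in for the single 
physical RocksDB instance shared by all logical segments, so which segment 
"owns" the snapshot only matters for knowing where to release it):
   ```java
   import org.rocksdb.RocksDB;
   import org.rocksdb.Snapshot;

   final class SnapshotGuard implements AutoCloseable {
       private final RocksDB db;
       private Snapshot snapshot; // null until first use, null again after release

       SnapshotGuard(RocksDB db) {
           this.db = db;
       }

       Snapshot acquire() {
           if (snapshot == null) {
               snapshot = db.getSnapshot(); // taken exactly once per iterator
           }
           return snapshot;
       }

       @Override
       public void close() { // safe to call both on close() and on exhaustion
           if (snapshot != null) {
               db.releaseSnapshot(snapshot);
               snapshot = null;
           }
       }
   }
   ```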
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Re: [PR] KAFKA-14412: Generalise over RocksDB WriteBatch [kafka]

2023-12-01 Thread via GitHub


nicktelford commented on PR #14853:
URL: https://github.com/apache/kafka/pull/14853#issuecomment-1836027902

   @mjsax @ableegoldman @lucasbru @wcarlson5 @bbejeck @vvcephei @guozhangwang


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add junit properties to display parameterized test names [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14687:
URL: https://github.com/apache/kafka/pull/14687#discussion_r1412039372


##
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java:
##
@@ -239,10 +240,14 @@ public static String safeUniqueTestName(final Class 
testClass, final TestName
  * Used by tests migrated to JUnit 5.
  */
 public static String safeUniqueTestName(final Class testClass, final 
TestInfo testInfo) {
-final String displayName = testInfo.getDisplayName();
 final String methodName = 
testInfo.getTestMethod().map(Method::getName).orElse("unknownMethodName");
-final String testName = displayName.contains(methodName) ? methodName 
: methodName + displayName;
-return safeUniqueTestName(testClass, testName);
+// Generate a random uuid without an `-`. The `-` is used in Streams' 
thread name as
+// a separator and some tests rely on this.
+String randomUuid;
+do {
+randomUuid = Uuid.randomUuid().toString();
+} while (randomUuid.contains("-"));

Review Comment:
   Why not just replace `-` with a different character? 
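   i.e., something along these lines (a sketch; `_` is just one safe choice, 
since only `-` is used as Streams' thread-name separator):
   ```java
   import org.apache.kafka.common.Uuid;

   final class SafeTestName {
       // One deterministic pass instead of re-rolling until no '-' appears.
       static String randomSuffix() {
           return Uuid.randomUuid().toString().replace("-", "_");
       }
   }
   ```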



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Better Rocks column family management [kafka]

2023-12-01 Thread via GitHub


nicktelford commented on PR #14852:
URL: https://github.com/apache/kafka/pull/14852#issuecomment-1836027702

   @mjsax @ableegoldman @lucasbru @wcarlson5 @bbejeck @vvcephei @guozhangwang 
   I was advised to tag you all in these Kafka Streams PRs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15140) Improve TopicCommandIntegrationTest to be less flaky

2023-12-01 Thread Owen C.H. Leung (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17792025#comment-17792025
 ] 

Owen C.H. Leung commented on KAFKA-15140:
-

I've created a PR to fix it: https://github.com/apache/kafka/pull/14891

> Improve TopicCommandIntegrationTest to be less flaky
> 
>
> Key: KAFKA-15140
> URL: https://issues.apache.org/jira/browse/KAFKA-15140
> Project: Kafka
>  Issue Type: Test
>  Components: unit tests
>Reporter: Divij Vaidya
>Assignee: Lan Ding
>Priority: Minor
>  Labels: flaky-test, newbie
> Fix For: 3.5.1, 3.7.0
>
>
> *This is a good Jira for folks who are new to contributing to Kafka.*
> Tests in TopicCommandIntegrationTest get flaky from time to time. The 
> objective of the task is to make them more robust by doing the following:
> 1. Replace the usage of createAndWaitTopic() and adminClient.createTopics() 
> (and other places where we are creating a topic without waiting) with 
> TestUtils.createTopicWithAdmin(). The latter method already contains the 
> functionality to create a topic and wait for metadata to sync up.
> 2. Replace the number 6 at places such as 
> "adminClient.createTopics(
> Collections.singletonList(new NewTopic("foo_bar", 1, 6.toShort)))" with a 
> meaningful constant.
> 3. Add logs if an assertion fails, for example, lines such as "
> assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"), output)" should 
> have a third argument which prints the actual output printed so that we can 
> observe in the test logs on what was the output when assertion failed.
> 4. Replace occurrences of "\n" with System.lineSeparator() which is platform 
> independent
> 5. We should wait for reassignment to complete whenever we are re-assigning 
> partitions using alterconfig before we call describe to validate it. We could 
> use 
> TestUtils.waitForAllReassignmentsToComplete()
> *Motivation of this task*
> Try to fix the flaky test behaviour such as observed in 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13924/5/testReport/junit/kafka.admin/TopicCommandIntegrationTest/Build___JDK_11_and_Scala_2_13___testDescribeUnderMinIsrPartitionsMixed_String__quorum_zk/]
>  
> {noformat}
> org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
>   at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180)
>   at 
> app//kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(TopicCommandIntegrationTest.scala:794){noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-01 Thread via GitHub


mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1412011082


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1791,6 +1795,146 @@ protected int processStreamThread(final 
Consumer consumer) {
 return copy.size();
 }
 
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return The internal clients' assigned instance ids used for metrics 
collection.
+ *
+ * @throws IllegalArgumentException If {@code timeout} is negative.

Review Comment:
   Added this (in alignment to `consumer/producer/admin$clientInstanceId()` -- 
KIP needs to be updated accordingly



##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1791,6 +1795,146 @@ protected int processStreamThread(final 
Consumer consumer) {
 return copy.size();
 }
 
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return The internal clients' assigned instance ids used for metrics 
collection.
+ *
+ * @throws IllegalArgumentException If {@code timeout} is negative.
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.
+ * @throws TimeoutException Indicates that a request timed out.
+ * @throws StreamsException For any other error that might occur.
+ */
+public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+if (timeout.isNegative()) {
+throw new IllegalArgumentException("The timeout cannot be 
negative.");
+}
+if (state().hasNotStarted()) {
+throw new IllegalStateException("KafkaStreams has not been 
started, you can retry after calling start().");
+}
+if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+throw new IllegalStateException("KafkaStreams has been stopped (" 
+ state + ").");
+}
+
+final ClientInstanceIdsImpl clientInstanceIds = new 
ClientInstanceIdsImpl();
+
+// (1) fan-out calls to threads
+
+// StreamThread for main/restore consumers and producer(s)
+final Map> consumerFutures = new HashMap<>();
+final Map>>> 
producerFutures = new HashMap<>();
+for (final StreamThread streamThread : threads) {
+
consumerFutures.putAll(streamThread.consumerClientInstanceIds(timeout));
+producerFutures.put(streamThread.getName(), 
streamThread.producersClientInstanceIds(timeout));
+}
+// GlobalThread
+KafkaFuture globalThreadFuture = null;
+if (globalStreamThread != null) {
+globalThreadFuture = 
globalStreamThread.globalConsumerInstanceId(timeout);
+}
+
+// (2) get admin client instance id in a blocking fashion, while 
Stream/GlobalThreads work in parallel
+try {
+
clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
+} catch (final IllegalStateException telemetryDisabledError) {

Review Comment:
   This is new, based on other PRs from KIP-714 -- 
`adminClient.clientInstanceId` throws if telemetry is disabled -- for this 
case, we might not want to throw, but return a "partial" result...
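   Distilling the pattern from the diff above into a sketch (assuming the 
KIP-714 `Admin#clientInstanceId(Duration)` used there; the helper name is 
mine):
   ```java
   import java.time.Duration;

   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.common.Uuid;

   final class PartialInstanceId {
       // Treat IllegalStateException from clientInstanceId() as "telemetry
       // disabled" and continue, so callers still get a partial
       // ClientInstanceIds result instead of an exception.
       static Uuid adminInstanceIdOrNull(Admin admin, Duration timeout) {
           try {
               return admin.clientInstanceId(timeout);
           } catch (IllegalStateException telemetryDisabled) {
               return null; // telemetry not enabled on this admin client
           }
       }
   }
   ```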



##
streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.streams.ClientInstanceIds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClientInstanceIdsImpl implements ClientInstanceIds {
+private final Map consumerInstanceIds = new HashMap<>();
+private final Map producerInstanceIds = new HashMap<>();
+private Uuid adminInstanceId;
+
+public void addConsumerInstanceId(final String key, final Uuid instanceId) 
{
+consumerInstanceIds.put(key, instanceId);
+}
+
+public void addProducerInstanceId(final String key, final Uuid instanceId) 
{
+producerInstanceIds.put(key, instanceId);
+}
+
+

Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-01 Thread via GitHub


soarez commented on PR #14838:
URL: https://github.com/apache/kafka/pull/14838#issuecomment-1836006504

   @cmccabe @rondagostino @pprovenzano please take a look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [2/N] New Group Coordinator IT for Static Member Replacement [kafka]

2023-12-01 Thread via GitHub


vamossagar12 commented on PR #14882:
URL: https://github.com/apache/kafka/pull/14882#issuecomment-1836000366

   @dajac, I ran the test a few times on my laptop. It passes most of the 
time but sometimes fails with:
   ```
   [2023-12-01 17:21:23,530] WARN 
[QuorumController id=3000] Performing controller activation. The metadata log 
appears to be empty. Appending 1 bootstrap record(s) in metadata transaction at 
metadata.version 3.7-IV3 from bootstrap source 'testkit'. Setting the ZK 
migration state to NONE since this is a de-novo KRaft cluster. 
(org.apache.kafka.controller.QuorumController:108)
   [2023-12-01 17:21:24,488] ERROR [GroupCoordinator id=0] Execution of 
UpdateImage(tp=__consumer_offsets-0, offset=22) failed due to This is not the 
correct coordinator.. 
(org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime:1018)
   org.apache.kafka.common.errors.NotCoordinatorException: This is not the 
correct coordinator.
   [2023-12-01 17:21:24,628] ERROR Unexpected error handling 
org.apache.kafka.server.AssignmentsManager$DispatchEvent@3909ac4e 
(org.apache.kafka.server.AssignmentsManager:117)
   java.util.concurrent.RejectedExecutionException: The event queue is shutting 
down
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:253)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
at java.lang.Thread.run(Thread.java:750)
   [2023-12-01 17:21:25,077] WARN [NodeToControllerChannelManager id=3000 
name=registration] Attempting to close NetworkClient that has already been 
closed. (org.apache.kafka.clients.NetworkClient:667)
   
   expected: <Assignment(topicPartitions=[TopicPartitions(topicId=x5K2iSzBSqmTlJQ8Qfq8sg, partitions=[0, 1, 2])])> but was: <Assignment(topicPartitions=[])>
   Expected :Assignment(topicPartitions=[TopicPartitions(topicId=x5K2iSzBSqmTlJQ8Qfq8sg, partitions=[0, 1, 2])])
   Actual   :Assignment(topicPartitions=[])
   
   
   org.opentest4j.AssertionFailedError: expected: 
<Assignment(topicPartitions=[TopicPartitions(topicId=x5K2iSzBSqmTlJQ8Qfq8sg, partitions=[0, 1, 2])])> but was: <Assignment(topicPartitions=[])>
at 
org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at 
org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141)
at 
kafka.server.ConsumerGroupHeartbeatRequestTest.testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(ConsumerGroupHeartbeatRequestTest.scala:193)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at 
org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
at 

Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-01 Thread via GitHub


mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1412009752


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1791,6 +1794,74 @@ protected int processStreamThread(final 
Consumer consumer) {
 return copy.size();
 }
 
+/**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics 
collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.
+ * @throws TimeoutException Indicates that a request timed out.
+ */
+public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+if (state().hasNotStarted()) {
+throw new IllegalStateException("KafkaStreams has not been 
started, you can retry after calling start().");
+}
+if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+throw new IllegalStateException("KafkaStreams has been stopped (" 
+ state + ").");
+}
+
+final ClientInstanceIdsImpl clientInstanceIds = new 
ClientInstanceIdsImpl();
+
+final Map> streamThreadFutures = new 
HashMap<>();
+for (final StreamThread streamThread : threads) {
+
streamThreadFutures.putAll(streamThread.clientInstanceIds(timeout));
+}
+
+KafkaFuture globalThreadFuture = null;
+if (globalStreamThread != null) {
+globalThreadFuture = 
globalStreamThread.globalConsumerInstanceId(timeout);
+}
+
+try {
+
clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
+} catch (final TimeoutException timeoutException) {
+log.warn("Could not get admin client-instance-id due to timeout.");
+}
+
+for (final Map.Entry> streamThreadFuture : 
streamThreadFutures.entrySet()) {
+try {
+clientInstanceIds.addConsumerInstanceId(
+streamThreadFuture.getKey(),
+streamThreadFuture.getValue().get()
+);
+} catch (final ExecutionException exception) {
+if (exception.getCause() instanceof TimeoutException) {
+log.warn("Could not get global consumer client-instance-id 
due to timeout.");
+} else {
+log.error("Could not get global consumer 
client-instance-id", exception);
+}
+} catch (final InterruptedException error) {
+log.error("Could not get global consumer client-instance-id", 
error);
+}
+}
+
+if (globalThreadFuture != null) {
+try {
+
clientInstanceIds.addConsumerInstanceId(globalStreamThread.getName(), 
globalThreadFuture.get());
+} catch (final ExecutionException exception) {
+if (exception.getCause() instanceof TimeoutException) {
+log.warn("Could not get global consumer client-instance-id 
due to timeout.");
+} else {
+log.error("Could not get global consumer 
client-instance-id", exception);
+}
+} catch (final InterruptedException error) {
+log.error("Could not get global consumer client-instance-id", 
error);
+}
+}
+
+return clientInstanceIds;

Review Comment:
   Does it buy us much? -- I would actually prefer somewhat better error 
handling, and better error messages. Did a larger rewrite of this. Let me know 
what you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 commented on PR #14724:
URL: https://github.com/apache/kafka/pull/14724#issuecomment-1835985666

   > Hi @apoorvmittal10 thanks for writing this PR - also sorry for the delay 
but I made the first pass and left some stylistic comments.
   
   Hi @philipnee Thanks for the review. I have addressed the comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]

2023-12-01 Thread via GitHub


aliehsaeedii commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1411969845


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -266,75 +269,28 @@ public VersionedRecord get(final Bytes key, final 
long asOfTimestamp) {
 return null;
 }
 
-public VersionedRecordIterator get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
-
-Objects.requireNonNull(key, "key cannot be null");
+@SuppressWarnings("unchecked")
+protected VersionedRecordIterator get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final ResultOrder order) {

Review Comment:
   > Can we make it `private` (or package private)?
   
   yes, `package-private` is possible. `private` is not possible because we 
call it from `StoreQueryUtils`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]

2023-12-01 Thread via GitHub


aliehsaeedii commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1411961568


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -266,75 +269,28 @@ public VersionedRecord get(final Bytes key, final 
long asOfTimestamp) {
 return null;
 }
 
-public VersionedRecordIterator get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
-
-Objects.requireNonNull(key, "key cannot be null");
+@SuppressWarnings("unchecked")

Review Comment:
   > Why do we need this suppression?
   
   Because of the following warning when creating a `LogicalSegmentIterator` 
object ([line# 
280](https://github.com/aliehsaeedii/kafka/blob/30f8d197319359074924b25041f63387484dbc08/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java#L280),
 and [line# 
293](https://github.com/aliehsaeedii/kafka/blob/30f8d197319359074924b25041f63387484dbc08/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java#L293))
   `Unchecked assignment: 
'org.apache.kafka.streams.state.internals.LogicalSegmentIterator' to 
'org.apache.kafka.streams.state.VersionedRecordIterator' `
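   For what it's worth, a sketch of how binding the type parameter would avoid 
the raw-type warning (assuming KIP-968's `VersionedRecordIterator<V>` as 
imported in the diff above; bodies elided to stubs):
   ```java
   import java.util.NoSuchElementException;

   import org.apache.kafka.streams.state.VersionedRecord;
   import org.apache.kafka.streams.state.VersionedRecordIterator;

   // Binding the iterator's type parameter (here to byte[], matching the
   // store's raw values) lets the assignment compile without
   // @SuppressWarnings("unchecked") on the caller.
   public class TypedLogicalSegmentIterator implements VersionedRecordIterator<byte[]> {
       @Override
       public boolean hasNext() {
           return false; // real lookup logic elided
       }

       @Override
       public VersionedRecord<byte[]> next() {
           throw new NoSuchElementException();
       }

       @Override
       public void close() {
       }
   }
   ```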



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1411951695


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:
##
@@ -0,0 +1,952 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
+import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.apache.kafka.common.telemetry.internals.MetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricsEmitter;
+import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
+import 
org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry 
which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter 
is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ * 
+ *
+ * The client telemetry reporter is configured with a {@link 
ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client 
telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the 
broker and send the
+ * telemetry data to the broker based on the subscription information.
+ * <p>
+ *
+ * The full life-cycle of the metric collection process is defined by a state 
machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set 
of operations.
+ * For example, the client telemetry reporter will attempt to fetch the 
telemetry subscription
+ * from the broker when in the {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
+ * If the push operation fails, the client telemetry reporter will attempt to 
re-fetch the
+ * subscription information by setting the state back to {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED}.
+ * <p>
+ *
+ * In an unlikely scenario, 

Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]

2023-12-01 Thread via GitHub


aliehsaeedii commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1411947974


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -136,11 +139,11 @@ public long put(final Bytes key, final byte[] value, 
final long timestamp) {
 observedStreamTime = Math.max(observedStreamTime, timestamp);
 
 final long foundTs = doPut(
-versionedStoreClient,
-observedStreamTime,
-key,
-value,
-timestamp
+versionedStoreClient,

Review Comment:
   > nit: avoid unnecessary reformatting / indentation changes -- similar below.
   > 
   > 80% of diff in this file is just adding/removing spaces... very noisy
   
   Sorry, resolving the conflicts made a total mess!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix flaky test RefreshingHttpsJwksTest.testBasicScheduleRefresh [kafka]

2023-12-01 Thread via GitHub


ashwinpankaj commented on code in PR #14888:
URL: https://github.com/apache/kafka/pull/14888#discussion_r1411942462


##
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java:
##
@@ -62,14 +62,14 @@ public class RefreshingHttpsJwksTest extends 
OAuthBearerTest {
 @Test
 public void testBasicScheduleRefresh() throws Exception {
 String keyId = "abc123";
-Time time = new MockTime();
+MockTime time = new MockTime();
 HttpsJwks httpsJwks = spyHttpsJwks();
 
-try (RefreshingHttpsJwks refreshingHttpsJwks = 
getRefreshingHttpsJwks(time, httpsJwks)) {
+try (RefreshingHttpsJwks refreshingHttpsJwks = 
getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) {

Review Comment:
   ```suggestion
   // we use MockTime here to ensure that the scheduled refresh _doesn't_ run and update the invocation count
   // we expect httpsJwks.refresh() to be invoked twice, once each from init() and maybeExpediteRefresh()
   try (RefreshingHttpsJwks refreshingHttpsJwks = getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) {
   ```
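
   For reference, the determinism here comes from the executor being driven by the mock clock rather than by wall time. A minimal, self-contained sketch of that idea (a hypothetical `VirtualTimeScheduler`, not the test's actual `mockExecutorService` helper):
   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-in for a scheduler driven by virtual time: tasks queued by
   // schedule() run only when the test advances the clock, never in the background,
   // so invocation counts asserted by the test are deterministic.
   final class VirtualTimeScheduler {

       private static final class Task {
           final long dueMs;
           final Runnable body;
           Task(long dueMs, Runnable body) { this.dueMs = dueMs; this.body = body; }
       }

       private final List<Task> pending = new ArrayList<>();
       private long nowMs = 0L;

       void schedule(Runnable body, long delayMs) {
           pending.add(new Task(nowMs + delayMs, body));
       }

       // Advance virtual time and run whatever became due; nothing runs otherwise.
       void advanceMs(long deltaMs) {
           nowMs += deltaMs;
           pending.removeIf(task -> {
               if (task.dueMs <= nowMs) {
                   task.body.run();
                   return true;
               }
               return false;
           });
       }
   }
   ```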



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15831: KIP-1000 protocol and admin client [kafka]

2023-12-01 Thread via GitHub


AndrewJSchofield opened a new pull request, #14894:
URL: https://github.com/apache/kafka/pull/14894

   This adds the new ListClientMetricsResources RPC to the Kafka protocol and 
puts support
   into the Kafka admin client. The broker-side implementation in this PR is 
just to return an empty
   list. A future PR will obtain the list from the config store.
   
   Includes a few unit tests for what is a very simple RPC. There are additional tests already written and
   waiting for the PR that delivers the kafka-client-metrics.sh tool, which builds on this PR.
   
   This PR supersedes https://github.com/apache/kafka/pull/14811.
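
   For illustration, here is roughly how the new API could be exercised from the admin client, assuming the method and result naming follows KIP-1000 (`listClientMetricsResources()` returning a result whose `all()` future yields listings exposing `name()`):
   ```java
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;

   import java.util.Properties;

   // Sketch against the assumed KIP-1000 surface; not the final API.
   public class ListClientMetricsResourcesExample {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               // One listing per client metrics resource; empty until the broker
               // implementation reads the config store.
               admin.listClientMetricsResources()
                    .all()
                    .get()
                    .forEach(listing -> System.out.println(listing.name()));
           }
       }
   }
   ```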
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix flaky test RefreshingHttpsJwksTest.testBasicScheduleRefresh [kafka]

2023-12-01 Thread via GitHub


ashwinpankaj commented on code in PR #14888:
URL: https://github.com/apache/kafka/pull/14888#discussion_r1411930564


##
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java:
##
@@ -62,14 +62,14 @@ public class RefreshingHttpsJwksTest extends 
OAuthBearerTest {
 @Test
 public void testBasicScheduleRefresh() throws Exception {
 String keyId = "abc123";
-Time time = new MockTime();
+MockTime time = new MockTime();
 HttpsJwks httpsJwks = spyHttpsJwks();
 
-try (RefreshingHttpsJwks refreshingHttpsJwks = 
getRefreshingHttpsJwks(time, httpsJwks)) {
+try (RefreshingHttpsJwks refreshingHttpsJwks = 
getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) {
 refreshingHttpsJwks.init();
 verify(httpsJwks, times(1)).refresh();
 assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId));
-verify(httpsJwks, times(1)).refresh();
+verify(httpsJwks, times(2)).refresh();

Review Comment:
   Thanks @splett2 - so the assertion would fail whenever maybeExpediteRefresh actually ran!
   nit: do you want to add a comment mentioning that MockTime is used to ensure that the scheduled refresh doesn't run even once and change the invocation count?
   
   LGTM



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix flaky test RefreshingHttpsJwksTest.testBasicScheduleRefresh [kafka]

2023-12-01 Thread via GitHub


ashwinpankaj commented on code in PR #14888:
URL: https://github.com/apache/kafka/pull/14888#discussion_r1411930564


##
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java:
##
@@ -62,14 +62,14 @@ public class RefreshingHttpsJwksTest extends 
OAuthBearerTest {
 @Test
 public void testBasicScheduleRefresh() throws Exception {
 String keyId = "abc123";
-Time time = new MockTime();
+MockTime time = new MockTime();
 HttpsJwks httpsJwks = spyHttpsJwks();
 
-try (RefreshingHttpsJwks refreshingHttpsJwks = 
getRefreshingHttpsJwks(time, httpsJwks)) {
+try (RefreshingHttpsJwks refreshingHttpsJwks = 
getRefreshingHttpsJwks(time, httpsJwks, mockExecutorService(time))) {
 refreshingHttpsJwks.init();
 verify(httpsJwks, times(1)).refresh();
 assertTrue(refreshingHttpsJwks.maybeExpediteRefresh(keyId));
-verify(httpsJwks, times(1)).refresh();
+verify(httpsJwks, times(2)).refresh();

Review Comment:
   Thanks @splett2 - so the assertion would fail whenever maybeExpediteRefresh actually ran! LGTM



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [2/N]: Client state machine updates [kafka]

2023-12-01 Thread via GitHub


dajac commented on code in PR #14857:
URL: https://github.com/apache/kafka/pull/14857#discussion_r1411872217


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -322,16 +314,14 @@ private void onErrorResponse(final 
ConsumerGroupHeartbeatResponse response,
 break;
 
 case FENCED_MEMBER_EPOCH:
-message = String.format("GroupHeartbeatRequest failed because 
member ID %s with epoch %s is invalid. " +
-"Will abandon all partitions and rejoin the 
group",
+message = String.format("GroupHeartbeatRequest failed because 
member ID %s with epoch %s is invalid.",
 membershipManager.memberId(), 
membershipManager.memberEpoch());
 logInfo(message, response, currentTimeMs);
 membershipManager.transitionToFenced();
 break;
 
 case UNKNOWN_MEMBER_ID:
-message = String.format("GroupHeartbeatRequest failed because 
member of unknown ID %s with epoch %s is invalid. " +
-"Will abandon all partitions and rejoin the 
group",
+message = String.format("GroupHeartbeatRequest failed because 
member of unknown ID %s with epoch %s is invalid.",

Review Comment:
   nit: How about `GroupHeartbeatRequest failed because member id {} is 
unknown.`?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -322,16 +314,14 @@ private void onErrorResponse(final 
ConsumerGroupHeartbeatResponse response,
 break;
 
 case FENCED_MEMBER_EPOCH:
-message = String.format("GroupHeartbeatRequest failed because 
member ID %s with epoch %s is invalid. " +
-"Will abandon all partitions and rejoin the 
group",
+message = String.format("GroupHeartbeatRequest failed because 
member ID %s with epoch %s is invalid.",

Review Comment:
   nit: While we are here, how about `GroupHeartbeatRequest failed because epoch {} is fenced.`?
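
   Taken together, the two suggested messages would read as below; a tiny standalone illustration with made-up values, adapting the `{}` placeholders to the `String.format` style used in the code:
   ```java
   // Prints the two proposed log messages with sample values.
   public class HeartbeatErrorMessageExample {
       public static void main(String[] args) {
           String memberId = "consumer-1-uuid";
           int memberEpoch = 5;
           System.out.println(String.format(
               "GroupHeartbeatRequest failed because epoch %s is fenced.", memberEpoch));
           System.out.println(String.format(
               "GroupHeartbeatRequest failed because member id %s is unknown.", memberId));
       }
   }
   ```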



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1411866555


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:
##
@@ -0,0 +1,952 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
+import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.apache.kafka.common.telemetry.internals.MetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricsEmitter;
+import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
+import org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry 
which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter 
is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ * <p>
+ *
+ * The client telemetry reporter is configured with a {@link 
ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client 
telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the 
broker and send the
+ * telemetry data to the broker based on the subscription information.
+ * <p>
+ *
+ * The full life-cycle of the metric collection process is defined by a state 
machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set 
of operations.

Review Comment:
   Not yet, I'm planning to do that after the 3.7 code freeze. I think you mean the way the KIP-848 state transitions are publicly documented? I have created a Jira so we can track it: https://issues.apache.org/jira/browse/KAFKA-15952



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

[jira] [Created] (KAFKA-15952) Create public doc for telemetry state transition

2023-12-01 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-15952:
-

 Summary: Create public doc for telemetry state transition
 Key: KAFKA-15952
 URL: https://issues.apache.org/jira/browse/KAFKA-15952
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1411859695


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:
##
@@ -0,0 +1,952 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
+import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.apache.kafka.common.telemetry.internals.MetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricsEmitter;
+import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
+import org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry 
which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter 
is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ * <p>
+ *
+ * The client telemetry reporter is configured with a {@link 
ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client 
telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the 
broker and send the
+ * telemetry data to the broker based on the subscription information.

Review Comment:
   I have added `i.e. push interval, temporality, compression type, etc.` in the comment to make it more specific.
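
   As a side note, a rough sketch of the subscription state those fields describe, with names following the KIP-714 response schema rather than the exact class in this PR:
   ```java
   import java.util.List;

   // Illustrative shape of the subscription the reporter keeps after a
   // GetTelemetrySubscriptions round trip; all names here are assumptions.
   final class SubscriptionSnapshot {
       final int subscriptionId;                     // echoed back on every push
       final int pushIntervalMs;                     // how often metrics are pushed
       final boolean deltaTemporality;               // delta vs. cumulative aggregation
       final List<String> acceptedCompressionTypes;  // broker preference order
       final List<String> requestedMetrics;          // metric name prefixes to collect

       SubscriptionSnapshot(int subscriptionId,
                            int pushIntervalMs,
                            boolean deltaTemporality,
                            List<String> acceptedCompressionTypes,
                            List<String> requestedMetrics) {
           this.subscriptionId = subscriptionId;
           this.pushIntervalMs = pushIntervalMs;
           this.deltaTemporality = deltaTemporality;
           this.acceptedCompressionTypes = acceptedCompressionTypes;
           this.requestedMetrics = requestedMetrics;
       }
   }
   ```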



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-12-01 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1411855963


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryProvider.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.resource.v1.Resource;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ClientTelemetryProvider implements Configurable {
+
+public static final String DOMAIN = "org.apache.kafka";
+// Client metrics tags
+public static final String CLIENT_RACK = "client_rack";
+public static final String GROUP_ID = "group_id";
+public static final String GROUP_INSTANCE_ID = "group_instance_id";
+public static final String GROUP_MEMBER_ID = "group_member_id";
+public static final String TRANSACTIONAL_ID = "transactional_id";
+
+private static final String PRODUCER_NAMESPACE = "kafka.producer";
+private static final String CONSUMER_NAMESPACE = "kafka.consumer";
+
+private static final Map<String, String> PRODUCER_CONFIG_MAPPING = new HashMap<>();
+private static final Map<String, String> CONSUMER_CONFIG_MAPPING = new HashMap<>();
+
+private volatile Resource resource = null;
+private Map<String, ?> config = null;
+
+// Mapping of config keys to telemetry keys. Contains only keys which can 
be fetched from config.
+// Config like group_member_id is not present here as it is not fetched 
from config.
+static {
+PRODUCER_CONFIG_MAPPING.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
ClientTelemetryProvider.TRANSACTIONAL_ID);
+
+CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_ID_CONFIG, 
ClientTelemetryProvider.GROUP_ID);
+CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, 
ClientTelemetryProvider.GROUP_INSTANCE_ID);
+}
+
+@Override
+public synchronized void configure(Map<String, ?> configs) {
+this.config = configs;
+}
+
+/**
+ * Validate that all the data required for generating correct metrics is 
present.
+ *
+ * @param metricsContext {@link MetricsContext}
+ * @return false if any of the data required for generating correct metrics
+ * is missing, true otherwise.
+ */
+public boolean validate(MetricsContext metricsContext) {
+return 
ClientTelemetryUtils.validateRequiredResourceLabels(metricsContext.contextLabels());
+}
+
+/**
+ * Sets the metrics tags for the service or library exposing metrics. This 
will be called before
+ * {@link org.apache.kafka.common.metrics.MetricsReporter#init(List)} and 
may be called anytime
+ * after that.
+ *
+ * @param metricsContext {@link MetricsContext}
+ */
+public synchronized void contextChange(MetricsContext metricsContext) {
+final Resource.Builder resourceBuilder = Resource.newBuilder();
+
+final String namespace = 
metricsContext.contextLabels().get(MetricsContext.NAMESPACE);
+if (PRODUCER_NAMESPACE.equals(namespace)) {
+// Add producer resource labels.
+PRODUCER_CONFIG_MAPPING.forEach((configKey, telemetryKey) -> {
+if (config.containsKey(configKey)) {
+addAttribute(resourceBuilder, telemetryKey, 
String.valueOf(config.get(configKey)));
+}
+});
+} else if (CONSUMER_NAMESPACE.equals(namespace)) {
+// Add consumer resource labels.
+CONSUMER_CONFIG_MAPPING.forEach((configKey, telemetryKey) -> {
+if (config.containsKey(configKey)) {
+addAttribute(resourceBuilder, telemetryKey, 
String.valueOf(config.get(configKey)));
+}
+});
+}
+
+// Add client rack label.
+if 

Re: [PR] KAFKA-14987 [2/2]; customize retry backoff for group/offsets expiration [kafka]

2023-12-01 Thread via GitHub


dajac commented on code in PR #14870:
URL: https://github.com/apache/kafka/pull/14870#discussion_r1411843255


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -620,8 +621,9 @@ public boolean cleanupExpiredOffsets(String groupId, List<Record> records) {
 });
 
 if (!expiredPartitions.isEmpty()) {
-log.info("[GroupId {}] Expiring offsets of partitions 
(allOffsetsExpired={}): {}",
-groupId, allOffsetsExpired, String.join(", ", 
expiredPartitions));
+log.info("[GroupId {}] Expiring {} offsets (allOffsetsExpired={}) 
in {} milliseconds.",
+groupId, expiredPartitions.size(), allOffsetsExpired, 
time.milliseconds() - startMs);
+log.debug("[GroupId {}] Expired partitions: {}", groupId, 
String.join(", ", expiredPartitions));

Review Comment:
   Instead of this one, I wonder if we should just add a debug message for 
every expired offset at L613. What do you think?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -620,8 +621,9 @@ public boolean cleanupExpiredOffsets(String groupId, List<Record> records) {
 });
 
 if (!expiredPartitions.isEmpty()) {
-    log.info("[GroupId {}] Expiring offsets of partitions (allOffsetsExpired={}): {}",
-        groupId, allOffsetsExpired, String.join(", ", expiredPartitions));
+    log.info("[GroupId {}] Expiring {} offsets (allOffsetsExpired={}) in {} milliseconds.",

Review Comment:
   I don't really get the value of adding the time here. What's the reasoning 
behind it? However, I think that it would make sense to add a log in 
`cleanupGroupMetadata` which contains the time taken and the number of records 
generated.
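
   A sketch of what that log could look like (assuming `cleanupGroupMetadata` has a `records` list and a `time` source in scope; the names are illustrative):
   
       long startMs = time.milliseconds();
       // ... expire offsets and delete group metadata, appending to records ...
       log.info("Cleaned up group metadata, generating {} records in {} milliseconds.",
           records.size(), time.milliseconds() - startMs);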



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorTimer.java:
##
@@ -52,6 +52,20 @@ interface TimeoutOperation {
  */
 void schedule(String key, long delay, TimeUnit unit, boolean retry, TimeoutOperation operation);
 
+/**
+ * Add an operation to the timer. If an operation with the same key
+ * already exists, replace it with the new operation.
+ *
+ * @param key   The key to identify this operation.
+ * @param delay The delay to wait before expiring.
+ * @param unit  The delay unit.
+ * @param retry A boolean indicating whether the operation should
+ *  be retried on failure.
+ * @param retryBackoff  The delay when rescheduled on retry.
+ * @param operation The operation to perform upon expiration.
+ */
+void schedule(String key, long delay, TimeUnit unit, boolean retry, long retryBackoff, TimeoutOperation operation);

Review Comment:
   Does it rely on the `unit`?
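
   If `retryBackoff` is meant to share the delay's unit, a sketch of an implementation that makes this explicit (illustrative only, not the actual code):
   
       public void schedule(String key, long delay, TimeUnit unit, boolean retry,
                            long retryBackoff, TimeoutOperation operation) {
           // normalize both values with the same unit so callers cannot mix units
           long delayMs = unit.toMillis(delay);
           long retryBackoffMs = unit.toMillis(retryBackoff);
           // ... schedule the operation using the normalized millisecond values ...
       }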



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -281,6 +281,18 @@ public void schedule(
     TimeUnit unit,
     boolean retry,
     TimeoutOperation operation
+) {
+    schedule(key, delay, unit, retry, 500, operation);
+}
+
+@Override
+public void schedule(
+    String key,
+    long delay,
+    TimeUnit unit,
+    boolean retry,
+    long retryBackoff,

Review Comment:
   What's the unit of this? Does it rely on `unit`?






Re: [PR] KAFKA-15890: Consumer.poll with long timeout unaware of assigned partitions [kafka]

2023-12-01 Thread via GitHub


AndrewJSchofield commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1411833816


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -205,6 +205,26 @@ public void wakeup() {
 networkClientDelegate.wakeup();
 }
 
+/**
+ * Returns the delay for which the application thread can safely wait before it should be responsive
+ * to results from the request managers. For example, the subscription state can change when heartbeats
+ * are sent, so blocking for longer than the heartbeat interval might mean the application thread is not
+ * responsive to changes.
+ *
+ * @return The maximum delay in milliseconds
+ */
+public long maximumTimeToWait() {
+    final long currentTimeMs = time.milliseconds();
+    if (requestManagers == null) {
+        return MAX_POLL_TIMEOUT_MS;
+    }
+    return requestManagers.entries().stream()
+        .filter(Optional::isPresent)
+        .map(Optional::get)
+        .map(rm -> rm.maximumTimeToWait(currentTimeMs))
+        .reduce(Long.MAX_VALUE, Math::min);
+}
+

Review Comment:
   I'll take this concept and work on it in a follow-on PR.






Re: [PR] KAFKA-14509: [2/N] Implement server side logic for ConsumerGroupDescribe API [kafka]

2023-12-01 Thread via GitHub


dajac commented on code in PR #14544:
URL: https://github.com/apache/kafka/pull/14544#discussion_r1411829247


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -547,6 +549,69 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
     return future;
 }
 
+/**
+ * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, List)}.
+ */
+@Override
+public CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> consumerGroupDescribe(
+    RequestContext context,
+    List<String> groupIds
+) {
+    if (!isActive.get()) {
+        return CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList(
+            groupIds,
+            Errors.COORDINATOR_NOT_AVAILABLE
+        ));
+    }
+
+    final List<CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>> futures =
+        new ArrayList<>(groupIds.size());
+    final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>();
+    groupIds.forEach(groupId -> {
+        // For backwards compatibility, we support DescribeGroups for the empty group id.
+        if (groupId == null) {

Review Comment:
   Actually, we don't support the empty group id for this new API. We should 
use `isGroupIdNotEmpty` here and remove the comment.
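
   A sketch of the guard (reusing the `isGroupIdNotEmpty` helper that already exists in this class):
   
       if (isGroupIdNotEmpty(groupId)) {
           groupsByTopicPartition
               .computeIfAbsent(topicPartitionFor(groupId), __ -> new ArrayList<>())
               .add(groupId);
       } else {
           futures.add(CompletableFuture.completedFuture(Collections.singletonList(
               new ConsumerGroupDescribeResponseData.DescribedGroup()
                   .setGroupId(groupId)
                   .setErrorCode(Errors.INVALID_GROUP_ID.code())
                   .setErrorMessage(Errors.INVALID_GROUP_ID.message())
           )));
       }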






Re: [PR] KAFKA-15574; [2/N]: Client state machine updates [kafka]

2023-12-01 Thread via GitHub


dajac commented on code in PR #14857:
URL: https://github.com/apache/kafka/pull/14857#discussion_r1411824345


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -365,6 +366,16 @@ private void replaceUnresolvedAssignmentWithNewAssignment(
  */
 @Override
 public void transitionToFenced() {
+if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) {
+    log.debug("Member {} with epoch {} got fenced but it is already leaving the group " +
+        "with state {}, so it won't attempt to rejoin.", memberId, memberEpoch, state);
+    return;
+}
+if (state == MemberState.UNSUBSCRIBED) {
+    log.debug("Member {} with epoch {} got fenced but it already left the group, so it " +
+        "won't attempt to rejoin.", memberId, memberEpoch);
+    return;
+}

Review Comment:
   You're absolutely right. This is the current behaviour in the legacy 
consumer. I was wondering if the current behaviour is actually correct in the 
legacy consumer as well. Anyway, we don't have to address this here as it is 
fine to replicate the current behaviour. However, we should think a little more 
about this case, I think.






Re: [PR] KAFKA-14509: [2/2] Implement server side logic for ConsumerGroupDescribe API [kafka]

2023-12-01 Thread via GitHub


dajac commented on code in PR #14544:
URL: https://github.com/apache/kafka/pull/14544#discussion_r1411799674


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -547,6 +549,69 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
     return future;
 }
 
+/**
+ * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, List)}.
+ */
+@Override
+public CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> consumerGroupDescribe(
+    RequestContext context,
+    List<String> groupIds
+) {
+    if (!isActive.get()) {
+        return CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList(
+            groupIds,
+            Errors.COORDINATOR_NOT_AVAILABLE
+        ));
+    }
+
+    final List<CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>> futures =
+        new ArrayList<>(groupIds.size());
+    final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>();
+    groupIds.forEach(groupId -> {
+        // For backwards compatibility, we support DescribeGroups for the empty group id.
+        if (groupId == null) {
+            futures.add(CompletableFuture.completedFuture(Collections.singletonList(
+                new ConsumerGroupDescribeResponseData.DescribedGroup()
+                    .setGroupId(null)
+                    .setErrorCode(Errors.INVALID_GROUP_ID.code())
+                    .setErrorMessage(Errors.INVALID_GROUP_ID.message())
+            )));
+        } else {
+            groupsByTopicPartition
+                .computeIfAbsent(topicPartitionFor(groupId), __ -> new ArrayList<>())
+                .add(groupId);
+        }
+    });
+
+    groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+        CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> future =
+            runtime.scheduleReadOperation(
+                "consumer-group-describe",
+                topicPartition,
+                (coordinator, lastCommittedOffset) -> coordinator.consumerGroupDescribe(groupIds, lastCommittedOffset)
+            ).exceptionally(exception -> {
+                if (!(exception instanceof KafkaException)) {
+                    log.error("ConsumerGroupDescribe request {} hit an unexpected exception: {}.",
+                        groupList, exception.getMessage());
+                }
+
+                return ConsumerGroupDescribeRequest.getErrorDescribedGroupList(
+                    groupList,
+                    Errors.forException(exception)
+                );
+            });
+
+        futures.add(future);
+    });
+
+    final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+    return allFutures.thenApply(v -> {
+        final List<ConsumerGroupDescribeResponseData.DescribedGroup> res = new ArrayList<>();
+        futures.forEach(future -> res.addAll(future.join()));
+        return res;
+    });

Review Comment:
   I think that we have more or less the same code elsewhere in this class. If you are interested, we could try to refactor this into a helper method as a follow-up. We can keep it as-is in this pull request though.
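
   For example, a generic helper along these lines (a sketch, not the final shape):
   
       private static <T> CompletableFuture<List<T>> joinFutures(
           List<CompletableFuture<List<T>>> futures
       ) {
           // complete once all per-partition futures complete, then flatten the results
           return CompletableFuture
               .allOf(futures.toArray(new CompletableFuture[0]))
               .thenApply(v -> {
                   final List<T> res = new ArrayList<>();
                   futures.forEach(future -> res.addAll(future.join()));
                   return res;
               });
       }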



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -468,6 +468,33 @@ public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFi
     return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList());
 }
 
+
+/**
+ * Handles a ConsumerGroupDescribe request.
+ * @param groupIds  The IDs of the groups to describe.

Review Comment:
   nit: We usually put an empty line between the description and the params.
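
   i.e. something like:
   
       /**
        * Handles a ConsumerGroupDescribe request.
        *
        * @param groupIds  The IDs of the groups to describe.
        */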



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -547,6 +549,69 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
     return future;
 }
 
+/**
+ * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, List)}.
+ */
+@Override
+public CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> consumerGroupDescribe(
+    RequestContext context,
+    List<String> groupIds
+) {
+    if (!isActive.get()) {
+        return CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList(
+            groupIds,
+            Errors.COORDINATOR_NOT_AVAILABLE
+        ));
+    }
+
+    final List<CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>> futures =
+        new ArrayList<>(groupIds.size());
+    final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>();
+    groupIds.forEach(groupId -> {
+        // For backwards compatibility, we support DescribeGroups for the empty group id.
+        if (groupId == null) {
+            futures.add(CompletableFuture.completedFuture(Collections.singletonList(
+                new ConsumerGroupDescribeResponseData.DescribedGroup()
+                    .setGroupId(null)
+ 

Re: [PR] KAFKA-15945 : Flaky test - testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest [kafka]

2023-12-01 Thread via GitHub


ashwinpankaj commented on code in PR #14893:
URL: https://github.com/apache/kafka/pull/14893#discussion_r1411813212


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -770,6 +771,9 @@ public void testSyncTopicConfigs() throws InterruptedException {
     Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops);
     // alter configs on target cluster
     backup.kafka().incrementalAlterConfigs(configOps);
+    // wait until the configs are changed
+    waitForConfigValueChange(backup, backupTopic, "delete.retention.ms", "2000");
+    waitForConfigValueChange(backup, backupTopic, "retention.bytes", "2000");

Review Comment:
   Thanks for looking into this @atu-sharm!
   
   I wonder if a wait for `retention.bytes` to be equal to 2000 will always succeed.
   waitForCondition has a default poll interval of 100 ms. If the value is set to 2000 and then changed to 1000 by MirrorMaker within the time the thread sleeps (100 ms), we may miss the value change and time out.
   
   Can't we just wait on the futures returned by incrementalAlterConfigs?
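
   For example (a sketch; assumes an `Admin` client can be obtained via `backup.kafka().createAdminClient()`):
   
       try (Admin admin = backup.kafka().createAdminClient()) {
           // block until the brokers have accepted the config change
           admin.incrementalAlterConfigs(configOps).all().get(30, TimeUnit.SECONDS);
       }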






Re: [PR] KAFKA-15945 : Flaky test - testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest [kafka]

2023-12-01 Thread via GitHub


atu-sharm commented on PR #14893:
URL: https://github.com/apache/kafka/pull/14893#issuecomment-1835716040

   Hi @mimaison @C0urante can you have a look at this once?





[jira] [Commented] (KAFKA-15945) Flaky test - testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest

2023-12-01 Thread Atul Sharma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791951#comment-17791951
 ] 

Atul Sharma commented on KAFKA-15945:
-

Raised PR: https://github.com/apache/kafka/pull/14893

> Flaky test - testSyncTopicConfigs() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
> -
>
> Key: KAFKA-15945
> URL: https://issues.apache.org/jira/browse/KAFKA-15945
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Priority: Major
>  Labels: flaky-test
>
> Last seen: 
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14811/7/tests
> Error
> org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
> because it's explicitly defined on the target topic!  ==> expected: <2000> 
> but was: <8640>
> Stacktrace
> org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
> because it's explicitly defined on the target topic!  ==> expected: <2000> 
> but was: <8640>
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at 
> app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
>   at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
>   at 
> app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1152)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testSyncTopicConfigs$8(MirrorConnectorsIntegrationBaseTest.java:780)
>   at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331)
>   at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs(MirrorConnectorsIntegrationBaseTest.java:774)
>   at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>   at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>   at 
> app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
>   at 
> 

Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]

2023-12-01 Thread via GitHub


dajac commented on PR #14640:
URL: https://github.com/apache/kafka/pull/14640#issuecomment-1835695981

   @kirktrue Thanks for the PR. Could you please update the description? The 
current one looks pretty outdated.





Re: [PR] KAFKA-15695: Update the local log start offset of a log after rebuilding the auxiliary state [kafka]

2023-12-01 Thread via GitHub


satishd commented on PR #14649:
URL: https://github.com/apache/kafka/pull/14649#issuecomment-1835672440

   @nikramakrishnan Can you pull out the integration test from these changes 
and raise a separate PR resolving the intermittent failure so that we can 
unblock merging this change with the added UTs in this PR?





Re: [PR] MINOR: Disable FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor [kafka]

2023-12-01 Thread via GitHub


dajac merged PR #14876:
URL: https://github.com/apache/kafka/pull/14876




