Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-11-10 Thread via GitHub


mdedetrich commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1389047935


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -817,19 +731,11 @@ public void testAccessors() throws Exception {
 Herder.Created connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
 assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
-EasyMock.reset(transformer);
-EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME), 
EasyMock.anyObject()))

Review Comment:
   Committed and pushed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-11-10 Thread via GitHub


mdedetrich commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1389049719


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -838,39 +733,32 @@ public void testPutConnectorConfig() throws Exception {
 Map newConnConfig = new HashMap<>(connConfig);
 newConnConfig.put("foo", "bar");
 
-Callback> connectorConfigCb = 
PowerMock.createMock(Callback.class);
-// Callback> putConnectorConfigCb = 
PowerMock.createMock(Callback.class);
+Callback> connectorConfigCb = mock(Callback.class);
 
 // Create
-connector = PowerMock.createMock(BogusSourceConnector.class);
+connector = mock(BogusSourceConnector.class);
 expectAdd(SourceSink.SOURCE);
-Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+Connector connectorMock = mock(SourceConnector.class);
 expectConfigValidation(connectorMock, true, connConfig);
 
 // Should get first config
-connectorConfigCb.onCompletion(null, connConfig);
-EasyMock.expectLastCall();
+doNothing().when(connectorConfigCb).onCompletion(null, connConfig);
 // Update config, which requires stopping and restarting
-worker.stopAndAwaitConnector(CONNECTOR_NAME);
-EasyMock.expectLastCall();
-Capture> capturedConfig = EasyMock.newCapture();
-Capture> onStart = EasyMock.newCapture();
-worker.startConnector(eq(CONNECTOR_NAME), 
EasyMock.capture(capturedConfig), EasyMock.anyObject(),
-eq(herder), eq(TargetState.STARTED), 
EasyMock.capture(onStart));
-EasyMock.expectLastCall().andAnswer(() -> {
+doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
+final ArgumentCaptor> capturedConfig = 
ArgumentCaptor.forClass(Map.class);
+final ArgumentCaptor> onStart = 
ArgumentCaptor.forClass(Callback.class);
+doAnswer(invocation -> {
 onStart.getValue().onCompletion(null, TargetState.STARTED);
 return true;
-});
+}).when(worker).startConnector(eq(CONNECTOR_NAME), 
capturedConfig.capture(), any(),
+eq(herder), eq(TargetState.STARTED), onStart.capture());
 // Generate same task config, which should result in no additional 
action to restart tasks
-EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, newConnConfig, true)))
-.andReturn(singletonList(taskConfig(SourceSink.SOURCE)));
+when(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, newConnConfig, true)))
+.thenReturn(singletonList(taskConfig(SourceSink.SOURCE)));
 
 expectConfigValidation(connectorMock, false, newConnConfig);
-connectorConfigCb.onCompletion(null, newConnConfig);
-EasyMock.expectLastCall();
-EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
-
-PowerMock.replayAll();
+doNothing().when(connectorConfigCb).onCompletion(null, newConnConfig);

Review Comment:
   Done and pushed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-11-10 Thread via GitHub


mdedetrich commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1389050516


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -838,39 +733,32 @@ public void testPutConnectorConfig() throws Exception {
 Map newConnConfig = new HashMap<>(connConfig);
 newConnConfig.put("foo", "bar");
 
-Callback> connectorConfigCb = 
PowerMock.createMock(Callback.class);
-// Callback> putConnectorConfigCb = 
PowerMock.createMock(Callback.class);
+Callback> connectorConfigCb = mock(Callback.class);
 
 // Create
-connector = PowerMock.createMock(BogusSourceConnector.class);
+connector = mock(BogusSourceConnector.class);
 expectAdd(SourceSink.SOURCE);
-Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+Connector connectorMock = mock(SourceConnector.class);
 expectConfigValidation(connectorMock, true, connConfig);
 
 // Should get first config
-connectorConfigCb.onCompletion(null, connConfig);
-EasyMock.expectLastCall();
+doNothing().when(connectorConfigCb).onCompletion(null, connConfig);
 // Update config, which requires stopping and restarting
-worker.stopAndAwaitConnector(CONNECTOR_NAME);
-EasyMock.expectLastCall();
-Capture> capturedConfig = EasyMock.newCapture();
-Capture> onStart = EasyMock.newCapture();
-worker.startConnector(eq(CONNECTOR_NAME), 
EasyMock.capture(capturedConfig), EasyMock.anyObject(),
-eq(herder), eq(TargetState.STARTED), 
EasyMock.capture(onStart));
-EasyMock.expectLastCall().andAnswer(() -> {
+doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
+final ArgumentCaptor> capturedConfig = 
ArgumentCaptor.forClass(Map.class);
+final ArgumentCaptor> onStart = 
ArgumentCaptor.forClass(Callback.class);
+doAnswer(invocation -> {
 onStart.getValue().onCompletion(null, TargetState.STARTED);
 return true;
-});
+}).when(worker).startConnector(eq(CONNECTOR_NAME), 
capturedConfig.capture(), any(),
+eq(herder), eq(TargetState.STARTED), onStart.capture());
 // Generate same task config, which should result in no additional 
action to restart tasks
-EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, newConnConfig, true)))
-.andReturn(singletonList(taskConfig(SourceSink.SOURCE)));
+when(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, newConnConfig, true)))
+.thenReturn(singletonList(taskConfig(SourceSink.SOURCE)));
 
 expectConfigValidation(connectorMock, false, newConnConfig);
-connectorConfigCb.onCompletion(null, newConnConfig);
-EasyMock.expectLastCall();
-EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
-
-PowerMock.replayAll();
+doNothing().when(connectorConfigCb).onCompletion(null, newConnConfig);
+when(worker.getPlugins()).thenReturn(plugins);

Review Comment:
   Done and pushed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15552 Fix Producer ID ZK migration [kafka]

2023-11-10 Thread via GitHub


showuon commented on PR #14506:
URL: https://github.com/apache/kafka/pull/14506#issuecomment-1805278003

   @jolshan , I just created a [PR](https://github.com/apache/kafka/pull/14730) to revert the change in this PR, and that run also failed the `testSingleIP` test. So let's keep monitoring it; maybe we need to change the test, because resolving localhost to multiple IPs should be fine.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15711: KRaft support in LogRecoveryTest [kafka]

2023-11-10 Thread via GitHub


linzihao1999 commented on code in PR #14693:
URL: https://github.com/apache/kafka/pull/14693#discussion_r1389074063


##
core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala:
##
@@ -212,7 +228,7 @@ class LogRecoveryTest extends QuorumTestHarness {
 server2.startup()
 updateProducer()
 // check if leader moves to the other server
-leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 
oldLeaderOpt = Some(leader))
+leader = awaitLeaderChange(servers, topicPartition, leader)

Review Comment:
   I think `waitUntilLeaderIsElectedOrChangedWithAdmin` can be used instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [Draft] KIP-994 [kafka]

2023-11-10 Thread via GitHub


RamanVerma opened a new pull request, #14731:
URL: https://github.com/apache/kafka/pull/14731

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15360: Include dirs in BrokerRegistration [kafka]

2023-11-10 Thread via GitHub


dajac commented on PR #14392:
URL: https://github.com/apache/kafka/pull/14392#issuecomment-1805313339

   This PR has introduced a compilation error (JDK 8 and Scala 2.12):
   
   ```
   > Task :core:compileScala
   
   [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-14392/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:305:49:
   value incl is not a member of scala.collection.immutable.Set[org.apache.kafka.common.Uuid]
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15711: KRaft support in LogRecoveryTest [kafka]

2023-11-10 Thread via GitHub


linzihao1999 commented on code in PR #14693:
URL: https://github.com/apache/kafka/pull/14693#discussion_r1389074590


##
core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala:
##
@@ -212,7 +228,7 @@ class LogRecoveryTest extends QuorumTestHarness {
 server2.startup()
 updateProducer()
 // check if leader moves to the other server
-leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 
oldLeaderOpt = Some(leader))
+leader = awaitLeaderChange(servers, topicPartition, leader)

Review Comment:
   I think `waitUntilLeaderIsElectedOrChangedWithAdmin` can be used instead.



##
core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala:
##
@@ -78,15 +84,20 @@ class LogRecoveryTest extends QuorumTestHarness {
   override def setUp(testInfo: TestInfo): Unit = {
 super.setUp(testInfo)
 
-configs = TestUtils.createBrokerConfigs(2, zkConnect, 
enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
+configs = TestUtils.createBrokerConfigs(2, zkConnectOrNull, 
enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
 
 // start both servers
-server1 = TestUtils.createServer(configProps1)
-server2 = TestUtils.createServer(configProps2)
+server1 = createBroker(configProps1)
+server2 = createBroker(configProps2)
 servers = List(server1, server2)
 
 // create topic with 1 partition, 2 replicas, one on each broker
-createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> 
Seq(0,1)), servers = servers)
+if (isKRaftTest()) {
+  admin = createAdminClient(servers, 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+  createTopicWithAdmin(admin, topic, servers, 2, replicaAssignment = Map(0 
-> Seq(0, 1)))
+} else {
+  createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> 
Seq(0, 1)), servers = servers)
+}

Review Comment:
   I think `createTopicWithAdmin` handles both ZK and KRaft modes, so there is no need to use `isKRaftTest` to branch explicitly here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15645 ReplicationQuotasTestRig rewritten in java [kafka]

2023-11-10 Thread via GitHub


nizhikov commented on PR #14588:
URL: https://github.com/apache/kafka/pull/14588#issuecomment-1805320880

   Now only 24 tests fail, and they all seem unrelated to the tool.
   
   @mimaison do you have any more comments on the PR?
   Can we merge it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] HOTFIX: Fix compilation error in BrokerLifecycleManager [kafka]

2023-11-10 Thread via GitHub


dajac opened a new pull request, #14732:
URL: https://github.com/apache/kafka/pull/14732

   https://github.com/apache/kafka/pull/14392 has introduced a compilation error (JDK 8 and Scala 2.12):
   
   ```
   > Task :core:compileScala
   
   [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-14392/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:305:49:
   value incl is not a member of scala.collection.immutable.Set[org.apache.kafka.common.Uuid]
   ```
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] HOTFIX: Fix compilation error in BrokerLifecycleManager [kafka]

2023-11-10 Thread via GitHub


dajac commented on PR #14732:
URL: https://github.com/apache/kafka/pull/14732#issuecomment-1805328685

   cc @soarez 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] HOTFIX: Fix compilation error in BrokerLifecycleManager [kafka]

2023-11-10 Thread via GitHub


dajac commented on PR #14732:
URL: https://github.com/apache/kafka/pull/14732#issuecomment-1805337071

   @showuon @mimaison @cadonna @lucasbru If you are around, could you take a 
quick look at this one?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] HOTFIX: Fix compilation error in BrokerLifecycleManager [kafka]

2023-11-10 Thread via GitHub


dajac commented on PR #14732:
URL: https://github.com/apache/kafka/pull/14732#issuecomment-1805372330

   The compilation step has passed for all builds, and I have verified that `kafka.server.BrokerLifecycleManagerTest` passes locally with both Scala versions. Therefore, I will merge this PR without waiting for the full build to complete, in order to unblock all the other PRs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] HOTFIX: Fix compilation error in BrokerLifecycleManager [kafka]

2023-11-10 Thread via GitHub


dajac commented on PR #14732:
URL: https://github.com/apache/kafka/pull/14732#issuecomment-1805370594

   @showuon Thanks for the review!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] HOTFIX: Fix compilation error in BrokerLifecycleManager [kafka]

2023-11-10 Thread via GitHub


dajac merged PR #14732:
URL: https://github.com/apache/kafka/pull/14732


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-11-10 Thread via GitHub


C0urante commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1388621193


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -722,89 +646,68 @@ public void testRestartConnectorAndTasksBoth() throws 
Exception {
 FutureCallback restartCallback = new 
FutureCallback<>();
 herder.restartConnectorAndTasks(restartRequest, restartCallback);
 assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
-PowerMock.verifyAll();
+
+verifyConnectorStatusRestart();
+verify(statusBackingStore).put(taskStatus.capture());
 }
 
 @Test
 public void testCreateAndStop() throws Exception {
-connector = PowerMock.createMock(BogusSourceConnector.class);
+connector = mock(BogusSourceConnector.class);
 expectAdd(SourceSink.SOURCE);
 
 Map connectorConfig = 
connectorConfig(SourceSink.SOURCE);
-Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+Connector connectorMock = mock(SourceConnector.class);
 expectConfigValidation(connectorMock, true, connectorConfig);
 
-// herder.stop() should stop any running connectors and tasks even if 
destroyConnector was not invoked
 expectStop();
 
-statusBackingStore.put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 
0));
-EasyMock.expectLastCall();
-
-statusBackingStore.stop();
-EasyMock.expectLastCall();
-worker.stop();
-EasyMock.expectLastCall();
-
-PowerMock.replayAll();
-
 herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
 Herder.Created connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
 assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
+// herder.stop() should stop any running connectors and tasks even if 
destroyConnector was not invoked
 herder.stop();
 assertTrue(noneConnectorClientConfigOverridePolicy.isClosed());
-
-PowerMock.verifyAll();
+verify(worker).stop();
+verify(statusBackingStore).stop();
+verify(statusBackingStore).put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 
0));
 }
 
 @Test
 public void testAccessors() throws Exception {
 Map connConfig = connectorConfig(SourceSink.SOURCE);
 System.out.println(connConfig);
 
-Callback> listConnectorsCb = 
PowerMock.createMock(Callback.class);
-Callback connectorInfoCb = 
PowerMock.createMock(Callback.class);
-Callback> connectorConfigCb = 
PowerMock.createMock(Callback.class);
-Callback> taskConfigsCb = 
PowerMock.createMock(Callback.class);
-Callback>> tasksConfigCb = 
PowerMock.createMock(Callback.class);
+Callback> listConnectorsCb = mock(Callback.class);
+Callback connectorInfoCb = mock(Callback.class);
+Callback> connectorConfigCb = mock(Callback.class);
+Callback> taskConfigsCb = mock(Callback.class);
+Callback>> tasksConfigCb = 
mock(Callback.class);
 
 // Check accessors with empty worker
-listConnectorsCb.onCompletion(null, Collections.EMPTY_SET);
-EasyMock.expectLastCall();
-connectorInfoCb.onCompletion(EasyMock.anyObject(), 
EasyMock.isNull());
-EasyMock.expectLastCall();
-
connectorConfigCb.onCompletion(EasyMock.anyObject(), 
EasyMock.isNull());
-EasyMock.expectLastCall();

Review Comment:
   Aren't we missing a corresponding `doNothing().when(...)` line for this 
expectation?
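
   For anyone following along, here is a minimal, self-contained sketch of how that kind of EasyMock expectation translates to Mockito. The `Callback` interface below is a stand-in for the Connect one, and the arguments are illustrative rather than the exact ones from `testAccessors`:

   ```java
   import static org.mockito.ArgumentMatchers.any;
   import static org.mockito.ArgumentMatchers.isNull;
   import static org.mockito.Mockito.doNothing;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.verify;

   public class MockitoStubbingSketch {

       // Stand-in for the Connect Callback interface, to keep the sketch self-contained.
       interface Callback<V> {
           void onCompletion(Throwable error, V result);
       }

       @SuppressWarnings("unchecked")
       static void example() {
           Callback<String> connectorConfigCb = mock(Callback.class);

           // EasyMock's record-phase pair
           //     connectorConfigCb.onCompletion(EasyMock.anyObject(), EasyMock.isNull());
           //     EasyMock.expectLastCall();
           // roughly splits into a stub before the interaction...
           // (Mockito mocks already no-op void methods, so doNothing() mostly documents intent.)
           doNothing().when(connectorConfigCb).onCompletion(any(), isNull());

           // ...the interaction itself, normally triggered by the code under test...
           connectorConfigCb.onCompletion(null, null);

           // ...and an explicit verification afterwards.
           verify(connectorConfigCb).onCompletion(any(), isNull());
       }
   }
   ```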



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-11-10 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1389220068


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join; 
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(0, "A0+null", 0L),
-new KeyValueTimestamp<>(1, "A1+a1", 110L)
+new KeyValueTimestamp<>(1, "A1+a1", 110L),
+new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
   Can you point me to the ordering rules for left, outer, and inner joins? Then I can have a look.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-10 Thread via GitHub


dajac commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1389220863


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -96,7 +96,7 @@ public NetworkClientDelegate.PollResult poll(final long 
currentTimeMs) {
  * @param topic to be requested. If empty, return the metadata for all 
topics.
  * @return the future of the metadata request.
  */
-public CompletableFuture>> 
requestTopicMetadata(final Optional topic) {
+public CompletableFuture>> 
requestTopicMetadata(final Optional topic) {

Review Comment:
   Do we still need the changes in this class? It seems that we don't use them 
anymore. I have the same question for the new `Topic` class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Added topicExists functions in AdminClient [kafka]

2023-11-10 Thread via GitHub


makarushka closed pull request #8340: Added topicExists functions in AdminClient
URL: https://github.com/apache/kafka/pull/8340


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-11-10 Thread via GitHub


mdedetrich commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1389227114


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -656,48 +586,43 @@ public void testRestartConnectorAndTasksOnlyTasks() 
throws Exception {
 FutureCallback restartCallback = new 
FutureCallback<>();
 herder.restartConnectorAndTasks(restartRequest, restartCallback);
 assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
-PowerMock.verifyAll();
+ArgumentCaptor taskStatus = 
ArgumentCaptor.forClass(TaskStatus.class);
+verify(statusBackingStore).put(taskStatus.capture());
+assertEquals(AbstractStatus.State.RESTARTING, 
taskStatus.getValue().state());

Review Comment:
   Committed and pushed



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -722,89 +646,68 @@ public void testRestartConnectorAndTasksBoth() throws 
Exception {
 FutureCallback restartCallback = new 
FutureCallback<>();
 herder.restartConnectorAndTasks(restartRequest, restartCallback);
 assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
-PowerMock.verifyAll();
+
+verifyConnectorStatusRestart();
+verify(statusBackingStore).put(taskStatus.capture());

Review Comment:
   Committed and pushed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-10 Thread via GitHub


soarez commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1389229875


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class AssignmentsManager {
+
+private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+
+/**
+ * Assignments are dispatched to the controller this long after
+ * being submitted to {@link AssignmentsManager}, if there
+ * is no request in flight already.
+ * The interval is reset when a new assignment is submitted.
+ */
+private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.SECONDS.toNanos(1);
+
+private static final long MAX_BACKOFF_INTERVAL_MS = 
TimeUnit.SECONDS.toNanos(10);
+
+private final Time time;
+private final NodeToControllerChannelManager channelManager;
+private final int brokerId;
+private final Supplier brokerEpochSupplier;
+private final KafkaEventQueue eventQueue;
+
+// These variables should only be mutated from the event loop thread
+private Map inflight = null;
+private Map pending = new HashMap<>();
+private final ExponentialBackoff resendExponentialBackoff =
+new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02);
+private int failedAttempts = 0;
+
+public AssignmentsManager(Time time,
+  NodeToControllerChannelManager channelManager,
+  int brokerId,
+  Supplier brokerEpochSupplier) {
+this.time = time;
+this.channelManager = channelManager;
+this.brokerId = brokerId;
+this.brokerEpochSupplier = brokerEpochSupplier;
+this.eventQueue = new KafkaEventQueue(time,
+new LogContext("[AssignmentsManager id=" + brokerId + "]"),
+"broker-" + brokerId + "-directory-assignments-manager-");
+}
+
+public void close() throws InterruptedException {
+eventQueue.close();
+channelManager.shutdown();
+}
+
+public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) {
+eventQueue.append(new AssignmentEvent(time.nanoseconds(), 
topicPartition, dirId));
+}
+
+// only for testing
+void wakeup() {
+eventQueue.wakeup();
+}
+
+/**
+ * Base class for all the events handled by {@link AssignmentsManager}.
+ */
+private abstract static class Event implements EventQueue.Event {
+/**
+ * Override the default behavior in
+ * {@link EventQueue.Event#handleException}
+ * which swallows the exception.
+ */
+@Override
+public void handleException(Throwable e) {
+throw new RuntimeException(e);
+}
+}
+
+/**
+ * Hand

Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-11-10 Thread via GitHub


mdedetrich commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1389230779


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -722,89 +646,68 @@ public void testRestartConnectorAndTasksBoth() throws 
Exception {
 FutureCallback restartCallback = new 
FutureCallback<>();
 herder.restartConnectorAndTasks(restartRequest, restartCallback);
 assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
-PowerMock.verifyAll();
+
+verifyConnectorStatusRestart();
+verify(statusBackingStore).put(taskStatus.capture());
 }
 
 @Test
 public void testCreateAndStop() throws Exception {
-connector = PowerMock.createMock(BogusSourceConnector.class);
+connector = mock(BogusSourceConnector.class);
 expectAdd(SourceSink.SOURCE);
 
 Map connectorConfig = 
connectorConfig(SourceSink.SOURCE);
-Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+Connector connectorMock = mock(SourceConnector.class);
 expectConfigValidation(connectorMock, true, connectorConfig);
 
-// herder.stop() should stop any running connectors and tasks even if 
destroyConnector was not invoked
 expectStop();
 
-statusBackingStore.put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 
0));
-EasyMock.expectLastCall();
-
-statusBackingStore.stop();
-EasyMock.expectLastCall();
-worker.stop();
-EasyMock.expectLastCall();
-
-PowerMock.replayAll();
-
 herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
 Herder.Created connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
 assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
+// herder.stop() should stop any running connectors and tasks even if 
destroyConnector was not invoked
 herder.stop();
 assertTrue(noneConnectorClientConfigOverridePolicy.isClosed());
-
-PowerMock.verifyAll();
+verify(worker).stop();
+verify(statusBackingStore).stop();
+verify(statusBackingStore).put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 
0));
 }
 
 @Test
 public void testAccessors() throws Exception {
 Map connConfig = connectorConfig(SourceSink.SOURCE);
 System.out.println(connConfig);
 
-Callback> listConnectorsCb = 
PowerMock.createMock(Callback.class);
-Callback connectorInfoCb = 
PowerMock.createMock(Callback.class);
-Callback> connectorConfigCb = 
PowerMock.createMock(Callback.class);
-Callback> taskConfigsCb = 
PowerMock.createMock(Callback.class);
-Callback>> tasksConfigCb = 
PowerMock.createMock(Callback.class);
+Callback> listConnectorsCb = mock(Callback.class);
+Callback connectorInfoCb = mock(Callback.class);
+Callback> connectorConfigCb = mock(Callback.class);
+Callback> taskConfigsCb = mock(Callback.class);
+Callback>> tasksConfigCb = 
mock(Callback.class);
 
 // Check accessors with empty worker
-listConnectorsCb.onCompletion(null, Collections.EMPTY_SET);
-EasyMock.expectLastCall();
-connectorInfoCb.onCompletion(EasyMock.anyObject(), 
EasyMock.isNull());
-EasyMock.expectLastCall();
-
connectorConfigCb.onCompletion(EasyMock.anyObject(), 
EasyMock.isNull());
-EasyMock.expectLastCall();

Review Comment:
   Committed and pushed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) [kafka]

2023-11-10 Thread via GitHub


aliehsaeedii commented on PR #14596:
URL: https://github.com/apache/kafka/pull/14596#issuecomment-1805669950

   @mjsax 
   Cool, thanks for unblocking me.
   Is it ready to merge now?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15707: KRaft support in TopicBasedRemoteLogMetadataManagerHarness [kafka]

2023-11-10 Thread via GitHub


linzihao1999 opened a new pull request, #14733:
URL: https://github.com/apache/kafka/pull/14733

   KRaft support in TopicBasedRemoteLogMetadataManagerHarness
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15745) KRaft support in RequestQuotaTest

2023-11-10 Thread Zihao Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zihao Lin resolved KAFKA-15745.
---
Resolution: Duplicate

> KRaft support in RequestQuotaTest
> -
>
> Key: KAFKA-15745
> URL: https://issues.apache.org/jira/browse/KAFKA-15745
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in RequestQuotaTest in 
> core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala need to be 
> updated to support KRaft
> 132 : def testResponseThrottleTime(): Unit = {
> 140 : def testResponseThrottleTimeWhenBothProduceAndRequestQuotasViolated(): 
> Unit = {
> 146 : def testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated(): 
> Unit = {
> 152 : def testUnthrottledClient(): Unit = {
> 161 : def testExemptRequestTime(): Unit = {
> 171 : def testUnauthorizedThrottle(): Unit = {
> Scanned 801 lines. Found 0 KRaft tests out of 6 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15804: Close SocketServer channels when calling shutdown before enableRequestProcessing [kafka]

2023-11-10 Thread via GitHub


ex172000 commented on code in PR #14729:
URL: https://github.com/apache/kafka/pull/14729#discussion_r1389423646


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -679,6 +679,9 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
   def close(): Unit = {
 beginShutdown()
 thread.join()
+if (!started) {

Review Comment:
   Do we want to pass `started` as a parameter to the method to make it look cleaner?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15805) Fetch Remote Indexes at once

2023-11-10 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-15805:


 Summary: Fetch Remote Indexes at once
 Key: KAFKA-15805
 URL: https://issues.apache.org/jira/browse/KAFKA-15805
 Project: Kafka
  Issue Type: Improvement
  Components: Tiered-Storage
Reporter: Jorge Esteban Quilcate Otoya
Assignee: Jorge Esteban Quilcate Otoya


Reduce Tiered Storage latency when fetching indexes by allowing to fetch many 
indexes at once.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15806) Signal next segment when remote fetching

2023-11-10 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-15806:


 Summary: Signal next segment when remote fetching 
 Key: KAFKA-15806
 URL: https://issues.apache.org/jira/browse/KAFKA-15806
 Project: Kafka
  Issue Type: Improvement
  Components: Tiered-Storage
Reporter: Jorge Esteban Quilcate Otoya
Assignee: Jorge Esteban Quilcate Otoya


Improve remote fetching performance when fetching across segment by signaling 
the next segment and allow Remote Storage Manager implementations to optimize 
their pre-fetching.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14133: Migrate ProcessorStateManagerTest and StreamThreadTest to Mockito [kafka]

2023-11-10 Thread via GitHub


clolov commented on PR #13932:
URL: https://github.com/apache/kafka/pull/13932#issuecomment-1805861321

   This has since been rebased!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests

2023-11-10 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-14133:
--
Description: 
{color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which still need to be migrated from EasyMock to Mockito as of 2nd of August 2022, which do not have a Jira issue and for which I am not aware of any open pull requests:

{color:#ff8b00}In Review{color}
{color:#00875a}Merged{color}
 # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: 
[~yash.mayya] )
 # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo)
 # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo)
 # {color:#00875a}KStreamPrintTest{color} (owner: Christo)
 # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo)
 # {color:#00875a}MaterializedInternalTest{color} (owner: Christo)
 # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
 # {color:#00875a}ClientUtilsTest{color} (owner: Christo)
 # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
Christo)
 # {color:#00875a}TopologyTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo)
 # {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: 
Christo)
 # {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo)
 # {color:#00875a}RocksDBStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo)
 # {color:#ff8b00}TaskManagerTest{color} (owner: Christo) 
[https://github.com/apache/kafka/pull/13874, 
https://github.com/apache/kafka/pull/13897, 
https://github.com/apache/kafka/pull/13873|https://github.com/apache/kafka/pull/13874]
 # {color:#00875a}InternalTopicManagerTest{color} (owner: Christo)
 # {color:#00875a}ProcessorContextImplTest{color} (owner: Christo)
 # {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo)
 # {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo)
 # {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo)
 # {color:#00875a}AssignmentTestUtils{color} (owner: Christo)
 # {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew) (takeover: 
Christo) [https://github.com/apache/kafka/pull/13932] 
 # {color:#00875a}StandbyTaskTest{color} (owner: Matthew) (takeover: Christo)
 # {color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew) 
[https://github.com/apache/kafka/pull/12524]
 # {color:#ff8b00}StreamTaskTest{color} (owner: Matthew) (takeover: Christo) 
[https://github.com/apache/kafka/pull/14716] 
 # {color:#ff8b00}StreamThreadTest{color} (owner: Matthew) (takeover: Christo) 
[https://github.com/apache/kafka/pull/13932] 
 # {color:#00875a}StreamsMetricsImplTest{color} (owner: Dalibor) (takeover: 
Christo)
 # {color:#00875a}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: 
[~shekharrajak])
 # {color:#00875a}TimeOrderedWindowStoreTest{color} (owner: [~shekharrajak])
 # {color:#00875a}AbstractStreamTest{color} (owner: Christo)
 # {color:#00875a}KStreamTransformValuesTest{color} (owner: Christo)
 # {color:#00875a}KTableImplTest{color} (owner: Christo)
 # {color:#00875a}KTableTransformValuesTest{color} (owner: Christo)
 # {color:#00875a}SessionCacheFlushListenerTest{color} (owner: Christo)
 # {color:#00875a}TimestampedCacheFlushListenerTest{color} (owner: Christo)
 # {color:#00875a}TimestampedTupleForwarderTest{color} (owner: Christo)
 # {color:#00875a}ActiveTask

[jira] [Updated] (KAFKA-15806) Signal next segment when remote fetching

2023-11-10 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya updated KAFKA-15806:
-
Description: 
Improve remote fetching performance when fetching across segment by signaling 
the next segment and allow Remote Storage Manager implementations to optimize 
their pre-fetching.

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1003%3A+Signal+next+segment+when+remote+fetching]

  was:Improve remote fetching performance when fetching across segment by 
signaling the next segment and allow Remote Storage Manager implementations to 
optimize their pre-fetching.


> Signal next segment when remote fetching 
> -
>
> Key: KAFKA-15806
> URL: https://issues.apache.org/jira/browse/KAFKA-15806
> Project: Kafka
>  Issue Type: Improvement
>  Components: Tiered-Storage
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: kip
>
> Improve remote fetching performance when fetching across segment by signaling 
> the next segment and allow Remote Storage Manager implementations to optimize 
> their pre-fetching.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1003%3A+Signal+next+segment+when+remote+fetching]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15805) Fetch Remote Indexes at once

2023-11-10 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya updated KAFKA-15805:
-
Description: 
Reduce Tiered Storage latency when fetching indexes by allowing to fetch many 
indexes at once.

KIP: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1002%3A+Fetch+remote+segment+indexes+at+once]

  was:
Reduce Tiered Storage latency when fetching indexes by allowing to fetch many 
indexes at once.

 


> Fetch Remote Indexes at once
> 
>
> Key: KAFKA-15805
> URL: https://issues.apache.org/jira/browse/KAFKA-15805
> Project: Kafka
>  Issue Type: Improvement
>  Components: Tiered-Storage
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: kip
>
> Reduce Tiered Storage latency when fetching indexes by allowing to fetch many 
> indexes at once.
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1002%3A+Fetch+remote+segment+indexes+at+once]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15426) Process and persist directory assignments

2023-11-10 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez reassigned KAFKA-15426:
---

Assignee: Igor Soarez

> Process and persist directory assignments
> -
>
> Key: KAFKA-15426
> URL: https://issues.apache.org/jira/browse/KAFKA-15426
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
>
> * Handle AssignReplicasToDirsRequest
>  * Persist metadata changes



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15361) Process and persist dir info with broker registration

2023-11-10 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez reassigned KAFKA-15361:
---

Assignee: Igor Soarez

> Process and persist dir info with broker registration
> -
>
> Key: KAFKA-15361
> URL: https://issues.apache.org/jira/browse/KAFKA-15361
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
>
> Controllers should process and persist directory information from the broker 
> registration request



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15364) Handle log directory failure in the Controller

2023-11-10 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez reassigned KAFKA-15364:
---

Assignee: Igor Soarez

> Handle log directory failure in the Controller
> --
>
> Key: KAFKA-15364
> URL: https://issues.apache.org/jira/browse/KAFKA-15364
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-10 Thread via GitHub


soarez commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1805892764

   Is there a way to trigger the tests again, or do I need to push additional changes? All the test pipelines show a Jenkins error: "No space left on device".
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-10 Thread via GitHub


kirktrue closed pull request #14670: KAFKA-15277: Design & implement support 
for internal Consumer delegates
URL: https://github.com/apache/kafka/pull/14670


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-10 Thread via GitHub


kirktrue commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1805940138

   Re-triggered a build due to the abundance of seemingly unrelated test 
failures.
   
   For example, the first line of this test in `ClusterConnectionStatesTest` is 
failing across all builds because `localhost` is resolving to more than one IP 
address:
   
   ```java
   @Test
   public void testSingleIP() throws UnknownHostException {
   assertEquals(1, ClientUtils.resolve("localhost", 
singleIPHostResolver).size());
   
   connectionStates.connecting(nodeId1, time.milliseconds(), "localhost");
   InetAddress currAddress = connectionStates.currentAddress(nodeId1);
   connectionStates.connecting(nodeId1, time.milliseconds(), "localhost");
   assertSame(currAddress, connectionStates.currentAddress(nodeId1));
   }
   ```
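
   For reference, a quick standalone way to see what the build machines resolve (plain JDK, not part of the Kafka test suite):

   ```java
   import java.net.InetAddress;
   import java.net.UnknownHostException;

   public class LocalhostResolutionCheck {
       public static void main(String[] args) throws UnknownHostException {
           // On hosts where /etc/hosts maps localhost to both 127.0.0.1 and ::1,
           // this prints two addresses - the multi-IP localhost situation described above.
           InetAddress[] addresses = InetAddress.getAllByName("localhost");
           for (InetAddress address : addresses) {
               System.out.println(address.getHostAddress());
           }
           System.out.println("resolved " + addresses.length + " address(es)");
       }
   }
   ```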


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Various cleanups in metadata [kafka]

2023-11-10 Thread via GitHub


mimaison opened a new pull request, #14734:
URL: https://github.com/apache/kafka/pull/14734

   - Remove unused code, suppression
   - Simplify/fix test assertions
   - Javadoc cleanups
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KIP-988 [kafka]

2023-11-10 Thread via GitHub


eduwercamacaro opened a new pull request, #14735:
URL: https://github.com/apache/kafka/pull/14735

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KRaft support in DescribeUserScramCredentialsRequestNotAuthorizedTest [kafka]

2023-11-10 Thread via GitHub


linzihao1999 opened a new pull request, #14736:
URL: https://github.com/apache/kafka/pull/14736

   KRaft support in DescribeUserScramCredentialsRequestNotAuthorizedTest
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15800: Prevent DataExceptions from corrupting KafkaOffsetBackingStore [kafka]

2023-11-10 Thread via GitHub


gharris1727 commented on PR #14718:
URL: https://github.com/apache/kafka/pull/14718#issuecomment-1806059864

   Test failures appear unrelated, and the connect and mirror tests pass 
locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15800: Prevent DataExceptions from corrupting KafkaOffsetBackingStore [kafka]

2023-11-10 Thread via GitHub


gharris1727 merged PR #14718:
URL: https://github.com/apache/kafka/pull/14718


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15804: Close SocketServer channels when calling shutdown before enableRequestProcessing [kafka]

2023-11-10 Thread via GitHub


gharris1727 commented on code in PR #14729:
URL: https://github.com/apache/kafka/pull/14729#discussion_r1389645226


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -679,6 +679,9 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   def close(): Unit = {
 beginShutdown()
 thread.join()
+if (!started) {

Review Comment:
   Hey @ex172000, thanks for the review! Do you mean as a parameter to `closeAll`?
   
   I don't think that's necessary, as `started` is an instance variable. Also, I 
didn't add the condition to the `finally` block because `started` should always 
be true at that point in the code, so the check is only present on this code 
path, where the thread has not been started yet.
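   
   For context, the shape being discussed is roughly the following (a Java-flavoured sketch of the Scala change, assuming `closeAll()` is the call that releases the acceptor's channels; not the literal patch):
   
   ```java
   // Hypothetical stand-in for the Acceptor shutdown path under discussion.
   class AcceptorSketch {
       private final Thread thread = new Thread(() -> { });
       private volatile boolean started = false;
   
       void beginShutdown() { /* signal the run() loop to stop accepting */ }
   
       void closeAll() { /* close any channels owned by this acceptor */ }
   
       void close() throws InterruptedException {
           beginShutdown();
           thread.join();
           // If the acceptor thread was never started, its run() loop never reached
           // the cleanup that normally closes channels, so close them here instead.
           if (!started) {
               closeAll();
           }
       }
   }
   ```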



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-10 Thread via GitHub


kirktrue closed pull request #14670: KAFKA-15277: Design & implement support 
for internal Consumer delegates
URL: https://github.com/apache/kafka/pull/14670


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-10 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1389684994


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import java.util.Collections;
+import java.util.regex.Pattern;
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.CRC32;
+
+/**
+ * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
+ */
+public class ClientMetricsManager implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
+private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+
+public static ClientMetricsManager instance() {
+return INSTANCE;
+}
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;

Review Comment:
   Then should it be in the low 100Ks, i.e. 2^17 = 131072?



##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import java.util.Collections;
+import java.util.regex.Pattern;
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apac

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-10 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1389709078


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));
+

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-10 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1389719566


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));
+

Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]

2023-11-10 Thread via GitHub


apoorvmittal10 closed pull request #14632: KAFKA-15681: Add support of 
client-metrics in kafka-configs.sh (KIP-714)
URL: https://github.com/apache/kafka/pull/14632


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15807) Add support for compress/decompress metrics

2023-11-10 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-15807:
-

 Summary: Add support for compress/decompress metrics
 Key: KAFKA-15807
 URL: https://issues.apache.org/jira/browse/KAFKA-15807
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15807) Add support for compression/decompression of metrics

2023-11-10 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal updated KAFKA-15807:
--
Summary: Add support for compression/decompression of metrics  (was: Add 
support for compress/decompress metrics)

> Add support for compression/decompression of metrics
> 
>
> Key: KAFKA-15807
> URL: https://issues.apache.org/jira/browse/KAFKA-15807
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-10 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1389684994


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import java.util.Collections;
+import java.util.regex.Pattern;
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.CRC32;
+
+/**
+ * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
+ */
+public class ClientMetricsManager implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
+private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+
+public static ClientMetricsManager instance() {
+return INSTANCE;
+}
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;

Review Comment:
   Then should it be in the low 100Ks, i.e. 2^17 = 131072? I am not sure what the 
right value should be here. I know cloud providers like MSK typically allow 3000 
active connections per broker, and the limit is higher in the case of Confluent, 
but I'm not sure what the typical number of active clients connecting to a 
broker should be.
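   
   For what it's worth, whichever bound we settle on, the `SynchronizedCache`/`LRUCache` pair used here evicts the least recently used client instance once the bound is exceeded rather than rejecting new clients. A small self-contained sketch of that behaviour (the tiny `maxSize` is only for illustration, standing in for `CM_CACHE_MAX_SIZE`):
   
   ```java
   import org.apache.kafka.common.cache.Cache;
   import org.apache.kafka.common.cache.LRUCache;
   import org.apache.kafka.common.cache.SynchronizedCache;
   
   public class ClientInstanceCacheSketch {
       public static void main(String[] args) {
           int maxSize = 3; // illustrative bound only
           Cache<String, String> cache = new SynchronizedCache<>(new LRUCache<>(maxSize));
   
           for (int i = 0; i < 5; i++) {
               cache.put("client-" + i, "instance-" + i);
           }
   
           // Only the 3 most recently used entries survive; older client instances
           // are silently evicted and would be re-created on their next request.
           System.out.println(cache.size());          // 3
           System.out.println(cache.get("client-0")); // null (evicted)
           System.out.println(cache.get("client-4")); // instance-4
       }
   }
   ```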



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems

2023-11-10 Thread Phuc Hong Tran (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phuc Hong Tran reassigned KAFKA-15341:
--

Assignee: Phuc Hong Tran

> Enabling TS for a topic during rolling restart causes problems
> --
>
> Key: KAFKA-15341
> URL: https://issues.apache.org/jira/browse/KAFKA-15341
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.7.0
>
>
> When we are in a rolling restart to enable TS at the system level, some brokers 
> have TS enabled on them and some don't. We send an alter config call to 
> enable TS for a topic; it hits a broker which has TS enabled, this broker 
> forwards it to the controller, and the controller sends the config update to 
> all brokers. When another broker which doesn't have TS enabled (because it 
> hasn't undergone the restart yet) gets this config change, it "should" fail 
> to apply it. But failing at that point is too late, because alterConfig has 
> already succeeded and controller->broker config propagation is done async.
> With this JIRA, we want the controller to check that TS is enabled on all 
> brokers before applying an alter config that turns on TS for a topic.
> Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-15362: Resolve offline replicas in metadata cache [kafka]

2023-11-10 Thread via GitHub


soarez opened a new pull request, #14737:
URL: https://github.com/apache/kafka/pull/14737

   The metadata cache now considers registered log directories and directory 
assignments when determining offline replicas.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15362: Resolve offline replicas in metadata cache [kafka]

2023-11-10 Thread via GitHub


soarez commented on PR #14737:
URL: https://github.com/apache/kafka/pull/14737#issuecomment-1806580244

   @pprovenzano @cmccabe please have a look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15774: refactor windowed stores to use StoreFactory [kafka]

2023-11-10 Thread via GitHub


agavra commented on PR #14708:
URL: https://github.com/apache/kafka/pull/14708#issuecomment-1806629224

   @ableegoldman is this good to merge? the failing tests seem unrelated and 
I've seen them be quite flaky 
   
![image](https://github.com/apache/kafka/assets/3172405/d02560d5-1ffe-443e-8eba-0826e50aefb2)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15774: refactor windowed stores to use StoreFactory [kafka]

2023-11-10 Thread via GitHub


ableegoldman merged PR #14708:
URL: https://github.com/apache/kafka/pull/14708


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-11-10 Thread via GitHub


C0urante commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1390087872


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -817,19 +726,14 @@ public void testAccessors() throws Exception {
 Herder.Created connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
 assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
-EasyMock.reset(transformer);
-EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME), 
EasyMock.anyObject()))
-.andThrow(new AssertionError("Config transformation should not 
occur when requesting connector or task info"))
-.anyTimes();
-EasyMock.replay(transformer);
-
+reset(transformer);
 herder.connectors(listConnectorsCb);
 herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb);
 herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
 herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb);
 herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb);
-
-PowerMock.verifyAll();
+// Config transformation should not occur when requesting connector or 
task info
+verify(transformer, never()).transform(eq(CONNECTOR_NAME), any());

Review Comment:
   This is much cleaner than the existing strategy (where we cause invocations 
of `transformer::transform` to throw an exception). Nice job, thanks!



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -908,75 +799,58 @@ public void testCorruptConfig() throws Throwable {
 config.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
 config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
BogusSinkConnector.class.getName());
 config.put(SinkConnectorConfig.TOPICS_CONFIG, TOPICS_LIST_STR);
-Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+Connector connectorMock = mock(SinkConnector.class);
 String error = "This is an error in your config!";
 List errors = new ArrayList<>(singletonList(error));
 String key = "foo.invalid.key";
-EasyMock.expect(connectorMock.validate(config)).andReturn(
+when(connectorMock.validate(config)).thenReturn(
 new Config(
 Arrays.asList(new ConfigValue(key, null, 
Collections.emptyList(), errors))
 )
 );
 ConfigDef configDef = new ConfigDef();
 configDef.define(key, ConfigDef.Type.STRING, 
ConfigDef.Importance.HIGH, "");
-
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-final Capture> configCapture = 
EasyMock.newCapture();
-
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
-EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
-
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
-EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
-
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
-EasyMock.expect(connectorMock.config()).andStubReturn(configDef);
-loaderSwap.close();

Review Comment:
   Don't we still want to verify that this takes place?



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -887,19 +783,14 @@ public void testPutConnectorConfig() throws Exception {
 
 assertEquals("bar", capturedConfig.getValue().get("foo"));
 herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);

Review Comment:
   Can we also make sure that there aren't any errors reported with the 
`connectorConfigCb`?
   
   The easy way to do this would be to just add this line to the end of the 
test:
   ```java
   verifyNoMoreInteractions(connectorConfigCb);
   ```
   
   It'd probably be better to do away with mocked callbacks and switch to using 
a `FutureCallback` that we invoke `get` on (like we do with many other tests), 
but it's up to you if you'd like to implement that or not.
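   
   As a rough illustration of the `FutureCallback` approach (a sketch only, with a hypothetical helper name; not a requirement for this PR):
   
   ```java
   import java.util.Map;
   import java.util.concurrent.TimeUnit;
   
   import org.apache.kafka.connect.runtime.Herder;
   import org.apache.kafka.connect.util.FutureCallback;
   
   class ConnectorConfigAssertions {
       // Instead of mocking Callback and verifying interactions, block on a
       // FutureCallback so any error is surfaced as an ExecutionException.
       static Map<String, String> awaitConnectorConfig(Herder herder, String connName) throws Exception {
           FutureCallback<Map<String, String>> cb = new FutureCallback<>();
           herder.connectorConfig(connName, cb);
           return cb.get(1000L, TimeUnit.MILLISECONDS);
       }
   }
   ```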



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-11-10 Thread via GitHub


C0urante commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1390091434


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -908,75 +799,58 @@ public void testCorruptConfig() throws Throwable {
 config.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
 config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
BogusSinkConnector.class.getName());
 config.put(SinkConnectorConfig.TOPICS_CONFIG, TOPICS_LIST_STR);
-Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+Connector connectorMock = mock(SinkConnector.class);
 String error = "This is an error in your config!";
 List errors = new ArrayList<>(singletonList(error));
 String key = "foo.invalid.key";
-EasyMock.expect(connectorMock.validate(config)).andReturn(
+when(connectorMock.validate(config)).thenReturn(
 new Config(
 Arrays.asList(new ConfigValue(key, null, 
Collections.emptyList(), errors))
 )
 );
 ConfigDef configDef = new ConfigDef();
 configDef.define(key, ConfigDef.Type.STRING, 
ConfigDef.Importance.HIGH, "");
-
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-final Capture> configCapture = 
EasyMock.newCapture();
-
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
-EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
-
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
-EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
-
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
-EasyMock.expect(connectorMock.config()).andStubReturn(configDef);
-loaderSwap.close();

Review Comment:
   Don't we still want to verify that the call to `loaderSwap::close` takes 
place?
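   
   e.g. a one-liner along these lines, assuming the mock keeps its current `loaderSwap` name (sketch):
   ```java
   verify(loaderSwap).close();
   ```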



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] This adds a default implementation to the KafkaProducer interface to … [kafka]

2023-11-10 Thread via GitHub


github-actions[bot] commented on PR #13981:
URL: https://github.com/apache/kafka/pull/13981#issuecomment-1806659081

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems

2023-11-10 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785136#comment-17785136
 ] 

Phuc Hong Tran commented on KAFKA-15341:


[~divijvaidya] The controller doesn't know whether an individual broker has TS 
enabled or not. It's better to have the controller track which brokers have TS 
enabled than to have brokers reject the per-topic TS enablement call, since that 
would be inefficient: we would have to reach out to all those brokers only to 
get rejected.

> Enabling TS for a topic during rolling restart causes problems
> --
>
> Key: KAFKA-15341
> URL: https://issues.apache.org/jira/browse/KAFKA-15341
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.7.0
>
>
> When we are in a rolling restart to enable TS at the system level, some brokers 
> have TS enabled on them and some don't. We send an alter config call to 
> enable TS for a topic; it hits a broker which has TS enabled, this broker 
> forwards it to the controller, and the controller sends the config update to 
> all brokers. When another broker which doesn't have TS enabled (because it 
> hasn't undergone the restart yet) gets this config change, it "should" fail 
> to apply it. But failing at that point is too late, because alterConfig has 
> already succeeded and controller->broker config propagation is done async.
> With this JIRA, we want the controller to check that TS is enabled on all 
> brokers before applying an alter config that turns on TS for a topic.
> Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15575) Prevent Connectors from exceeding tasks.max configuration

2023-11-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15575:
--
Description: 
The Connector::taskConfigs(int maxTasks) function is used by Connectors to 
enumerate tasks configurations. This takes an argument which comes from the 
tasks.max connector config. This is the Javadoc for that method:
{noformat}
/**
 * Returns a set of configurations for Tasks based on the current configuration,
 * producing at most {@code maxTasks} configurations.
 *
 * @param maxTasks maximum number of configurations to generate
 * @return configurations for Tasks
 */
public abstract List> taskConfigs(int maxTasks);
{noformat}
This includes the constraint that the number of tasks is at most maxTasks, but 
this constraint is not enforced by the framework.

 

To enforce this constraint, we could begin dropping configs that exceed the 
limit, and log a warning. For sink connectors this should harmlessly rebalance 
the consumer subscriptions onto the remaining tasks. For source connectors that 
distribute their work via task configs, this may result in an interruption in 
data transfer.
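
A minimal sketch of that enforcement idea (hypothetical helper, not the framework's actual implementation):
{noformat}
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TaskConfigLimiter {
    private static final Logger log = LoggerFactory.getLogger(TaskConfigLimiter.class);

    // Illustrative only: drop any task configs beyond maxTasks and warn, which is
    // the "drop and log a warning" behaviour described above.
    static List<Map<String, String>> enforceTasksMax(String connName,
                                                     List<Map<String, String>> taskConfigs,
                                                     int maxTasks) {
        if (taskConfigs.size() <= maxTasks) {
            return taskConfigs;
        }
        log.warn("Connector {} generated {} task configs, but tasks.max is {}; dropping the excess",
                connName, taskConfigs.size(), maxTasks);
        return taskConfigs.subList(0, maxTasks);
    }
}
{noformat}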

  was:
The Connector::taskConfigs(int maxTasks) function is used by Connectors to 
enumerate tasks configurations. This takes an argument which comes from the 
tasks.max connector config. This is the Javadoc for that method:
{noformat}
/**
 * Returns a set of configurations for Tasks based on the current configuration,
 * producing at most {@code maxTasks} configurations.
 *
 * @param maxTasks maximum number of configurations to generate
 * @return configurations for Tasks
 */
public abstract List> taskConfigs(int maxTasks);
{noformat}
This includes the constraint that the number of tasks is at most maxTasks, but 
this constraint is not enforced by the framework.

 

We should begin enforcing this constraint by dropping configs that exceed the 
limit, and logging a warning. For sink connectors this should harmlessly 
rebalance the consumer subscriptions onto the remaining tasks. For source 
connectors that distribute their work via task configs, this may result in an 
interruption in data transfer.


> Prevent Connectors from exceeding tasks.max configuration
> -
>
> Key: KAFKA-15575
> URL: https://issues.apache.org/jira/browse/KAFKA-15575
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Greg Harris
>Assignee: Chris Egerton
>Priority: Minor
>  Labels: kip
>
> The Connector::taskConfigs(int maxTasks) function is used by Connectors to 
> enumerate tasks configurations. This takes an argument which comes from the 
> tasks.max connector config. This is the Javadoc for that method:
> {noformat}
> /**
>  * Returns a set of configurations for Tasks based on the current 
> configuration,
>  * producing at most {@code maxTasks} configurations.
>  *
>  * @param maxTasks maximum number of configurations to generate
>  * @return configurations for Tasks
>  */
> public abstract List> taskConfigs(int maxTasks);
> {noformat}
> This includes the constraint that the number of tasks is at most maxTasks, 
> but this constraint is not enforced by the framework.
>  
> To enforce this constraint, we could begin dropping configs that exceed the 
> limit, and log a warning. For sink connectors this should harmlessly 
> rebalance the consumer subscriptions onto the remaining tasks. For source 
> connectors that distribute their work via task configs, this may result in an 
> interruption in data transfer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15575) Prevent Connectors from exceeding tasks.max configuration

2023-11-10 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785147#comment-17785147
 ] 

Chris Egerton commented on KAFKA-15575:
---

I've published 
[KIP-1004|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1004%3A+Enforce+tasks.max+property+in+Kafka+Connect]
 to try to address this gap.

> Prevent Connectors from exceeding tasks.max configuration
> -
>
> Key: KAFKA-15575
> URL: https://issues.apache.org/jira/browse/KAFKA-15575
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Greg Harris
>Assignee: Chris Egerton
>Priority: Minor
>  Labels: kip
>
> The Connector::taskConfigs(int maxTasks) function is used by Connectors to 
> enumerate tasks configurations. This takes an argument which comes from the 
> tasks.max connector config. This is the Javadoc for that method:
> {noformat}
> /**
>  * Returns a set of configurations for Tasks based on the current 
> configuration,
>  * producing at most {@code maxTasks} configurations.
>  *
>  * @param maxTasks maximum number of configurations to generate
>  * @return configurations for Tasks
>  */
> public abstract List> taskConfigs(int maxTasks);
> {noformat}
> This includes the constraint that the number of tasks is at most maxTasks, 
> but this constraint is not enforced by the framework.
>  
> To enforce this constraint, we could begin dropping configs that exceed the 
> limit, and log a warning. For sink connectors this should harmlessly 
> rebalance the consumer subscriptions onto the remaining tasks. For source 
> connectors that distribute their work via task configs, this may result in an 
> interruption in data transfer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15808) kstreams app consumer threads all fenced due to outdated static membership ids during group coordinator migration

2023-11-10 Thread Carrie Hazelton (Jira)
Carrie Hazelton created KAFKA-15808:
---

 Summary: kstreams app consumer threads all fenced due to outdated 
static membership ids during group coordinator migration
 Key: KAFKA-15808
 URL: https://issues.apache.org/jira/browse/KAFKA-15808
 Project: Kafka
  Issue Type: Bug
  Components: group-coordinator, streams
Affects Versions: 3.5.0
Reporter: Carrie Hazelton
 Attachments: broker-50003-kafka.log, broker-55002-kafka.log, 
kstreams-55002-kafka.log

I am wondering if my team encountered a bug with static membership in a KStream 
application (river.stream-filter in the attached logs). There was an abrupt 
shutdown of all of its consumer threads after they all became fenced on 
membership id conflicts. It occurred during a rolling deployment to the Kafka 
broker fleet, which triggered a migration of the group coordinator to a new 
broker (from kafka-broker-55002 to kafka-broker-50003).

My understanding is that a new group coordinator shouldn't start interacting 
with consumer threads until it has finished loading membership metadata during 
the migration. In this case, the new group coordinator started interacting with 
the consumer threads before it finished loading the latest membership 
information from the most recent generation of the membership metadata. Because 
the migration was incomplete, the new group coordinator had outdated consumer 
membership IDs, which caused all of the streaming app's consumer threads to get 
fenced due to conflicting membership ids.

Major timeline in the attached logs:
 * 07:27:04 - 07:32:25 UTC: Deployment to broker kafka-broker-55002.
 * 07:32:07 - 07:32:10 UTC: 36 unavailable broker messages seen for 
kafka-broker-55002 and 3 for kafka-broker-50003 (overlapping in time).
 * 07:32:21,505 UTC: Controller kafka-broker-51005 adds broker 
kafka-broker-55002 back as a live broker.
 * 07:32:23,276 UTC: kafka-broker-50003 last mention of loading membership data.
 * 07:32:34,175 - 07:32:34,278 UTC - kstream app threads (consumers in group) 
using latest member IDs interact with group coordinator kafka-broker-50003 
which had outdated member IDs. Consumer threads from the kstream app 
(river.stream-filter) are fenced and ejected from the consumer group.

When kafka-broker-50003 was assigned as group coordinator it was loading the 
latest (737) generation of membership metadata, but it appears to have never 
finished. A review of a previous successful migration shows multiple log lines 
for a new generation, but only one was seen during this deployment. Below are some 
log lines that I found in a kstream host's kafka.log that highlight the member 
id difference for generation 737:

First and last "Loaded member" log lines seen while the app was talking to group 
coordinator kafka-broker-50003:
{quote}
[2023-08-04 07:32:16,564] INFO Loaded member 
MemberMetadata(memberId=kafka-streams-55007.somewhere.com-2-8a5b4e68-7065-4125-9ba0-615b538a0e79,
 groupInstanceId=Some(kafka-streams-55007.somewhere.com-2), 
clientId=river.stream-filter.v1-d3473182-6af5-410d-82ef-4b85ea1b9a66-StreamThread-2-consumer,
 clientHost=/XX.XX.XX.XX, sessionTimeoutMs=45000, rebalanceTimeoutMs=30, 
supportedProtocols=List(stream)) in group river.stream-filter.v1 with 
generation 686. (kafka.coordinator.group.GroupMetadata$) 

{{...}}

[2023-08-04 07:32:23,276] INFO Loaded member 
MemberMetadata(memberId=kafka-streams-55007.somewhere.com-2-8a5b4e68-7065-4125-9ba0-615b538a0e79,
 groupInstanceId=Some(kafka-streams-55007.somewhere.com-2), 
clientId=river.stream-filter.v1-d3473182-6af5-410d-82ef-4b85ea1b9a66-StreamThread-2-consumer,
 clientHost=/XX.XX.XX.XX, sessionTimeoutMs=45000, rebalanceTimeoutMs=30, 
supportedProtocols=List(stream)) in group river.stream-filter.v1 with 
generation 737. (kafka.coordinator.group.GroupMetadata$)
{quote}
The memberId was the same in both generations (ending with {{615b538a0e79}})

The consumers got fenced after the controller sent a message around the time of 
the above log line.

Later, kafka-broker-55002 gets the group coordinator assignment back. These log 
lines look similar to a previous successful migration, and the memberId changes 
(from ending with {{615b538a0e79}} to {{4ba4c7f17eea}}).

 
{quote}{{[2023-08-04 07:32:44,388] INFO Loaded member 
MemberMetadata(memberId=kafka-streams-55007.somewhere.com-2-8a5b4e68-7065-4125-9ba0-615b538a0e79,
 groupInstanceId=Some(kafka-streams-55007.somewhere.com-2), 
clientId=river.stream-filter.v1-d3473182-6af5-410d-82ef-4b85ea1b9a66-StreamThread-2-consumer,
 clientHost=/XX.XX.XX.XX, sessionTimeoutMs=45000, rebalanceTimeoutMs=30, 
supportedProtocols=List(stream)) in group river.stream-filter.v1 with 
generation 686. (kafka.coordinator.group.GroupMetadata$)}}

{{...}}

{{[2023-08-04 07:33:08,824] INFO Loaded member 
MemberMetadata(memberId=kafka-streams-55007.somewhere.com-2-e1dd9fd3-5887-4cd

Re: [PR] KAFKA-15740: KRaft support in DeleteOffsetsConsumerGroupCommandIntegrationTest [kafka]

2023-11-10 Thread via GitHub


linzihao1999 commented on PR #14669:
URL: https://github.com/apache/kafka/pull/14669#issuecomment-1806736102

   Hi @cmccabe @jsancio 
   
   The CI builds finished and I found no failed test reports for 
DeleteOffsetsConsumerGroupCommandIntegrationTest.
   Is there anything more I need to do before code review? PTAL
   
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14669/2//testReport/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12388: Share broker channel between alterIsrManager and lifecycleManager [kafka]

2023-11-10 Thread via GitHub


dengziming closed pull request #10255: KAFKA-12388: Share broker channel 
between alterIsrManager and lifecycleManager
URL: https://github.com/apache/kafka/pull/10255


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-6084: Propagate JSON parsing errors in ReassignPartitionsCommand [kafka]

2023-11-10 Thread via GitHub


dengziming closed pull request #10789: KAFKA-6084: Propagate JSON parsing 
errors in ReassignPartitionsCommand
URL: https://github.com/apache/kafka/pull/10789


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15741: KRaft support in DescribeConsumerGroupTest [kafka]

2023-11-10 Thread via GitHub


dengziming commented on PR #14668:
URL: https://github.com/apache/kafka/pull/14668#issuecomment-1806737490

   There seem to be some flaky test failures related to this PR here: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14668/2/#showFailuresLink


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org