Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]
mdedetrich commented on code in PR #12728: URL: https://github.com/apache/kafka/pull/12728#discussion_r1389047935 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -817,19 +731,11 @@ public void testAccessors() throws Exception { Herder.Created connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); -EasyMock.reset(transformer); -EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME), EasyMock.anyObject())) Review Comment: Committed and pushed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]
mdedetrich commented on code in PR #12728: URL: https://github.com/apache/kafka/pull/12728#discussion_r1389049719 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -838,39 +733,32 @@ public void testPutConnectorConfig() throws Exception { Map newConnConfig = new HashMap<>(connConfig); newConnConfig.put("foo", "bar"); -Callback> connectorConfigCb = PowerMock.createMock(Callback.class); -// Callback> putConnectorConfigCb = PowerMock.createMock(Callback.class); +Callback> connectorConfigCb = mock(Callback.class); // Create -connector = PowerMock.createMock(BogusSourceConnector.class); +connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); -Connector connectorMock = PowerMock.createMock(SourceConnector.class); +Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, connConfig); // Should get first config -connectorConfigCb.onCompletion(null, connConfig); -EasyMock.expectLastCall(); +doNothing().when(connectorConfigCb).onCompletion(null, connConfig); // Update config, which requires stopping and restarting -worker.stopAndAwaitConnector(CONNECTOR_NAME); -EasyMock.expectLastCall(); -Capture> capturedConfig = EasyMock.newCapture(); -Capture> onStart = EasyMock.newCapture(); -worker.startConnector(eq(CONNECTOR_NAME), EasyMock.capture(capturedConfig), EasyMock.anyObject(), -eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart)); -EasyMock.expectLastCall().andAnswer(() -> { +doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME); +final ArgumentCaptor> capturedConfig = ArgumentCaptor.forClass(Map.class); +final ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); +doAnswer(invocation -> { onStart.getValue().onCompletion(null, TargetState.STARTED); return true; -}); +}).when(worker).startConnector(eq(CONNECTOR_NAME), capturedConfig.capture(), any(), +eq(herder), eq(TargetState.STARTED), onStart.capture()); // Generate same task config, which should result in no additional action to restart tasks -EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, newConnConfig, true))) -.andReturn(singletonList(taskConfig(SourceSink.SOURCE))); +when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, newConnConfig, true))) +.thenReturn(singletonList(taskConfig(SourceSink.SOURCE))); expectConfigValidation(connectorMock, false, newConnConfig); -connectorConfigCb.onCompletion(null, newConnConfig); -EasyMock.expectLastCall(); -EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes(); - -PowerMock.replayAll(); +doNothing().when(connectorConfigCb).onCompletion(null, newConnConfig); Review Comment: Done and pushed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]
mdedetrich commented on code in PR #12728: URL: https://github.com/apache/kafka/pull/12728#discussion_r1389050516 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -838,39 +733,32 @@ public void testPutConnectorConfig() throws Exception { Map newConnConfig = new HashMap<>(connConfig); newConnConfig.put("foo", "bar"); -Callback> connectorConfigCb = PowerMock.createMock(Callback.class); -// Callback> putConnectorConfigCb = PowerMock.createMock(Callback.class); +Callback> connectorConfigCb = mock(Callback.class); // Create -connector = PowerMock.createMock(BogusSourceConnector.class); +connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); -Connector connectorMock = PowerMock.createMock(SourceConnector.class); +Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, connConfig); // Should get first config -connectorConfigCb.onCompletion(null, connConfig); -EasyMock.expectLastCall(); +doNothing().when(connectorConfigCb).onCompletion(null, connConfig); // Update config, which requires stopping and restarting -worker.stopAndAwaitConnector(CONNECTOR_NAME); -EasyMock.expectLastCall(); -Capture> capturedConfig = EasyMock.newCapture(); -Capture> onStart = EasyMock.newCapture(); -worker.startConnector(eq(CONNECTOR_NAME), EasyMock.capture(capturedConfig), EasyMock.anyObject(), -eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart)); -EasyMock.expectLastCall().andAnswer(() -> { +doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME); +final ArgumentCaptor> capturedConfig = ArgumentCaptor.forClass(Map.class); +final ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); +doAnswer(invocation -> { onStart.getValue().onCompletion(null, TargetState.STARTED); return true; -}); +}).when(worker).startConnector(eq(CONNECTOR_NAME), capturedConfig.capture(), any(), +eq(herder), eq(TargetState.STARTED), onStart.capture()); // Generate same task config, which should result in no additional action to restart tasks -EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, newConnConfig, true))) -.andReturn(singletonList(taskConfig(SourceSink.SOURCE))); +when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, newConnConfig, true))) +.thenReturn(singletonList(taskConfig(SourceSink.SOURCE))); expectConfigValidation(connectorMock, false, newConnConfig); -connectorConfigCb.onCompletion(null, newConnConfig); -EasyMock.expectLastCall(); -EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes(); - -PowerMock.replayAll(); +doNothing().when(connectorConfigCb).onCompletion(null, newConnConfig); +when(worker.getPlugins()).thenReturn(plugins); Review Comment: Done and pushed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15552 Fix Producer ID ZK migration [kafka]
showuon commented on PR #14506: URL: https://github.com/apache/kafka/pull/14506#issuecomment-1805278003 @jolshan , I just created a [PR](https://github.com/apache/kafka/pull/14730) to revert the change in this PR, and the test results also failed `testSingleIP` test. So, let's keep monitoring it, and maybe we need to change the test, because resolving localhost into multiple IPs should be fine. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15711: KRaft support in LogRecoveryTest [kafka]
linzihao1999 commented on code in PR #14693: URL: https://github.com/apache/kafka/pull/14693#discussion_r1389074063 ## core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala: ## @@ -212,7 +228,7 @@ class LogRecoveryTest extends QuorumTestHarness { server2.startup() updateProducer() // check if leader moves to the other server -leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader)) +leader = awaitLeaderChange(servers, topicPartition, leader) Review Comment: I think waitUntilLeaderIsElectedOrChangedWithAdmin can use to instead of it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15711: KRaft support in LogRecoveryTest [kafka]
linzihao1999 commented on code in PR #14693: URL: https://github.com/apache/kafka/pull/14693#discussion_r1389074063 ## core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala: ## @@ -212,7 +228,7 @@ class LogRecoveryTest extends QuorumTestHarness { server2.startup() updateProducer() // check if leader moves to the other server -leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader)) +leader = awaitLeaderChange(servers, topicPartition, leader) Review Comment: I think waitUntilLeaderIsElectedOrChangedWithAdmin can use to instead of it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [Draft] KIP-994 [kafka]
RamanVerma opened a new pull request, #14731: URL: https://github.com/apache/kafka/pull/14731 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15360: Include dirs in BrokerRegistration [kafka]
dajac commented on PR #14392: URL: https://github.com/apache/kafka/pull/14392#issuecomment-1805313339 This PR has introduces a compilation error (JDK 8 and Scala 2.12): ``` > Task :core:compileScala [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-14392/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:305:49: value incl is not a member of scala.collection.immutable.Set[org.apache.kafka.common.Uuid] ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15711: KRaft support in LogRecoveryTest [kafka]
linzihao1999 commented on code in PR #14693: URL: https://github.com/apache/kafka/pull/14693#discussion_r1389074590 ## core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala: ## @@ -212,7 +228,7 @@ class LogRecoveryTest extends QuorumTestHarness { server2.startup() updateProducer() // check if leader moves to the other server -leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader)) +leader = awaitLeaderChange(servers, topicPartition, leader) Review Comment: I think waitUntilLeaderIsElectedOrChangedWithAdmin can use to instead of it. ## core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala: ## @@ -78,15 +84,20 @@ class LogRecoveryTest extends QuorumTestHarness { override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) -configs = TestUtils.createBrokerConfigs(2, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps)) +configs = TestUtils.createBrokerConfigs(2, zkConnectOrNull, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps)) // start both servers -server1 = TestUtils.createServer(configProps1) -server2 = TestUtils.createServer(configProps2) +server1 = createBroker(configProps1) +server2 = createBroker(configProps2) servers = List(server1, server2) // create topic with 1 partition, 2 replicas, one on each broker -createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(0,1)), servers = servers) +if (isKRaftTest()) { + admin = createAdminClient(servers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) + createTopicWithAdmin(admin, topic, servers, 2, replicaAssignment = Map(0 -> Seq(0, 1))) +} else { + createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(0, 1)), servers = servers) +} Review Comment: I think createTopicWithAdmin includes ZK and KRaft mode, and no need to use `isKRaftTest` to explicitly determine it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15645 ReplicationQuotasTestRig rewritten in java [kafka]
nizhikov commented on PR #14588: URL: https://github.com/apache/kafka/pull/14588#issuecomment-1805320880 Now only 24 tests failed. And the all seems not related to the tool. @mimaison do you have more comments to the PR? Can we merge it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] HOTFIX: Fix compilation error in BrokerLifecycleManager [kafka]
dajac opened a new pull request, #14732: URL: https://github.com/apache/kafka/pull/14732 https://github.com/apache/kafka/pull/14392 has introduces a compilation error (JDK 8 and Scala 2.12): ``` > Task :core:compileScala [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-14392/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:305:49: value incl is not a member of scala.collection.immutable.Set[org.apache.kafka.common.Uuid] ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] HOTFIX: Fix compilation error in BrokerLifecycleManager [kafka]
dajac commented on PR #14732: URL: https://github.com/apache/kafka/pull/14732#issuecomment-1805328685 cc @soarez -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] HOTFIX: Fix compilation error in BrokerLifecycleManager [kafka]
dajac commented on PR #14732: URL: https://github.com/apache/kafka/pull/14732#issuecomment-1805337071 @showuon @mimaison @cadonna @lucasbru If you are around, could you take a quick look at this one? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] HOTFIX: Fix compilation error in BrokerLifecycleManager [kafka]
dajac commented on PR #14732: URL: https://github.com/apache/kafka/pull/14732#issuecomment-1805372330 The compilation step has passed for all builds and I have verified that `kafka.server.BrokerLifecycleManagerTest` passed locally with both scala versions. Therefore, I will merge this PR without waiting on the full build to complete in order to unblock all the other PRs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] HOTFIX: Fix compilation error in BrokerLifecycleManager [kafka]
dajac commented on PR #14732: URL: https://github.com/apache/kafka/pull/14732#issuecomment-1805370594 @showuon Thanks for the review! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] HOTFIX: Fix compilation error in BrokerLifecycleManager [kafka]
dajac merged PR #14732: URL: https://github.com/apache/kafka/pull/14732 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]
C0urante commented on code in PR #12728: URL: https://github.com/apache/kafka/pull/12728#discussion_r1388621193 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -722,89 +646,68 @@ public void testRestartConnectorAndTasksBoth() throws Exception { FutureCallback restartCallback = new FutureCallback<>(); herder.restartConnectorAndTasks(restartRequest, restartCallback); assertEquals(connectorStateInfo, restartCallback.get(1000L, TimeUnit.MILLISECONDS)); -PowerMock.verifyAll(); + +verifyConnectorStatusRestart(); +verify(statusBackingStore).put(taskStatus.capture()); } @Test public void testCreateAndStop() throws Exception { -connector = PowerMock.createMock(BogusSourceConnector.class); +connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); Map connectorConfig = connectorConfig(SourceSink.SOURCE); -Connector connectorMock = PowerMock.createMock(SourceConnector.class); +Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, connectorConfig); -// herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked expectStop(); -statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0)); -EasyMock.expectLastCall(); - -statusBackingStore.stop(); -EasyMock.expectLastCall(); -worker.stop(); -EasyMock.expectLastCall(); - -PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); Herder.Created connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); +// herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked herder.stop(); assertTrue(noneConnectorClientConfigOverridePolicy.isClosed()); - -PowerMock.verifyAll(); +verify(worker).stop(); +verify(statusBackingStore).stop(); +verify(statusBackingStore).put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0)); } @Test public void testAccessors() throws Exception { Map connConfig = connectorConfig(SourceSink.SOURCE); System.out.println(connConfig); -Callback> listConnectorsCb = PowerMock.createMock(Callback.class); -Callback connectorInfoCb = PowerMock.createMock(Callback.class); -Callback> connectorConfigCb = PowerMock.createMock(Callback.class); -Callback> taskConfigsCb = PowerMock.createMock(Callback.class); -Callback>> tasksConfigCb = PowerMock.createMock(Callback.class); +Callback> listConnectorsCb = mock(Callback.class); +Callback connectorInfoCb = mock(Callback.class); +Callback> connectorConfigCb = mock(Callback.class); +Callback> taskConfigsCb = mock(Callback.class); +Callback>> tasksConfigCb = mock(Callback.class); // Check accessors with empty worker -listConnectorsCb.onCompletion(null, Collections.EMPTY_SET); -EasyMock.expectLastCall(); -connectorInfoCb.onCompletion(EasyMock.anyObject(), EasyMock.isNull()); -EasyMock.expectLastCall(); - connectorConfigCb.onCompletion(EasyMock.anyObject(), EasyMock.isNull()); -EasyMock.expectLastCall(); Review Comment: Aren't we missing a corresponding `doNothing().when(...)` line for this expectation? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1389220068 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -609,17 +842,18 @@ public void testOrdering() { inputTopic1.pipeInput(1, "A1", 100L); processor.checkAndClearProcessResult(); -// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then -// the joined records +// push one item to the other window that has a join; +// this should produce the joined record first; +// then non-joined record with a closed window // by the time they were produced before // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // w2 = { } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // --> w2 = { 1:a1 (ts: 110) } inputTopic2.pipeInput(1, "a1", 110L); processor.checkAndClearProcessResult( -new KeyValueTimestamp<>(0, "A0+null", 0L), -new KeyValueTimestamp<>(1, "A1+a1", 110L) +new KeyValueTimestamp<>(1, "A1+a1", 110L), +new KeyValueTimestamp<>(0, "A0+null", 0L) Review Comment: Can you point me to the ordering-rules in case of a left- outer- inner- join, then I can have a look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
dajac commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1389220863 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -96,7 +96,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { * @param topic to be requested. If empty, return the metadata for all topics. * @return the future of the metadata request. */ -public CompletableFuture>> requestTopicMetadata(final Optional topic) { +public CompletableFuture>> requestTopicMetadata(final Optional topic) { Review Comment: Do we still need the changes in this class? It seems that we don't use them anymore. I have the same question for the new `Topic` class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Added topicExists functions in AdminClient [kafka]
makarushka closed pull request #8340: Added topicExists functions in AdminClient URL: https://github.com/apache/kafka/pull/8340 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]
mdedetrich commented on code in PR #12728: URL: https://github.com/apache/kafka/pull/12728#discussion_r1389227114 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -656,48 +586,43 @@ public void testRestartConnectorAndTasksOnlyTasks() throws Exception { FutureCallback restartCallback = new FutureCallback<>(); herder.restartConnectorAndTasks(restartRequest, restartCallback); assertEquals(connectorStateInfo, restartCallback.get(1000L, TimeUnit.MILLISECONDS)); -PowerMock.verifyAll(); +ArgumentCaptor taskStatus = ArgumentCaptor.forClass(TaskStatus.class); +verify(statusBackingStore).put(taskStatus.capture()); +assertEquals(AbstractStatus.State.RESTARTING, taskStatus.getValue().state()); Review Comment: Commited and pushed ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -722,89 +646,68 @@ public void testRestartConnectorAndTasksBoth() throws Exception { FutureCallback restartCallback = new FutureCallback<>(); herder.restartConnectorAndTasks(restartRequest, restartCallback); assertEquals(connectorStateInfo, restartCallback.get(1000L, TimeUnit.MILLISECONDS)); -PowerMock.verifyAll(); + +verifyConnectorStatusRestart(); +verify(statusBackingStore).put(taskStatus.capture()); Review Comment: Commited and pushed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
soarez commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1389229875 ## core/src/main/java/kafka/server/AssignmentsManager.java: ## @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData; +import org.apache.kafka.common.message.AssignReplicasToDirsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AssignReplicasToDirsRequest; +import org.apache.kafka.common.requests.AssignReplicasToDirsResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.queue.EventQueue; +import org.apache.kafka.queue.KafkaEventQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class AssignmentsManager { + +private static final Logger log = LoggerFactory.getLogger(AssignmentsManager.class); + +/** + * Assignments are dispatched to the controller this long after + * being submitted to {@link AssignmentsManager}, if there + * is no request in flight already. + * The interval is reset when a new assignment is submitted. + */ +private static final long DISPATCH_INTERVAL_NS = TimeUnit.SECONDS.toNanos(1); + +private static final long MAX_BACKOFF_INTERVAL_MS = TimeUnit.SECONDS.toNanos(10); + +private final Time time; +private final NodeToControllerChannelManager channelManager; +private final int brokerId; +private final Supplier brokerEpochSupplier; +private final KafkaEventQueue eventQueue; + +// These variables should only be mutated from the event loop thread +private Map inflight = null; +private Map pending = new HashMap<>(); +private final ExponentialBackoff resendExponentialBackoff = +new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02); +private int failedAttempts = 0; + +public AssignmentsManager(Time time, + NodeToControllerChannelManager channelManager, + int brokerId, + Supplier brokerEpochSupplier) { +this.time = time; +this.channelManager = channelManager; +this.brokerId = brokerId; +this.brokerEpochSupplier = brokerEpochSupplier; +this.eventQueue = new KafkaEventQueue(time, +new LogContext("[AssignmentsManager id=" + brokerId + "]"), +"broker-" + brokerId + "-directory-assignments-manager-"); +} + +public void close() throws InterruptedException { +eventQueue.close(); +channelManager.shutdown(); +} + +public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) { +eventQueue.append(new AssignmentEvent(time.nanoseconds(), topicPartition, dirId)); +} + +// only for testing +void wakeup() { +eventQueue.wakeup(); +} + +/** + * Base class for all the events handled by {@link AssignmentsManager}. + */ +private abstract static class Event implements EventQueue.Event { +/** + * Override the default behavior in + * {@link EventQueue.Event#handleException} + * which swallows the exception. + */ +@Override +public void handleException(Throwable e) { +throw new RuntimeException(e); +} +} + +/** + * Hand
Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]
mdedetrich commented on code in PR #12728: URL: https://github.com/apache/kafka/pull/12728#discussion_r1389230779 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -722,89 +646,68 @@ public void testRestartConnectorAndTasksBoth() throws Exception { FutureCallback restartCallback = new FutureCallback<>(); herder.restartConnectorAndTasks(restartRequest, restartCallback); assertEquals(connectorStateInfo, restartCallback.get(1000L, TimeUnit.MILLISECONDS)); -PowerMock.verifyAll(); + +verifyConnectorStatusRestart(); +verify(statusBackingStore).put(taskStatus.capture()); } @Test public void testCreateAndStop() throws Exception { -connector = PowerMock.createMock(BogusSourceConnector.class); +connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); Map connectorConfig = connectorConfig(SourceSink.SOURCE); -Connector connectorMock = PowerMock.createMock(SourceConnector.class); +Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, connectorConfig); -// herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked expectStop(); -statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0)); -EasyMock.expectLastCall(); - -statusBackingStore.stop(); -EasyMock.expectLastCall(); -worker.stop(); -EasyMock.expectLastCall(); - -PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); Herder.Created connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); +// herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked herder.stop(); assertTrue(noneConnectorClientConfigOverridePolicy.isClosed()); - -PowerMock.verifyAll(); +verify(worker).stop(); +verify(statusBackingStore).stop(); +verify(statusBackingStore).put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0)); } @Test public void testAccessors() throws Exception { Map connConfig = connectorConfig(SourceSink.SOURCE); System.out.println(connConfig); -Callback> listConnectorsCb = PowerMock.createMock(Callback.class); -Callback connectorInfoCb = PowerMock.createMock(Callback.class); -Callback> connectorConfigCb = PowerMock.createMock(Callback.class); -Callback> taskConfigsCb = PowerMock.createMock(Callback.class); -Callback>> tasksConfigCb = PowerMock.createMock(Callback.class); +Callback> listConnectorsCb = mock(Callback.class); +Callback connectorInfoCb = mock(Callback.class); +Callback> connectorConfigCb = mock(Callback.class); +Callback> taskConfigsCb = mock(Callback.class); +Callback>> tasksConfigCb = mock(Callback.class); // Check accessors with empty worker -listConnectorsCb.onCompletion(null, Collections.EMPTY_SET); -EasyMock.expectLastCall(); -connectorInfoCb.onCompletion(EasyMock.anyObject(), EasyMock.isNull()); -EasyMock.expectLastCall(); - connectorConfigCb.onCompletion(EasyMock.anyObject(), EasyMock.isNull()); -EasyMock.expectLastCall(); Review Comment: Committed and pushed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) [kafka]
aliehsaeedii commented on PR #14596: URL: https://github.com/apache/kafka/pull/14596#issuecomment-1805669950 @mjsax Cool. Thanks for unblocking me. Maybe now ready to merge? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15707: KRaft support in TopicBasedRemoteLogMetadataManagerHarness [kafka]
linzihao1999 opened a new pull request, #14733: URL: https://github.com/apache/kafka/pull/14733 KRaft support in TopicBasedRemoteLogMetadataManagerHarness ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15745) KRaft support in RequestQuotaTest
[ https://issues.apache.org/jira/browse/KAFKA-15745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zihao Lin resolved KAFKA-15745. --- Resolution: Duplicate > KRaft support in RequestQuotaTest > - > > Key: KAFKA-15745 > URL: https://issues.apache.org/jira/browse/KAFKA-15745 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Sameer Tejani >Priority: Minor > Labels: kraft, kraft-test, newbie > > The following tests in RequestQuotaTest in > core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala need to be > updated to support KRaft > 132 : def testResponseThrottleTime(): Unit = { > 140 : def testResponseThrottleTimeWhenBothProduceAndRequestQuotasViolated(): > Unit = { > 146 : def testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated(): > Unit = { > 152 : def testUnthrottledClient(): Unit = { > 161 : def testExemptRequestTime(): Unit = { > 171 : def testUnauthorizedThrottle(): Unit = { > Scanned 801 lines. Found 0 KRaft tests out of 6 tests -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15804: Close SocketServer channels when calling shutdown before enableRequestProcessing [kafka]
ex172000 commented on code in PR #14729: URL: https://github.com/apache/kafka/pull/14729#discussion_r1389423646 ## core/src/main/scala/kafka/network/SocketServer.scala: ## @@ -679,6 +679,9 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, def close(): Unit = { beginShutdown() thread.join() +if (!started) { Review Comment: Do we want to put the `started` as a parameter to the method to make it look cleaner? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15805) Fetch Remote Indexes at once
Jorge Esteban Quilcate Otoya created KAFKA-15805: Summary: Fetch Remote Indexes at once Key: KAFKA-15805 URL: https://issues.apache.org/jira/browse/KAFKA-15805 Project: Kafka Issue Type: Improvement Components: Tiered-Storage Reporter: Jorge Esteban Quilcate Otoya Assignee: Jorge Esteban Quilcate Otoya Reduce Tiered Storage latency when fetching indexes by allowing to fetch many indexes at once. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15806) Signal next segment when remote fetching
Jorge Esteban Quilcate Otoya created KAFKA-15806: Summary: Signal next segment when remote fetching Key: KAFKA-15806 URL: https://issues.apache.org/jira/browse/KAFKA-15806 Project: Kafka Issue Type: Improvement Components: Tiered-Storage Reporter: Jorge Esteban Quilcate Otoya Assignee: Jorge Esteban Quilcate Otoya Improve remote fetching performance when fetching across segment by signaling the next segment and allow Remote Storage Manager implementations to optimize their pre-fetching. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14133: Migrate ProcessorStateManagerTest and StreamThreadTest to Mockito [kafka]
clolov commented on PR #13932: URL: https://github.com/apache/kafka/pull/13932#issuecomment-1805861321 This has since been rebased! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christo Lolov updated KAFKA-14133: -- Description: {color:#de350b}There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.{color} Unless stated in brackets the tests are in the streams module. A list of tests which still require to be moved from EasyMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}In Review{color} {color:#00875a}Merged{color} # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo) # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo) # {color:#00875a}KStreamPrintTest{color} (owner: Christo) # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo) # {color:#00875a}MaterializedInternalTest{color} (owner: Christo) # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo) # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo) # {color:#00875a}ClientUtilsTest{color} (owner: Christo) # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: Christo) # {color:#00875a}TopologyTest{color} (owner: Christo) # {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo) # {color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo) # {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo) # {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: Christo) # {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo) # {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo) # {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo) # {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: Christo) # {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo) # {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo) # {color:#00875a}RocksDBStoreTest{color} (owner: Christo) # {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo) # {color:#ff8b00}TaskManagerTest{color} (owner: Christo) [https://github.com/apache/kafka/pull/13874, https://github.com/apache/kafka/pull/13897, https://github.com/apache/kafka/pull/13873|https://github.com/apache/kafka/pull/13874] # {color:#00875a}InternalTopicManagerTest{color} (owner: Christo) # {color:#00875a}ProcessorContextImplTest{color} (owner: Christo) # {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo) # {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo) # {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo) # {color:#00875a}AssignmentTestUtils{color} (owner: Christo) # {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew) (takeover: Christo) [https://github.com/apache/kafka/pull/13932] # {color:#00875a}StandbyTaskTest{color} (owner: Matthew) (takeover: Christo) # {color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew) [https://github.com/apache/kafka/pull/12524] # {color:#ff8b00}StreamTaskTest{color} (owner: Matthew) (takeover: Christo) [https://github.com/apache/kafka/pull/14716] # {color:#ff8b00}StreamThreadTest{color} (owner: Matthew) (takeover: Christo) [https://github.com/apache/kafka/pull/13932] # {color:#00875a}StreamsMetricsImplTest{color} (owner: Dalibor) (takeover: Christo) # {color:#00875a}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: [~shekharrajak]) # {color:#00875a}TimeOrderedWindowStoreTest{color} (owner: [~shekharrajak]) # {color:#00875a}AbstractStreamTest{color} (owner: Christo) # {color:#00875a}KStreamTransformValuesTest{color} (owner: Christo) # {color:#00875a}KTableImplTest{color} (owner: Christo) # {color:#00875a}KTableTransformValuesTest{color} (owner: Christo) # {color:#00875a}SessionCacheFlushListenerTest{color} (owner: Christo) # {color:#00875a}TimestampedCacheFlushListenerTest{color} (owner: Christo) # {color:#00875a}TimestampedTupleForwarderTest{color} (owner: Christo) # {color:#00875a}ActiveTask
[jira] [Updated] (KAFKA-15806) Signal next segment when remote fetching
[ https://issues.apache.org/jira/browse/KAFKA-15806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jorge Esteban Quilcate Otoya updated KAFKA-15806: - Description: Improve remote fetching performance when fetching across segment by signaling the next segment and allow Remote Storage Manager implementations to optimize their pre-fetching. [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1003%3A+Signal+next+segment+when+remote+fetching] was:Improve remote fetching performance when fetching across segment by signaling the next segment and allow Remote Storage Manager implementations to optimize their pre-fetching. > Signal next segment when remote fetching > - > > Key: KAFKA-15806 > URL: https://issues.apache.org/jira/browse/KAFKA-15806 > Project: Kafka > Issue Type: Improvement > Components: Tiered-Storage >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > Labels: kip > > Improve remote fetching performance when fetching across segment by signaling > the next segment and allow Remote Storage Manager implementations to optimize > their pre-fetching. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1003%3A+Signal+next+segment+when+remote+fetching] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15805) Fetch Remote Indexes at once
[ https://issues.apache.org/jira/browse/KAFKA-15805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jorge Esteban Quilcate Otoya updated KAFKA-15805: - Description: Reduce Tiered Storage latency when fetching indexes by allowing to fetch many indexes at once. KIP: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1002%3A+Fetch+remote+segment+indexes+at+once] was: Reduce Tiered Storage latency when fetching indexes by allowing to fetch many indexes at once. > Fetch Remote Indexes at once > > > Key: KAFKA-15805 > URL: https://issues.apache.org/jira/browse/KAFKA-15805 > Project: Kafka > Issue Type: Improvement > Components: Tiered-Storage >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > Labels: kip > > Reduce Tiered Storage latency when fetching indexes by allowing to fetch many > indexes at once. > KIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1002%3A+Fetch+remote+segment+indexes+at+once] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15426) Process and persist directory assignments
[ https://issues.apache.org/jira/browse/KAFKA-15426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reassigned KAFKA-15426: --- Assignee: Igor Soarez > Process and persist directory assignments > - > > Key: KAFKA-15426 > URL: https://issues.apache.org/jira/browse/KAFKA-15426 > Project: Kafka > Issue Type: Sub-task >Reporter: Igor Soarez >Assignee: Igor Soarez >Priority: Major > > * Handle AssignReplicasToDirsRequest > * Persist metadata changes -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15361) Process and persist dir info with broker registration
[ https://issues.apache.org/jira/browse/KAFKA-15361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reassigned KAFKA-15361: --- Assignee: Igor Soarez > Process and persist dir info with broker registration > - > > Key: KAFKA-15361 > URL: https://issues.apache.org/jira/browse/KAFKA-15361 > Project: Kafka > Issue Type: Sub-task >Reporter: Igor Soarez >Assignee: Igor Soarez >Priority: Major > > Controllers should process and persist directory information from the broker > registration request -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15364) Handle log directory failure in the Controller
[ https://issues.apache.org/jira/browse/KAFKA-15364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reassigned KAFKA-15364: --- Assignee: Igor Soarez > Handle log directory failure in the Controller > -- > > Key: KAFKA-15364 > URL: https://issues.apache.org/jira/browse/KAFKA-15364 > Project: Kafka > Issue Type: Sub-task >Reporter: Igor Soarez >Assignee: Igor Soarez >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
soarez commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1805892764 Is there a way to trigger the tests again or do I need to push additional changes? All the test pipelines show a Jenkins error: "No space left on device". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue closed pull request #14670: KAFKA-15277: Design & implement support for internal Consumer delegates URL: https://github.com/apache/kafka/pull/14670 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on PR #14670: URL: https://github.com/apache/kafka/pull/14670#issuecomment-1805940138 Re-triggered a build due to the abundance of seemingly unrelated test failures. For example, the first line of this test in `ClusterConnectionStatesTest` is failing across all builds because `localhost` is resolving to more than one IP address: ```java @Test public void testSingleIP() throws UnknownHostException { assertEquals(1, ClientUtils.resolve("localhost", singleIPHostResolver).size()); connectionStates.connecting(nodeId1, time.milliseconds(), "localhost"); InetAddress currAddress = connectionStates.currentAddress(nodeId1); connectionStates.connecting(nodeId1, time.milliseconds(), "localhost"); assertSame(currAddress, connectionStates.currentAddress(nodeId1)); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Various cleanups in metadata [kafka]
mimaison opened a new pull request, #14734: URL: https://github.com/apache/kafka/pull/14734 - Remove unused code, suppression - Simplify/fix test assertions - Javadoc cleanups ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KIP-988 [kafka]
eduwercamacaro opened a new pull request, #14735: URL: https://github.com/apache/kafka/pull/14735 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KRaft support in DescribeUserScramCredentialsRequestNotAuthorizedTest [kafka]
linzihao1999 opened a new pull request, #14736: URL: https://github.com/apache/kafka/pull/14736 KRaft support in DescribeUserScramCredentialsRequestNotAuthorizedTest ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15800: Prevent DataExceptions from corrupting KafkaOffsetBackingStore [kafka]
gharris1727 commented on PR #14718: URL: https://github.com/apache/kafka/pull/14718#issuecomment-1806059864 Test failures appear unrelated, and the connect and mirror tests pass locally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15800: Prevent DataExceptions from corrupting KafkaOffsetBackingStore [kafka]
gharris1727 merged PR #14718: URL: https://github.com/apache/kafka/pull/14718 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15804: Close SocketServer channels when calling shutdown before enableRequestProcessing [kafka]
gharris1727 commented on code in PR #14729: URL: https://github.com/apache/kafka/pull/14729#discussion_r1389645226 ## core/src/main/scala/kafka/network/SocketServer.scala: ## @@ -679,6 +679,9 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, def close(): Unit = { beginShutdown() thread.join() +if (!started) { Review Comment: Hey @ex172000 thanks for the review! Do you mean as a parameter to closeAll? I don't think that's necessary as started is an instance variable. Also, I didn't add the condition to finally block because it should always be true at that point in the code, so it's only present on this code path where the thread has not been started yet. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue closed pull request #14670: KAFKA-15277: Design & implement support for internal Consumer delegates URL: https://github.com/apache/kafka/pull/14670 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1389684994 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server; + +import java.util.Collections; +import java.util.regex.Pattern; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.zip.CRC32; + +/** + * Handles client telemetry metrics requests/responses, subscriptions and instance information. + */ +public class ClientMetricsManager implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); +private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + +public static ClientMetricsManager instance() { +return INSTANCE; +} +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; Review Comment: The should it be in low 100K i.e. 2^17 = 131072? ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server; + +import java.util.Collections; +import java.util.regex.Pattern; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apac
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1389709078 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. +*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. +validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception)); +
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1389719566 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. +*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. +validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception)); +
Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]
apoorvmittal10 closed pull request #14632: KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) URL: https://github.com/apache/kafka/pull/14632 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15807) Add support for compress/decompress metrics
Apoorv Mittal created KAFKA-15807: - Summary: Add support for compress/decompress metrics Key: KAFKA-15807 URL: https://issues.apache.org/jira/browse/KAFKA-15807 Project: Kafka Issue Type: Sub-task Reporter: Apoorv Mittal Assignee: Apoorv Mittal -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15807) Add support for compression/decompression of metrics
[ https://issues.apache.org/jira/browse/KAFKA-15807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apoorv Mittal updated KAFKA-15807: -- Summary: Add support for compression/decompression of metrics (was: Add support for compress/decompress metrics) > Add support for compression/decompression of metrics > > > Key: KAFKA-15807 > URL: https://issues.apache.org/jira/browse/KAFKA-15807 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1389684994 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server; + +import java.util.Collections; +import java.util.regex.Pattern; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.zip.CRC32; + +/** + * Handles client telemetry metrics requests/responses, subscriptions and instance information. + */ +public class ClientMetricsManager implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); +private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + +public static ClientMetricsManager instance() { +return INSTANCE; +} +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; Review Comment: Then should it be in low 100K i.e. 2^17 = 131072? I am not sure what the right config should be here, I know the cloud providers like MSK typically provides 3000 active connections per broker where this limit is higher in case of Confluent but what typically are the active number of client connecting to broker should be. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue closed pull request #14670: KAFKA-15277: Design & implement support for internal Consumer delegates URL: https://github.com/apache/kafka/pull/14670 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems
[ https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Phuc Hong Tran reassigned KAFKA-15341: -- Assignee: Phuc Hong Tran > Enabling TS for a topic during rolling restart causes problems > -- > > Key: KAFKA-15341 > URL: https://issues.apache.org/jira/browse/KAFKA-15341 > Project: Kafka > Issue Type: Bug >Reporter: Divij Vaidya >Assignee: Phuc Hong Tran >Priority: Major > Labels: KIP-405 > Fix For: 3.7.0 > > > When we are in a rolling restart to enable TS at system level, some brokers > have TS enabled on them and some don't. We send an alter config call to > enable TS for a topic, it hits a broker which has TS enabled, this broker > forwards it to the controller and controller will send the config update to > all brokers. When another broker which doesn't have TS enabled (because it > hasn't undergone the restart yet) gets this config change, it "should" fail > to apply it. But failing now is too late since alterConfig has already > succeeded since controller->broker config propagation is done async. > With this JIRA, we want to have controller check if TS is enabled on all > brokers before applying alter config to turn on TS for a topic. > Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-15362: Resolve offline replicas in metadata cache [kafka]
soarez opened a new pull request, #14737: URL: https://github.com/apache/kafka/pull/14737 The metadata cache now considers registered log directories and directory assignments when determining offline replicas. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15362: Resolve offline replicas in metadata cache [kafka]
soarez commented on PR #14737: URL: https://github.com/apache/kafka/pull/14737#issuecomment-1806580244 @pprovenzano @cmccabe please have a look -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15774: refactor windowed stores to use StoreFactory [kafka]
agavra commented on PR #14708: URL: https://github.com/apache/kafka/pull/14708#issuecomment-1806629224 @ableegoldman is this good to merge? the failing tests seem unrelated and I've seen them be quite flaky ![image](https://github.com/apache/kafka/assets/3172405/d02560d5-1ffe-443e-8eba-0826e50aefb2) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15774: refactor windowed stores to use StoreFactory [kafka]
ableegoldman merged PR #14708: URL: https://github.com/apache/kafka/pull/14708 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]
C0urante commented on code in PR #12728: URL: https://github.com/apache/kafka/pull/12728#discussion_r1390087872 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -817,19 +726,14 @@ public void testAccessors() throws Exception { Herder.Created connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); -EasyMock.reset(transformer); -EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME), EasyMock.anyObject())) -.andThrow(new AssertionError("Config transformation should not occur when requesting connector or task info")) -.anyTimes(); -EasyMock.replay(transformer); - +reset(transformer); herder.connectors(listConnectorsCb); herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb); herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb); herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb); - -PowerMock.verifyAll(); +// Config transformation should not occur when requesting connector or task info +verify(transformer, never()).transform(eq(CONNECTOR_NAME), any()); Review Comment: This is much cleaner than the existing strategy (where we cause invocations of `transformer::transform` to throw an exception). Nice job, thanks! ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -908,75 +799,58 @@ public void testCorruptConfig() throws Throwable { config.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME); config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSinkConnector.class.getName()); config.put(SinkConnectorConfig.TOPICS_CONFIG, TOPICS_LIST_STR); -Connector connectorMock = PowerMock.createMock(SinkConnector.class); +Connector connectorMock = mock(SinkConnector.class); String error = "This is an error in your config!"; List errors = new ArrayList<>(singletonList(error)); String key = "foo.invalid.key"; -EasyMock.expect(connectorMock.validate(config)).andReturn( +when(connectorMock.validate(config)).thenReturn( new Config( Arrays.asList(new ConfigValue(key, null, Collections.emptyList(), errors)) ) ); ConfigDef configDef = new ConfigDef(); configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, ""); - EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); -final Capture> configCapture = EasyMock.newCapture(); - EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); -EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); - EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader); - EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap); -EasyMock.expect(worker.getPlugins()).andStubReturn(plugins); - EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); -EasyMock.expect(connectorMock.config()).andStubReturn(configDef); -loaderSwap.close(); Review Comment: Don't we still want to verify that this takes place? ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -887,19 +783,14 @@ public void testPutConnectorConfig() throws Exception { assertEquals("bar", capturedConfig.getValue().get("foo")); herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); Review Comment: Can we also make sure that there aren't any errors reported with the `connectorConfigCb`? The easy way to do this would be to just add this line to the end of the test: ```java verifyNoMoreInteractions(connectorConfigCb); ``` It'd probably be better to do away with mocked callbacks and switch to using a `FutureCallback` that we invoke `get` on (like we do with many other tests), but it's up to you if you'd like to implement that or not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]
C0urante commented on code in PR #12728: URL: https://github.com/apache/kafka/pull/12728#discussion_r1390091434 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -908,75 +799,58 @@ public void testCorruptConfig() throws Throwable { config.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME); config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSinkConnector.class.getName()); config.put(SinkConnectorConfig.TOPICS_CONFIG, TOPICS_LIST_STR); -Connector connectorMock = PowerMock.createMock(SinkConnector.class); +Connector connectorMock = mock(SinkConnector.class); String error = "This is an error in your config!"; List errors = new ArrayList<>(singletonList(error)); String key = "foo.invalid.key"; -EasyMock.expect(connectorMock.validate(config)).andReturn( +when(connectorMock.validate(config)).thenReturn( new Config( Arrays.asList(new ConfigValue(key, null, Collections.emptyList(), errors)) ) ); ConfigDef configDef = new ConfigDef(); configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, ""); - EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); -final Capture> configCapture = EasyMock.newCapture(); - EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); -EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); - EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader); - EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap); -EasyMock.expect(worker.getPlugins()).andStubReturn(plugins); - EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); -EasyMock.expect(connectorMock.config()).andStubReturn(configDef); -loaderSwap.close(); Review Comment: Don't we still want to verify that the call to `loaderSwap::close` takes place? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] This adds a default implementation to the KafkaProducer interface to … [kafka]
github-actions[bot] commented on PR #13981: URL: https://github.com/apache/kafka/pull/13981#issuecomment-1806659081 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems
[ https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785136#comment-17785136 ] Phuc Hong Tran commented on KAFKA-15341: [~divijvaidya] controller doesn't know if individual broker has TS enable or not. It's best to have controller know the data about TS enable of brokers other than brokers rejecting TS per topic enablement call as it would be inefficient since we has to reach out to all those brokers only to get rejected. > Enabling TS for a topic during rolling restart causes problems > -- > > Key: KAFKA-15341 > URL: https://issues.apache.org/jira/browse/KAFKA-15341 > Project: Kafka > Issue Type: Bug >Reporter: Divij Vaidya >Assignee: Phuc Hong Tran >Priority: Major > Labels: KIP-405 > Fix For: 3.7.0 > > > When we are in a rolling restart to enable TS at system level, some brokers > have TS enabled on them and some don't. We send an alter config call to > enable TS for a topic, it hits a broker which has TS enabled, this broker > forwards it to the controller and controller will send the config update to > all brokers. When another broker which doesn't have TS enabled (because it > hasn't undergone the restart yet) gets this config change, it "should" fail > to apply it. But failing now is too late since alterConfig has already > succeeded since controller->broker config propagation is done async. > With this JIRA, we want to have controller check if TS is enabled on all > brokers before applying alter config to turn on TS for a topic. > Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15575) Prevent Connectors from exceeding tasks.max configuration
[ https://issues.apache.org/jira/browse/KAFKA-15575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-15575: -- Description: The Connector::taskConfigs(int maxTasks) function is used by Connectors to enumerate tasks configurations. This takes an argument which comes from the tasks.max connector config. This is the Javadoc for that method: {noformat} /** * Returns a set of configurations for Tasks based on the current configuration, * producing at most {@code maxTasks} configurations. * * @param maxTasks maximum number of configurations to generate * @return configurations for Tasks */ public abstract List> taskConfigs(int maxTasks); {noformat} This includes the constraint that the number of tasks is at most maxTasks, but this constraint is not enforced by the framework. To enforce this constraint, we could begin dropping configs that exceed the limit, and log a warning. For sink connectors this should harmlessly rebalance the consumer subscriptions onto the remaining tasks. For source connectors that distribute their work via task configs, this may result in an interruption in data transfer. was: The Connector::taskConfigs(int maxTasks) function is used by Connectors to enumerate tasks configurations. This takes an argument which comes from the tasks.max connector config. This is the Javadoc for that method: {noformat} /** * Returns a set of configurations for Tasks based on the current configuration, * producing at most {@code maxTasks} configurations. * * @param maxTasks maximum number of configurations to generate * @return configurations for Tasks */ public abstract List> taskConfigs(int maxTasks); {noformat} This includes the constraint that the number of tasks is at most maxTasks, but this constraint is not enforced by the framework. We should begin enforcing this constraint by dropping configs that exceed the limit, and logging a warning. For sink connectors this should harmlessly rebalance the consumer subscriptions onto the remaining tasks. For source connectors that distribute their work via task configs, this may result in an interruption in data transfer. > Prevent Connectors from exceeding tasks.max configuration > - > > Key: KAFKA-15575 > URL: https://issues.apache.org/jira/browse/KAFKA-15575 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Greg Harris >Assignee: Chris Egerton >Priority: Minor > Labels: kip > > The Connector::taskConfigs(int maxTasks) function is used by Connectors to > enumerate tasks configurations. This takes an argument which comes from the > tasks.max connector config. This is the Javadoc for that method: > {noformat} > /** > * Returns a set of configurations for Tasks based on the current > configuration, > * producing at most {@code maxTasks} configurations. > * > * @param maxTasks maximum number of configurations to generate > * @return configurations for Tasks > */ > public abstract List> taskConfigs(int maxTasks); > {noformat} > This includes the constraint that the number of tasks is at most maxTasks, > but this constraint is not enforced by the framework. > > To enforce this constraint, we could begin dropping configs that exceed the > limit, and log a warning. For sink connectors this should harmlessly > rebalance the consumer subscriptions onto the remaining tasks. For source > connectors that distribute their work via task configs, this may result in an > interruption in data transfer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15575) Prevent Connectors from exceeding tasks.max configuration
[ https://issues.apache.org/jira/browse/KAFKA-15575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785147#comment-17785147 ] Chris Egerton commented on KAFKA-15575: --- I've published [KIP-1004|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1004%3A+Enforce+tasks.max+property+in+Kafka+Connect] to try to address this gap. > Prevent Connectors from exceeding tasks.max configuration > - > > Key: KAFKA-15575 > URL: https://issues.apache.org/jira/browse/KAFKA-15575 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Greg Harris >Assignee: Chris Egerton >Priority: Minor > Labels: kip > > The Connector::taskConfigs(int maxTasks) function is used by Connectors to > enumerate tasks configurations. This takes an argument which comes from the > tasks.max connector config. This is the Javadoc for that method: > {noformat} > /** > * Returns a set of configurations for Tasks based on the current > configuration, > * producing at most {@code maxTasks} configurations. > * > * @param maxTasks maximum number of configurations to generate > * @return configurations for Tasks > */ > public abstract List> taskConfigs(int maxTasks); > {noformat} > This includes the constraint that the number of tasks is at most maxTasks, > but this constraint is not enforced by the framework. > > To enforce this constraint, we could begin dropping configs that exceed the > limit, and log a warning. For sink connectors this should harmlessly > rebalance the consumer subscriptions onto the remaining tasks. For source > connectors that distribute their work via task configs, this may result in an > interruption in data transfer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15808) kstreams app consumer threads all fenced due to outdated static membership ids during group coordinator migration
Carrie Hazelton created KAFKA-15808: --- Summary: kstreams app consumer threads all fenced due to outdated static membership ids during group coordinator migration Key: KAFKA-15808 URL: https://issues.apache.org/jira/browse/KAFKA-15808 Project: Kafka Issue Type: Bug Components: group-coordinator, streams Affects Versions: 3.5.0 Reporter: Carrie Hazelton Attachments: broker-50003-kafka.log, broker-55002-kafka.log, kstreams-55002-kafka.log I am wondering if my team encountered a bug with static membership in a KStream application (river.stream-filter in attached logs). There was an abrupt shutdown of all of it's consumer threads after they all became fenced on membership id conflicts. It occurred during a rolling deployment to the Kafka broker fleet which triggered a migration of the group coordinator to a new broker (from kafka-broker-55002 to kafka-broker-50003). My understanding is that a new group coordinator shouldn't start interacting with consumer threads until it has finished loading membership metadata during the migration. In this case, the new group coordinator started interacting with the thread consumers before it finished loading the latest membership information in the most recent generation of the membership metadata. As the migration was incomplete, the new group coordinator had outdated consumers membership IDs, which caused all the streaming app consumer threads to get fenced due to conflicting membership ids. Major timeline in the attached logs: * 07:27:04 - 07:32:25 UTC: Deployment to broker kafka-broker-55002. * 07:32:07 - 07:32:10 UTC: 36 unavailable broker messages seen for kafka-broker-55002 and 3 for kafka-broker-50003 (overlapping in time). * 07:32:21,505 UTC: Controller kafka-broker-51005 adds broker kafka-broker-55002 back as a live broker. * 07:32:23,276 UTC: kafka-broker-50003 last mention of loading membership data. * 07:32:34,175 - 07:32:34,278 UTC - kstream app threads (consumers in group) using latest member IDs interact with group coordinator kafka-broker-50003 which had outdated member IDs. Consumers threads from kstream app (river.stream-filter) are fenced and ejected from the consumer group. When kafka-broker-50003 was assigned group coordinator it was loading the latest (737) generation membership metadata, but appears to have never finished. Review of a previous successful migration shows multiple log lines for a new generation, but only one was seen in this deployment. Below are some log lines that I found from a kstream host kafka.log that highlights the member id difference in for generation 737: First generation and last seen log line on while the app was talking to group coordinator kafka-broker-50003: {quote} [2023-08-04 07:32:16,564] INFO Loaded member MemberMetadata(memberId=kafka-streams-55007.somewhere.com-2-8a5b4e68-7065-4125-9ba0-615b538a0e79, groupInstanceId=Some(kafka-streams-55007.somewhere.com-2), clientId=river.stream-filter.v1-d3473182-6af5-410d-82ef-4b85ea1b9a66-StreamThread-2-consumer, clientHost=/XX.XX.XX.XX, sessionTimeoutMs=45000, rebalanceTimeoutMs=30, supportedProtocols=List(stream)) in group river.stream-filter.v1 with generation 686. (kafka.coordinator.group.GroupMetadata$) {{{}{{}}{}}}{{{}{{...}}{}}} [2023-08-04 07:32:23,276] INFO Loaded member MemberMetadata(memberId=kafka-streams-55007.somewhere.com-2-8a5b4e68-7065-4125-9ba0-615b538a0e79, groupInstanceId=Some(kafka-streams-55007.somewhere.com-2), clientId=river.stream-filter.v1-d3473182-6af5-410d-82ef-4b85ea1b9a66-StreamThread-2-consumer, clientHost=/XX.XX.XX.XX, sessionTimeoutMs=45000, rebalanceTimeoutMs=30, supportedProtocols=List(stream)) in group river.stream-filter.v1 with generation 737. (kafka.coordinator.group.GroupMetadata$) {quote} The memberId was the same in both generations (ending with {{615b538a0e79)}} The consumers got fenced after the controller sent a message around the time of the above log line. Later kafka-broker-55002 gets the group coordinator assignment back. These log lines look similar to a previous successful migration, and the memberId changes (from ending with {{615b538a0e79}} to {{{}4ba4c7f17eea{}}}) {quote}{{[2023-08-04 07:32:44,388] INFO Loaded member MemberMetadata(memberId=kafka-streams-55007.somewhere.com-2-8a5b4e68-7065-4125-9ba0-615b538a0e79, groupInstanceId=Some(kafka-streams-55007.somewhere.com-2), clientId=river.stream-filter.v1-d3473182-6af5-410d-82ef-4b85ea1b9a66-StreamThread-2-consumer, clientHost=/XX.XX.XX.XX, sessionTimeoutMs=45000, rebalanceTimeoutMs=30, supportedProtocols=List(stream)) in group river.stream-filter.v1 with generation 686. (kafka.coordinator.group.GroupMetadata$)}} {{...}} {{[2023-08-04 07:33:08,824] INFO Loaded member MemberMetadata(memberId=kafka-streams-55007.somewhere.com-2-e1dd9fd3-5887-4cd
Re: [PR] KAFKA-15740: KRaft support in DeleteOffsetsConsumerGroupCommandIntegrationTest [kafka]
linzihao1999 commented on PR #14669: URL: https://github.com/apache/kafka/pull/14669#issuecomment-1806736102 Hi @cmccabe @jsancio The CI-builds finished and I found no failure test report about DeleteOffsetsConsumerGroupCommandIntegrationTest. Is there more I need to do before code review. PTAL https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14669/2//testReport/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-12388: Share broker channel between alterIsrManager and lifecycleManager [kafka]
dengziming closed pull request #10255: KAFKA-12388: Share broker channel between alterIsrManager and lifecycleManager URL: https://github.com/apache/kafka/pull/10255 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-6084: Propagate JSON parsing errors in ReassignPartitionsCommand [kafka]
dengziming closed pull request #10789: KAFKA-6084: Propagate JSON parsing errors in ReassignPartitionsCommand URL: https://github.com/apache/kafka/pull/10789 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15741: KRaft support in DescribeConsumerGroupTest [kafka]
dengziming commented on PR #14668: URL: https://github.com/apache/kafka/pull/14668#issuecomment-1806737490 There seems some flaky related to this PR here: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14668/2/#showFailuresLink -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org