Re: [PR] KAFKA-16696: Removed the in-memory implementation of RSM and RLMM [kafka]
chia7712 commented on code in PR #15911: URL: https://github.com/apache/kafka/pull/15911#discussion_r1596287627

## storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java:

@@ -49,9 +45,10 @@ public class RemoteLogMetadataManagerTest {
 private final Time time = new MockTime(1);
-@ParameterizedTest(name = "remoteLogMetadataManager = {0}")
-@MethodSource("remoteLogMetadataManagers")
-public void testFetchSegments(RemoteLogMetadataManager remoteLogMetadataManager) throws Exception {
+private RemoteLogMetadataManager remoteLogMetadataManager = new TopicBasedRemoteLogMetadataManagerWrapperWithHarness();
+
+@Test
+public void testFetchSegments() throws Exception {
 try {

Review Comment:
It seems to me `TopicBasedRemoteLogMetadataManagerWrapperWithHarness` can be removed as well, since we don't actually use the wrapper. This test can be rewritten in the following style:

```java
@Test
public void testFetchSegments() throws Exception {
    try (TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness()) {
        RemoteLogMetadataManager remoteLogMetadataManager = remoteLogMetadataManagerHarness.remoteLogMetadataManager();
```

Note that `TopicBasedRemoteLogMetadataManagerHarness` needs to implement `AutoCloseable`, and `remoteLogMetadataManager()` should be a public method.
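For illustration, a minimal sketch of the harness shape this suggests. The constructor wiring and tear-down below are placeholders and assumptions, not the actual harness code:

```java
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;

// Hypothetical sketch only: makes the harness usable in try-with-resources
// and exposes the manager through a public accessor, per the review.
public class TopicBasedRemoteLogMetadataManagerHarness implements AutoCloseable {
    private final RemoteLogMetadataManager remoteLogMetadataManager;

    public TopicBasedRemoteLogMetadataManagerHarness() {
        // Assumption: the real harness starts an embedded cluster here.
        this.remoteLogMetadataManager = createManager();
    }

    // Public accessor, as the review asks for.
    public RemoteLogMetadataManager remoteLogMetadataManager() {
        return remoteLogMetadataManager;
    }

    @Override
    public void close() throws Exception {
        // Assumption: tear-down delegates to the manager's close().
        remoteLogMetadataManager.close();
    }

    private static RemoteLogMetadataManager createManager() {
        throw new UnsupportedOperationException("cluster wiring elided in this sketch");
    }
}
```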
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2103912972

While going through the usages, it looks to me like the LogOffsetMetadata conversion that happens in KafkaMetadataLog is not correct. Could someone please double check?

```
org.apache.kafka.storage.internals.log.LogOffsetMetadata -> org.apache.kafka.raft.LogOffsetMetadata
```

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala#L226

Question: Why do we set org.apache.kafka.raft.LogOffsetMetadata#segmentPosition to empty when hwm.messageOffsetOnly is false?
Re: [PR] KAFKA-16598 Migrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
chia7712 commented on code in PR #15779: URL: https://github.com/apache/kafka/pull/15779#discussion_r1596279690

## tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java:

@@ -415,153 +474,246 @@ public void testResetOffsetsExportImportPlan() throws Exception {
 TopicPartition t1p1 = new TopicPartition(topic1, 1);
 TopicPartition t2p0 = new TopicPartition(topic2, 0);
 TopicPartition t2p1 = new TopicPartition(topic2, 1);
-createTopic(topic1, 2, 1, new Properties(), listenerName(), new Properties());
-createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties());
+String[] cgcArgs = buildArgsForGroups(cluster, asList(group1, group2),
+"--all-topics", "--to-offset", "2", "--export");
+File file = TestUtils.tempFile("reset", ".csv");
+// Multiple --group's offset import
+String[] cgcArgsExec = buildArgsForGroups(cluster, asList(group1, group2),
+"--all-topics",
+"--from-file", file.getCanonicalPath(), "--dry-run");
+// Single --group offset import using "group,topic,partition,offset" csv format
+String[] cgcArgsExec2 = buildArgsForGroup(cluster, group1, "--all-topics",
+"--from-file", file.getCanonicalPath(), "--dry-run");

-String[] cgcArgs = buildArgsForGroups(Arrays.asList(group1, group2), "--all-topics", "--to-offset", "2", "--export");
-ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(cgcArgs);
+try (Admin admin = cluster.createAdminClient();
+ ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
+ BufferedWriter bw = new BufferedWriter(new FileWriter(file));
+ ConsumerGroupCommand.ConsumerGroupService serviceExec = getConsumerGroupService(cgcArgsExec);
+ ConsumerGroupCommand.ConsumerGroupService serviceExec2 = getConsumerGroupService(cgcArgsExec2)) {

-produceConsumeAndShutdown(topic1, group1, 100, 1);
-produceConsumeAndShutdown(topic2, group2, 100, 1);
+admin.createTopics(asList(new NewTopic(topic1, 2, (short) 1),
+new NewTopic(topic2, 2, (short) 1))).all().get();

-awaitConsumerGroupInactive(consumerGroupCommand, group1);
-awaitConsumerGroupInactive(consumerGroupCommand, group2);
+produceConsumeAndShutdown(cluster, topic1, group1, 1);
+produceConsumeAndShutdown(cluster, topic2, group2, 1);

-File file = TestUtils.tempFile("reset", ".csv");
+awaitConsumerGroupInactive(service, group1);
+awaitConsumerGroupInactive(service, group2);

-Map> exportedOffsets = consumerGroupCommand.resetOffsets();
-BufferedWriter bw = new BufferedWriter(new FileWriter(file));
-bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets));
-bw.close();
-Map exp1 = new HashMap<>();
-exp1.put(t1p0, 2L);
-exp1.put(t1p1, 2L);
-Map exp2 = new HashMap<>();
-exp2.put(t2p0, 2L);
-exp2.put(t2p1, 2L);
+Map> exportedOffsets = service.resetOffsets();
+bw.write(service.exportOffsetsToCsv(exportedOffsets));
+Map exp1 = new HashMap<>();
+exp1.put(t1p0, 2L);
+exp1.put(t1p1, 2L);
+Map exp2 = new HashMap<>();
+exp2.put(t2p0, 2L);
+exp2.put(t2p1, 2L);

-assertEquals(exp1, toOffsetMap(exportedOffsets.get(group1)));
-assertEquals(exp2, toOffsetMap(exportedOffsets.get(group2)));
+assertEquals(exp1, toOffsetMap(exportedOffsets.get(group1)));
+assertEquals(exp2, toOffsetMap(exportedOffsets.get(group2)));

-// Multiple --group's offset import
-String[] cgcArgsExec = buildArgsForGroups(Arrays.asList(group1, group2), "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run");
-ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec);
-Map> importedOffsets = consumerGroupCommandExec.resetOffsets();
-assertEquals(exp1, toOffsetMap(importedOffsets.get(group1)));
-assertEquals(exp2, toOffsetMap(importedOffsets.get(group2)));
+Map> importedOffsets = serviceExec.resetOffsets();
+assertEquals(exp1, toOffsetMap(importedOffsets.get(group1)));
+assertEquals(exp2, toOffsetMap(importedOffsets.get(group2)));

-// Single --group offset import using "group,topic,partition,offset" csv format
-String[] cgcArgsExec2 = buildArgsForGroup(group1, "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run");
-ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec2 = getConsumerGroupService(cgcArgsExec2);
-Map> importedOffsets2 = consumerGroupCommandExec2.resetOffsets();
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1596278649

## core/src/main/scala/kafka/log/UnifiedLog.scala:

@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 /**
  * Given a message offset, find its corresponding offset metadata in the log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException

Review Comment:
Removed the `checkLogStartOffset` from the `convertToOffsetMetadataOrThrow` method.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1596276757

## core/src/main/scala/kafka/log/UnifiedLog.scala:

@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 /**
  * Given a message offset, find its corresponding offset metadata in the log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException
+* 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata
+* 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata
 */
- private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
+ private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {

Review Comment:
This is a potential loop (not sure when it would be triggered); updated the logic to return the message-only metadata.
Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates [kafka]
appchemist commented on code in PR #15647: URL: https://github.com/apache/kafka/pull/15647#discussion_r1580562449

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:

@@ -326,22 +326,34 @@ private void handleInitializeErrors(final CompletedFetch completedFetch, final E
 final TopicPartition tp = completedFetch.partition;
 final long fetchOffset = completedFetch.nextFetchOffset();
-if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
-error == Errors.REPLICA_NOT_AVAILABLE ||
-error == Errors.KAFKA_STORAGE_ERROR ||
-error == Errors.FENCED_LEADER_EPOCH ||
+if (error == Errors.REPLICA_NOT_AVAILABLE) {
+log.debug("Received replica not available error in fetch for partition {}", tp);

Review Comment:
@kirktrue It's just a debug log, but it's different from the previous log. Is that okay?

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:

@@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget,
 if (partitionData.currentLeader().leaderId() != -1 && partitionData.currentLeader().leaderEpoch() != -1) {
 partitionsWithUpdatedLeaderInfo.put(partition, new Metadata.LeaderIdAndEpoch(
 Optional.of(partitionData.currentLeader().leaderId()), Optional.of(partitionData.currentLeader().leaderEpoch())));
+} else {
+requestMetadataUpdate(metadata, subscriptions, partition);
+subscriptions.awaitUpdate(partition);

Review Comment:
Changed.

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:

@@ -327,21 +327,27 @@ private void handleInitializeErrors(final CompletedFetch completedFetch, final E
 final long fetchOffset = completedFetch.nextFetchOffset();
 if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
-error == Errors.REPLICA_NOT_AVAILABLE ||
+error == Errors.FENCED_LEADER_EPOCH) {
+log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
+requestMetadataUpdate(metadata, subscriptions, tp);
+} else if (error == Errors.REPLICA_NOT_AVAILABLE ||
 error == Errors.KAFKA_STORAGE_ERROR ||
-error == Errors.FENCED_LEADER_EPOCH ||
 error == Errors.OFFSET_NOT_AVAILABLE) {
 log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
 requestMetadataUpdate(metadata, subscriptions, tp);
+subscriptions.awaitUpdate(tp);

Review Comment:
Changed.
Re: [PR] KAFKA-14885: fix kafka client connect to the broker that offline from… [kafka]
Stephan14 commented on PR #13531: URL: https://github.com/apache/kafka/pull/13531#issuecomment-2103820670

Hi @divijvaidya, can you help me review it?
Re: [PR] KAFKA-16652: add unit test for ClusterTemplate offering zero ClusterConfig [kafka]
chia7712 commented on code in PR #15862: URL: https://github.com/apache/kafka/pull/15862#discussion_r1596214008

## core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java:

@@ -18,31 +18,53 @@
 package kafka.test.junit;
 import kafka.test.annotation.ClusterTemplate;
+import kafka.test.ClusterGenerator;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
-import java.util.function.Consumer;
+import java.lang.reflect.Method;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 public class ClusterTestExtensionsUnitTest {
+
+static class StubTest {
+@ClusterTemplate("cfgFoo")
+void testFoo() {}
+static void cfgFoo(ClusterGenerator gen) { /* ... */ }
+
+@ClusterTemplate("")
+void testBar() {}
+
+};
+
+private ExtensionContext buildExtensionContext(String methodName) throws Exception {

Review Comment:
please add `@SuppressWarnings({"unchecked", "rawtypes"})`

## core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java:

@@ -18,31 +18,53 @@
 (same context as above, continuing)
+private ExtensionContext buildExtensionContext(String methodName) throws Exception {
+ExtensionContext extensionContext = mock(ExtensionContext.class);
+Class clazz = StubTest.class;
+Method method = clazz.getDeclaredMethod(methodName);
+when(extensionContext.getRequiredTestClass()).thenReturn(clazz);
+when(extensionContext.getRequiredTestMethod()).thenReturn(method);
+return extensionContext;
+}
 @Test
 @SuppressWarnings("unchecked")

Review Comment:
this is unused now.

## core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java:

@@ -18,31 +18,53 @@
 (same context as above, continuing)
 @Test
 @SuppressWarnings("unchecked")
 void testProcessClusterTemplate() {
-ClusterTestExtensions ext = new ClusterTestExtensions();
-ExtensionContext context = mock(ExtensionContext.class);
-Consumer testInvocations = mock(Consumer.class);
-ClusterTemplate annot = mock(ClusterTemplate.class);
-when(annot.value()).thenReturn("").thenReturn(" ");
-
-Assertions.assertThrows(IllegalStateException.class, () ->
-ext.processClusterTemplate(context, annot, testInvocations)
+ClusterTestExtensions clusterTestExtensions = new ClusterTestExtensions();
+
+assertEquals("ClusterConfig generator method should provide at least one config.",
[jira] [Resolved] (KAFKA-12947) Replace EasyMock and PowerMock with Mockito for StreamsMetricsImplTest ...
[ https://issues.apache.org/jira/browse/KAFKA-12947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-12947.
Resolution: Duplicate

This is fixed by https://github.com/apache/kafka/pull/14623

> Replace EasyMock and PowerMock with Mockito for StreamsMetricsImplTest ...
> --
>
> Key: KAFKA-12947
> URL: https://issues.apache.org/jira/browse/KAFKA-12947
> Project: Kafka
> Issue Type: Sub-task
> Reporter: YI-CHEN WANG
> Assignee: Dalibor Plavcic
> Priority: Major
>
> For Kafka-7438
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
chia7712 merged PR #14716: URL: https://github.com/apache/kafka/pull/14716
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15821: URL: https://github.com/apache/kafka/pull/15821#discussion_r1596196298

## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:

@@ -92,6 +94,7 @@ static void generator(ClusterGenerator clusterGenerator) {
 static AutoCloseable buildConsumers(int numberOfConsumers,

Review Comment:
Could we have two `buildConsumers` to deal with "assign"/"subscribe" individually?
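To make the suggestion concrete, here is a minimal sketch of what two separate helpers could look like. The method names, the `Supplier`-based signature, and the close-all behavior are assumptions for illustration, not the actual `ConsumerGroupCommandTestUtils` API:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

final class ConsumerBuilders {
    // "assign" flavor: each consumer is pinned to an explicit partition.
    static AutoCloseable buildAssignedConsumers(int numberOfConsumers,
                                                TopicPartition partition,
                                                Supplier<KafkaConsumer<String, String>> supplier) {
        List<KafkaConsumer<String, String>> consumers = new ArrayList<>();
        for (int i = 0; i < numberOfConsumers; i++) {
            KafkaConsumer<String, String> consumer = supplier.get();
            consumer.assign(Collections.singleton(partition));
            consumers.add(consumer);
        }
        return () -> consumers.forEach(KafkaConsumer::close);
    }

    // "subscribe" flavor: consumers join the group via a topic subscription.
    static AutoCloseable buildSubscribedConsumers(int numberOfConsumers,
                                                  String topic,
                                                  Supplier<KafkaConsumer<String, String>> supplier) {
        List<KafkaConsumer<String, String>> consumers = new ArrayList<>();
        for (int i = 0; i < numberOfConsumers; i++) {
            KafkaConsumer<String, String> consumer = supplier.get();
            consumer.subscribe(Collections.singleton(topic));
            consumers.add(consumer);
        }
        return () -> consumers.forEach(KafkaConsumer::close);
    }
}
```

Splitting the helper this way means each call site states its intent in the method name instead of via a flag or an overloaded parameter.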
Re: [PR] KAFKA-16643 Fix chaos modifier [kafka]
chia7712 commented on PR #15890: URL: https://github.com/apache/kafka/pull/15890#issuecomment-2103767353

> But I'm confused. Is there anything I can do about it

@dongjinleekr had filed a PR (#10428) as a starting point. However, that PR has conflicts now, and I'm not sure whether @dongjinleekr has free cycles to address them. You can file a new PR to address that.
Re: [PR] MINOR; Validate at least one control record [kafka]
chia7712 commented on code in PR #15912: URL: https://github.com/apache/kafka/pull/15912#discussion_r1596192617

## raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:

@@ -284,6 +284,8 @@ private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) {
 );
 } else if (numberOfRecords == null) {
 throw new IllegalArgumentException("valueCreator didn't create a batch with the count");
+} else if (numberOfRecords < 1) {

Review Comment:
Is it possible to add a new UT to `BatchAccumulatorTest` for this case?
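As a rough illustration of such a test, here is a self-contained JUnit 5 sketch that exercises the same guard logic in isolation. The real test would go through `BatchAccumulator`'s append path with a value creator that produces an empty batch; the helper below is a stand-in, not the actual API:

```java
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.api.Test;

class RecordCountGuardTest {
    // Stand-in for the guard added in validateMemoryRecordAndReturnCount.
    private static int validateCount(Integer numberOfRecords) {
        if (numberOfRecords == null) {
            throw new IllegalArgumentException("valueCreator didn't create a batch with the count");
        } else if (numberOfRecords < 1) {
            throw new IllegalArgumentException("valueCreator created a batch with fewer than one record");
        }
        return numberOfRecords;
    }

    @Test
    void shouldRejectBatchWithZeroRecords() {
        assertThrows(IllegalArgumentException.class, () -> validateCount(0));
    }

    @Test
    void shouldRejectBatchWithNegativeRecordCount() {
        assertThrows(IllegalArgumentException.class, () -> validateCount(-1));
    }
}
```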
Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]
chickenchickenlove commented on PR #15573: URL: https://github.com/apache/kafka/pull/15573#issuecomment-2103761393

Thanks for your guidance. It was very helpful to me!
Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]
mjsax commented on PR #14360: URL: https://github.com/apache/kafka/pull/14360#issuecomment-2103760848

Thanks for the PR @Cerchie! Merged to `trunk`.
Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]
mjsax merged PR #14360: URL: https://github.com/apache/kafka/pull/14360
Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]
mjsax commented on code in PR #14360: URL: https://github.com/apache/kafka/pull/14360#discussion_r1596187852

## docs/streams/developer-guide/config-streams.html:

@@ -300,12 +306,12 @@
 num.standby.replicas
 null
- default.windowed.key.serde.inner
+ default.windowed.key.serde.inner (Deprecated. Use windowed.inner.class.serde instead.)
 Medium
 Default serializer/deserializer for the inner class of windowed keys, implementing the Serde interface.
 null
- default.windowed.value.serde.inner
+ default.windowed.value.serde.inner (Deprecated. Use windowed.inner.class.serde instead.)
 Medium
 Default serializer/deserializer for the inner class of windowed values, implementing the Serde interface.
 null

Review Comment:
I just noticed by chance that we use a different formatting for marking a deprecated config:

```
default.dsl.store
Low
DEPRECATED] The default state store type used by DSL operators. Deprecated in favor of dsl.store.suppliers.class
```

I think we should use a unified formatting -- I don't have a preference which one.
Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]
mjsax commented on PR #15573: URL: https://github.com/apache/kafka/pull/15573#issuecomment-2103753531

Thanks for the PR @chickenchickenlove! Merged to `trunk`.
Re: [PR] KAFKA-16688: Use helper method to shutdown ExecutorService [kafka]
chia7712 commented on PR #15886: URL: https://github.com/apache/kafka/pull/15886#issuecomment-2103753542

I agree with @soarez's observation. I feel this patch can get merged to save all Kafka developers' lives. #15891 can revert this patch to observe which timer task hangs.
Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]
mjsax merged PR #15573: URL: https://github.com/apache/kafka/pull/15573
[jira] [Resolved] (KAFKA-16484) Support to define per broker/controller property by ClusterConfigProperty
[ https://issues.apache.org/jira/browse/KAFKA-16484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-16484.
Fix Version/s: 3.8.0
Resolution: Fixed

> Support to define per broker/controller property by ClusterConfigProperty
> -
>
> Key: KAFKA-16484
> URL: https://issues.apache.org/jira/browse/KAFKA-16484
> Project: Kafka
> Issue Type: Test
> Reporter: Chia-Ping Tsai
> Assignee: Kuan Po Tseng
> Priority: Major
> Fix For: 3.8.0
>
> the property set to `ClusterConfigProperty` gets applied to all brokers, and
> hence we can't have individual props for each broker to test racks.
>
> It seems to me we can add new field "id" to `ClusterConfigProperty` to
> declare the property should be applied to specific broker (or controller).
> the default value is -1 and it should be applied to all nodes.
Re: [PR] KAFKA-16484: Support to define per broker/controller property by ClusterConfigProperty [kafka]
chia7712 merged PR #15715: URL: https://github.com/apache/kafka/pull/15715
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
mjsax commented on code in PR #15790: URL: https://github.com/apache/kafka/pull/15790#discussion_r1596180605

## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java:

@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.StateSerdes;
+
+
+public class StoreSerdeInitializer {
+static StateSerdes prepareStoreSerde(final StateStoreContext context, final String storeName,
+ final String changelogTopic, final Serde keySerde,

Review Comment:
nit formatting. We should have a single parameter per line, not multiple (both lines above) -- also below

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:

@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name()));

Review Comment:
Should we somehow preserve `e.getMessage()` -- it seems useful?

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:

@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
 (same hunk as above)
+keySerializer = prepareKeySerializer(keySerializer, context, this.name());

Review Comment:
I did dig into `prepareKeySerializer` and `prepareValueSerializer`, which both use `WrappingNullableUtils#prepareSerializer()`, which might call both `context.keySerde()` and `context.valueSerde()`, and thus, I believe we could currently get an exception when trying to get the key serde, even if the default key serde is set but the default value serde is not set? I think this internal helper method needs some updates, too.

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:

@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
 (same hunk as above)
+} catch (final StreamsException e) {

Review Comment:
Why are we catching `StreamsException`? Seems the only exception that might bubble up here is a `ConfigException`?

## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:

@@ -156,8 +156,7 @@ public void testTopologyLevelConfigException() {
 final ConfigException se =
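A small sketch of the message-preserving wrapping suggested above. The failing helper here is a hypothetical stand-in for the real serializer preparation; appending `e.getMessage()` (or chaining the cause) are both plausible forms:

```java
import org.apache.kafka.common.config.ConfigException;

final class SerdeInitExample {
    // Hypothetical stand-in for prepareKeySerializer failing during init.
    private static Object prepareKeySerializer(String nodeName) {
        throw new ConfigException("Please specify a key serde or set one through"
            + " StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG");
    }

    static Object initKeySerializer(String nodeName) {
        try {
            return prepareKeySerializer(nodeName);
        } catch (final ConfigException e) {
            // Keep the underlying message instead of swallowing it.
            throw new ConfigException(String.format(
                "Failed to initialize key serdes for sink node %s: %s", nodeName, e.getMessage()));
        }
    }
}
```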
Re: [PR] KAFKA-16643 Fix chaos modifier [kafka]
gongxuanzhang commented on PR #15890: URL: https://github.com/apache/kafka/pull/15890#issuecomment-2103743672

@chia7712 Got it! But I'm confused: is there anything I can do about it?
Re: [PR] KAFKA-16677: Replace ClusterType#ALL and ClusterType#DEFAULT by Array [kafka]
chia7712 commented on code in PR #15897: URL: https://github.com/apache/kafka/pull/15897#discussion_r1596178459

## core/src/test/java/kafka/test/annotation/ClusterTest.java:

@@ -33,7 +33,7 @@
 @Retention(RUNTIME)
 @TestTemplate
 public @interface ClusterTest {
-Type clusterType() default Type.DEFAULT;
+Type[] clusterTypes() default {};

Review Comment:
> so wonder if having the full list as default here would be a good complementary change?
>
> It would mean that we have well defined ClusterTypes as introduced by this PR, but every test running on ClusterTest would run for all types, unless specified. That removes lots of clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}; it's probably the default behaviour we want for a test, so that no one accidentally forgets to run on any specific cluster type, and it makes it easier to maintain (keeping the full list only in this single place, not on every test). Thoughts?

I love this idea!
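Concretely, the complementary change being endorsed would look something like the sketch below, a guess at the final shape rather than the merged code:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Assumes kafka.test.annotation.Type with ZK, KRAFT and CO_KRAFT constants,
// as quoted in the discussion above.
@Retention(RetentionPolicy.RUNTIME)
public @interface ClusterTest {
    // Run on every cluster type unless a test narrows the list explicitly.
    Type[] clusterTypes() default {Type.ZK, Type.KRAFT, Type.CO_KRAFT};
}
```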
Re: [PR] KAFKA-16677: Replace ClusterType#ALL and ClusterType#DEFAULT by Array [kafka]
chia7712 commented on code in PR #15897: URL: https://github.com/apache/kafka/pull/15897#discussion_r1596176943

## core/src/test/java/kafka/test/ClusterConfig.java:

@@ -219,8 +221,8 @@ public static class Builder {
 private Builder() {}
-public Builder setType(Type type) {
-this.type = type;
+public Builder setTypes(Set types) {
+this.types = Collections.unmodifiableSet(new HashSet<>(types));

Review Comment:
> so why creating a new HashSet and not simply Collections.unmodifiableSet(types)?

We are trying to make `ClusterConfig` immutable. Without the copy, users would be able to change `ClusterConfig`'s `types` by modifying the input `types`. We can simplify the code with `Set.copyOf` after removing support for JDK 8.
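A minimal illustration of the difference: without the copy, the "unmodifiable" view still tracks the caller's mutable set.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class DefensiveCopyDemo {
    public static void main(String[] args) {
        Set<String> input = new HashSet<>(Set.of("KRAFT"));

        Set<String> view = Collections.unmodifiableSet(input);                // wraps the live set
        Set<String> copy = Collections.unmodifiableSet(new HashSet<>(input)); // snapshot

        input.add("ZK"); // caller mutates the original set afterwards

        System.out.println(view.contains("ZK")); // true: the view leaked the mutation
        System.out.println(copy.contains("ZK")); // false: the copy is truly immutable
    }
}
```

On JDK 10+, `Set.copyOf(input)` produces the same immutable snapshot in a single call.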
Re: [PR] KAFKA-16643 Fix chaos modifier [kafka]
chia7712 commented on PR #15890: URL: https://github.com/apache/kafka/pull/15890#issuecomment-2103727012

@gongxuanzhang This is still a huge patch, which contains a lot of changes across all modules. This is an important improvement that can impact all Kafka developers, so we should split it into small PRs. Personally, I think the first patch should include 1) the new checkstyle rule and 2) an auto formatter (see https://issues.apache.org/jira/browse/KAFKA-12572). After the first patch gets merged, we can apply the rule to all modules one by one. Yes, it needs many PRs, but they make reviews workable.
Re: [PR] KAFKA-15045: (KIP-924) New interfaces and stubbed utility classes for pluggable TaskAssignors. [kafka]
ableegoldman commented on PR #15887: URL: https://github.com/apache/kafka/pull/15887#issuecomment-2103726299

Test failures were unrelated. Merged to trunk
Re: [PR] KAFKA-15045: (KIP-924) New interfaces and stubbed utility classes for pluggable TaskAssignors. [kafka]
ableegoldman merged PR #15887: URL: https://github.com/apache/kafka/pull/15887
Re: [PR] MINOR: fix LogValidatorTest#checkNonCompressed [kafka]
chia7712 commented on PR #15904: URL: https://github.com/apache/kafka/pull/15904#issuecomment-2103720759

@junrao thanks for reviewing this rough patch :(
Re: [PR] KAFKA-16643 Fix chaos modifier [kafka]
gongxuanzhang commented on PR #15890: URL: https://github.com/apache/kafka/pull/15890#issuecomment-2103687645

@chia7712 I added the rule to checkstyle.xml. You can run:

```
./gradlew checkstyleMain checkstyleTest --continue
```
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1596055964

## core/src/main/scala/kafka/server/DelayedFetch.scala:

@@ -92,7 +92,10 @@ class DelayedFetch(
 // has just rolled, then the high watermark offset will remain the same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
 if (endOffset.messageOffset != fetchOffset.messageOffset) {
- if (endOffset.onOlderSegment(fetchOffset)) {
+ if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
+// This case is to handle the stale high-watermark on the leader until it gets updated with the correct value

Review Comment:
Perhaps change to sth like the following. "If we don't know the position of the offset on log segments, just pessimistically assume that we only gained 1 byte. This can happen when the high watermark is stale, but should be rare."

## core/src/main/scala/kafka/log/UnifiedLog.scala:

@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 /**
  * Given a message offset, find its corresponding offset metadata in the log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException
+* 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata
+* 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata
 */
- private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
+ private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {

Review Comment:
`LocalLog.read()` also calls `convertToOffsetMetadataOrThrow`.

```
else if (startOffset > maxOffsetMetadata.messageOffset)
  emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
```

It seems this could lead to infinite recursion. To avoid that, we could change the above code to avoid calling `convertToOffsetMetadataOrThrow` and return a message-only `LogOffsetMetadata` instead to `emptyFetchDataInfo`.

## core/src/main/scala/kafka/log/UnifiedLog.scala:

@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 (same hunk as above)
+* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException

Review Comment:
Good point. Since we change the logic such that it's ok not to have the metadata for an offset, we could just skip the call to `checkLogStartOffset`.

## core/src/main/scala/kafka/server/DelayedFetch.scala:

@@ -92,7 +92,10 @@ class DelayedFetch(
 (same hunk as above)
+ if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {

Review Comment:
`fetchOffset` typically shouldn't be message only. But it doesn't hurt to have the check.

## core/src/main/scala/kafka/log/UnifiedLog.scala:

@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 (same hunk as above)
+* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException
+* 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata

Review Comment:
lesser => less
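For reference, a tiny Java sketch of the fix described above: hand `emptyFetchDataInfo` an offset-only `LogOffsetMetadata` rather than recursing. This assumes the storage-layer class's single-argument constructor produces message-only metadata:

```java
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;

final class MessageOnlyMetadataExample {
    // Offset-only metadata: no segment base offset or file position attached,
    // so messageOffsetOnly() should report true (assumption about the ctor).
    static LogOffsetMetadata messageOnly(long startOffset) {
        return new LogOffsetMetadata(startOffset);
    }
}
```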
[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845142#comment-17845142 ]

Ayoub Omari commented on KAFKA-4212:

My company was looking for such a feature earlier this year. As a workaround, we use a processor+punctuator similar to what [~savulchik] suggests. The problem is that we end up doing this for every KS application in our system.

Potential suggestion: In the _Topology_ class we can both _addStateStore_ and _addProcessor_, so why not overload _addStateStore_ with an additional TTL parameter and define the processor+punctuator within it (hence the user wouldn't have to do it themselves)? If we are unsure how frequently we should punctuate, we can give control to the user (TTL would be a class with an additional period field):

{code:java}
TTL(value: Duration, punctuateInterval: Duration){code}

> Add a key-value store that is a TTL persistent cache
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.0.1
> Reporter: Elias Levy
> Priority: Major
> Labels: api
>
> Some jobs needs to maintain as state a large set of key-values for some
> period of time. I.e. they need to maintain a TTL cache of values potentially
> larger than memory.
> Currently Kafka Streams provides non-windowed and windowed key-value stores.
> Neither is an exact fit to this use case.
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as
> required, but does not support expiration. The TTL option of RocksDB is
> explicitly not used.
> The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment
> dropping, but it stores multiple items per key, based on their timestamp.
> But this store can be repurposed as a cache by fetching the items in reverse
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowsStore}} can be repurposed as a cache, it would be
> useful to have an official and proper TTL cache API and implementation.
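To make the workaround concrete, here is a minimal sketch of such a processor+punctuator in the modern Processor API. It assumes a pre-registered store named "ttl-store" mapping each key to its last-write timestamp; the store name, types, and intervals are illustrative only:

```java
import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class TtlEvictionProcessor implements Processor<String, String, String, String> {
    private static final Duration TTL = Duration.ofHours(1);
    private static final Duration PUNCTUATE_INTERVAL = Duration.ofMinutes(5);

    private KeyValueStore<String, Long> lastWriteMs;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        lastWriteMs = context.getStateStore("ttl-store");
        // Wall-clock punctuator that sweeps the store and evicts expired keys.
        context.schedule(PUNCTUATE_INTERVAL, PunctuationType.WALL_CLOCK_TIME, now -> {
            try (KeyValueIterator<String, Long> it = lastWriteMs.all()) {
                while (it.hasNext()) {
                    final KeyValue<String, Long> entry = it.next();
                    if (now - entry.value >= TTL.toMillis()) {
                        lastWriteMs.delete(entry.key);
                    }
                }
            }
        });
    }

    @Override
    public void process(final Record<String, String> record) {
        // Refresh the key's TTL on every write.
        lastWriteMs.put(record.key(), record.timestamp());
    }
}
```

This is exactly the per-application boilerplate the comment complains about, which is why folding it into an overloaded `addStateStore(store, ttl)` would be attractive.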
Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]
junrao commented on code in PR #15859: URL: https://github.com/apache/kafka/pull/15859#discussion_r1595995650

## raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java:

@@ -79,20 +81,79 @@ void testRemoveVoter() {
 );
 }
+@Test
+void testIsVoterWithDirectoryId() {
+Map aVoterMap = voterMap(Arrays.asList(1, 2, 3), true);
+VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+assertTrue(voterSet.isVoter(aVoterMap.get(1).voterKey()));
+assertFalse(voterSet.isVoter(ReplicaKey.of(1, Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isVoter(ReplicaKey.of(1, Optional.empty())));
+assertFalse(
+voterSet.isVoter(ReplicaKey.of(2, aVoterMap.get(1).voterKey().directoryId()))
+);
+assertFalse(
+voterSet.isVoter(ReplicaKey.of(4, aVoterMap.get(1).voterKey().directoryId()))
+);
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty())));
+}
+
+@Test
+void testIsVoterWithoutDirectoryId() {
+Map aVoterMap = voterMap(Arrays.asList(1, 2, 3), false);
+VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+assertTrue(voterSet.isVoter(ReplicaKey.of(1, Optional.empty())));
+assertTrue(voterSet.isVoter(ReplicaKey.of(1, Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty())));
+}
+
+@Test
+void testStandaloneAndIsOnlyVoter() {

Review Comment:
testIsOnlyVoterInStandalone?

## raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java:

@@ -73,23 +88,35 @@ private QuorumState buildQuorumState(Set voters) {
 );
 }
-@Test
-public void shouldRecordVoterQuorumState() {
-QuorumState state = buildQuorumState(Utils.mkSet(localId, 1, 2));
+@ParameterizedTest
+@ValueSource(shorts = {0, 1})
+public void shouldRecordVoterQuorumState(short kraftVersion) {
+boolean withDirectoryId = kraftVersion > 0;
+Map voterMap = VoterSetTest.voterMap(Utils.mkSet(1, 2), withDirectoryId);
+voterMap.put(localId, VoterSetTest.voterNode(ReplicaKey.of(localId, Optional.of(localDirectoryId))));

Review Comment:
Should we use an empty directory id if kraftVersion is 0?

## raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java:

@@ -79,20 +81,79 @@ void testRemoveVoter() {
 (same hunk as above, continuing)
+@Test
+void testStandaloneAndIsOnlyVoter() {
+Map aVoterMap = voterMap(Arrays.asList(1), true);
+VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+assertTrue(voterSet.isOnlyVoter(aVoterMap.get(1).voterKey()));
+assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(1, Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(1, Optional.empty())));
+assertFalse(
+voterSet.isOnlyVoter(ReplicaKey.of(4, aVoterMap.get(1).voterKey().directoryId()))
+);
+assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(4, Optional.empty())));
+}
+
+@Test
+void testNotStandaloneAndIsOnlyVoter() {

Review Comment:
testIsOnlyVoterInNonStandalone?
Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]
soarez commented on code in PR #15697: URL: https://github.com/apache/kafka/pull/15697#discussion_r1593094798

## core/src/main/scala/kafka/server/ReplicaManager.scala:

@@ -2466,7 +2467,6 @@ class ReplicaManager(val config: KafkaConfig,
 s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.")
 }
 // retrieve the UUID here because logManager.handleLogDirFailure handler removes it

Review Comment:
We should move or remove this comment now that the `uuid` declaration has been moved up.
Re: [PR] KAFKA-15045: (KIP-924) New interfaces and stubbed utility classes for pluggable TaskAssignors. [kafka]
ableegoldman commented on code in PR #15887: URL: https://github.com/apache/kafka/pull/15887#discussion_r1595980303

## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:

@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.List;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.streams.StreamsConfig;
+
+/**
+ * Assignment related configs for the Kafka Streams {@link TaskAssignor}.
+ */
+public class AssignmentConfigs {
+private final long acceptableRecoveryLag;
+private final int maxWarmupReplicas;
+private final int nonOverlapCost;
+private final int numStandbyReplicas;
+private final long probingRebalanceIntervalMs;
+private final List rackAwareAssignmentTags;
+private final int trafficCost;
+private final String assignmentStrategy;
+
+public AssignmentConfigs(final StreamsConfig configs) {
+acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
+maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
+numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
+rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
+trafficCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
+nonOverlapCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);
+assignmentStrategy = configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
+}
+
+public AssignmentConfigs(final Long acceptableRecoveryLag,
+ final Integer maxWarmupReplicas,

Review Comment:
nit: fix alignment for this and below params

## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:

@@ -0,0 +1,136 @@
 (same hunk as above, continuing)
+public AssignmentConfigs(final StreamsConfig configs) {
+acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);

Review Comment:
nit: just call the below constructor

## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:

@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the
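A sketch of the "just call the below constructor" nit: let the `StreamsConfig`-based constructor delegate instead of duplicating the field assignments. All config names appear in the quoted diff, but the parameter order of the delegate beyond the first two arguments is an assumption here:

```java
public AssignmentConfigs(final StreamsConfig configs) {
    // Delegate so the field assignments live in exactly one place.
    this(configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
         configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
         configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
         configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
         configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
         configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
         configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
         configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG));
}
```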
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1595963491

## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:

@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.Utils;
+
+/**
+ * A type for representing the set of voters for a topic partition.
+ *
+ * It encapsulates static information like a voter's endpoint and their supported kraft.version.
+ *
+ * It providees functionality for converting to and from {@code VotersRecord} and for converting
+ * from the static configuration.
+ */
+final public class VoterSet {
+private final Map voters;

Review Comment:
I think I already updated the KIP to match this comment, but I'll take a look again.
Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]
ableegoldman commented on code in PR #15909: URL: https://github.com/apache/kafka/pull/15909#discussion_r1595957783 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -193,11 +193,12 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { } pollTimer.update(currentTimeMs); if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) { -logger.warn("Consumer poll timeout has expired. This means the time between " + +logger.warn("Consumer poll timeout has expired, exceeded by {} ms. This means the time between " + Review Comment: IIUC this is what gets logged when the _heartbeat thread_ notices the consumer has failed to poll in time and dropped out of the group -- so the "time exceeded" is just going to be roughly the max poll interval + the heartbeat interval, no? I do think it's a great idea to log the amount of time by which the max poll interval was exceeded, but imo the more useful information is how long after the max poll interval the consumer took to actually hit poll again, not how long the heartbeat thread took to notice it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
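To make the two measurements in this comment concrete, a standalone sketch with invented numbers and names (this is not the consumer's actual code):

```java
// Standalone illustration of the two "exceeded by" measurements contrasted
// above for an expired poll timer. All values and names are invented.
public class PollTimeoutSketch {
    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000L;                // max.poll.interval.ms
        long lastPollMs = 1_000_000L;                     // last time poll() was called
        long deadlineMs = lastPollMs + maxPollIntervalMs; // when poll() was due

        long heartbeatNoticedAtMs = deadlineMs + 3_000L;  // heartbeat thread wakes up
        long nextActualPollMs = deadlineMs + 45_000L;     // application finally polls

        // What the patched warn line reports: bounded by the heartbeat interval.
        long exceededPerHeartbeat = heartbeatNoticedAtMs - deadlineMs;   // ~3 s

        // What the review suggests is more useful: the application's real lateness.
        long exceededPerApplication = nextActualPollMs - deadlineMs;     // ~45 s

        System.out.printf("heartbeat view: %d ms, application view: %d ms%n",
            exceededPerHeartbeat, exceededPerApplication);
    }
}
```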
Re: [PR] MINOR; Validate at least one control record [kafka]
junrao commented on code in PR #15912: URL: https://github.com/apache/kafka/pull/15912#discussion_r1595930091 ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java: ## @@ -161,8 +161,8 @@ public VotersRecord toVotersRecord(short version) { * An overlapping majority means that for all majorities in {@code this} set of voters and for * all majority in {@code that} set of voters, they have at least one voter in common. * - * If this function returns true is means that one of the voter set commits an offset, it means - * that the other voter set cannot commit a conflicting offset. + * If this function returns true, it means that if one of the set of voters commits an offset, the + * the other set of voters cannot commit a conflicting offset. Review Comment: the the other => the other -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
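The overlapping-majority property in the quoted javadoc can be checked by brute force; a self-contained sketch (not the `VoterSet` implementation) showing that a single-voter addition preserves it while swapping two voters at once does not:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class OverlappingMajorityCheck {
    // Enumerate all minimal majorities (size n/2 + 1); if those pairwise
    // intersect, any larger majorities do too, since they contain minimal ones.
    static List<Set<Integer>> majorities(Set<Integer> voters) {
        List<Integer> ids = new ArrayList<>(voters);
        int need = ids.size() / 2 + 1;
        List<Set<Integer>> result = new ArrayList<>();
        for (int mask = 0; mask < (1 << ids.size()); mask++) {
            if (Integer.bitCount(mask) != need) continue;
            Set<Integer> majority = new HashSet<>();
            for (int i = 0; i < ids.size(); i++) {
                if ((mask & (1 << i)) != 0) majority.add(ids.get(i));
            }
            result.add(majority);
        }
        return result;
    }

    static boolean hasOverlappingMajority(Set<Integer> a, Set<Integer> b) {
        return majorities(a).stream().allMatch(ma ->
            majorities(b).stream().allMatch(mb -> !Collections.disjoint(ma, mb)));
    }

    public static void main(String[] args) {
        Set<Integer> voters = new HashSet<>(List.of(1, 2, 3));
        // Adding one voter is safe: every 2-of-{1,2,3} meets every 3-of-{1,2,3,4}.
        System.out.println(hasOverlappingMajority(voters, new HashSet<>(List.of(1, 2, 3, 4)))); // true
        // Swapping two voters at once is not: {1,2} and {4,5} are disjoint.
        System.out.println(hasOverlappingMajority(voters, new HashSet<>(List.of(3, 4, 5)))); // false
    }
}
```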
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
junrao commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1595923193 ## raft/src/main/java/org/apache/kafka/raft/internals/TreeMapLogHistory.java: ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.util.NavigableMap; +import java.util.Optional; +import java.util.TreeMap; +import java.util.Map; + +/** + * A implementation for {@code LogHistory} which uses a red-black tree to store values sorted by offset. + */ +final public class TreeMapLogHistory implements LogHistory { +private final NavigableMap history = new TreeMap<>(); + +@Override +public void addAt(long offset, T value) { +if (offset < 0) { +throw new IllegalArgumentException( +String.format("Next offset %d must be greater than or equal to 0", offset) +); +} + +Map.Entry lastEntry = history.lastEntry(); +if (lastEntry != null && offset <= lastEntry.getKey()) { +throw new IllegalArgumentException( +String.format("Next offset %d must be greater than the last offset %d", offset, lastEntry.getKey()) +); +} + +history.put(offset, value); +} + +@Override +public Optional valueAtOrBefore(long offset) { +return Optional.ofNullable(history.floorEntry(offset)).map(Map.Entry::getValue); +} + +@Override +public Optional> lastEntry() { +return Optional.ofNullable(history.lastEntry()).map(entry -> new Entry<>(entry.getKey(), entry.getValue())); +} + +@Override +public void truncateNewEntries(long endOffset) { +history.tailMap(endOffset, true).clear(); +} + +@Override +public void truncateOldEntries(long startOffset) { +NavigableMap lesserValues = history.headMap(startOffset, true); +while (lesserValues.size() > 1) { Review Comment: Thanks for the explanation. I can see that it works now since `lesserValues.size() > 1` is testing greater than, instead of greater than or equal to. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
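For readers following along, a small standalone illustration of the `headMap(offset, true)` plus `size() > 1` interplay discussed here, mirroring the loop shape in the quoted code:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class HeadMapSketch {
    public static void main(String[] args) {
        NavigableMap<Long, String> history = new TreeMap<>();
        history.put(100L, "100");
        history.put(200L, "200");

        // truncateOldEntries(150): headMap(150, inclusive) is {100=100}.
        NavigableMap<Long, String> lesser = history.headMap(150L, true);
        // size() == 1, so a "while (size() > 1)" loop never removes it,
        // and valueAtOrBefore(150) can still resolve to the entry at 100.
        System.out.println(lesser); // {100=100}

        // truncateOldEntries(200): headMap(200, inclusive) is {100=100, 200=200}.
        // size() == 2, so the loop removes the oldest entry (100) and stops,
        // keeping exactly one entry at or before the requested offset.
        NavigableMap<Long, String> lesser2 = history.headMap(200L, true);
        while (lesser2.size() > 1) {
            lesser2.pollFirstEntry(); // the view write-through mutates the backing map
        }
        System.out.println(history); // {200=200}
    }
}
```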
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
junrao commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1595922267 ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java: ## @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.feature.SupportedVersionRange; +import org.apache.kafka.common.message.VotersRecord; +import org.apache.kafka.common.utils.Utils; + +/** + * A type for representing the set of voters for a topic partition. + * + * It encapsulates static information like a voter's endpoint and their supported kraft.version. + * + * It providees functionality for converting to and from {@code VotersRecord} and for converting + * from the static configuration. + */ +final public class VoterSet { +private final Map voters; Review Comment: This approach is fine, but we need to update the KIP. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1595916951 ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java: ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.util.Optional; + +/** + * A type for storing the historical value of the set of voters. + * + * This type can be use to keep track in-memory the sets for voters stored in the latest snapshot + * and log. This is useful when generating a new snapshot at a given offset or when evaulating + * the latest set of voters. + */ +final public class VoterSetHistory { +private final Optional staticVoterSet; +private final LogHistory votersHistory = new TreeMapLogHistory<>(); + +VoterSetHistory(Optional staticVoterSet) { +this.staticVoterSet = staticVoterSet; +} + +/** + * Add a new value at a given offset. + * + * The provided {@code offset} must be greater than or equal to 0 and must be greater than the + * offset of all previous calls to this method. + * + * @param offset the offset + * @param value the value to store Review Comment: Fixed. ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java: ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.util.Optional; + +/** + * A type for storing the historical value of the set of voters. + * + * This type can be use to keep track in-memory the sets for voters stored in the latest snapshot + * and log. This is useful when generating a new snapshot at a given offset or when evaulating + * the latest set of voters. + */ +final public class VoterSetHistory { +private final Optional staticVoterSet; +private final LogHistory votersHistory = new TreeMapLogHistory<>(); + +VoterSetHistory(Optional staticVoterSet) { +this.staticVoterSet = staticVoterSet; +} + +/** + * Add a new value at a given offset. 
+ * + * The provided {@code offset} must be greater than or equal to 0 and must be greater than the + * offset of all previous calls to this method. + * + * @param offset the offset + * @param value the value to store + * @throws IllegalArgumentException if the offset is not greater than all previous offsets + */ +public void addAt(long offset, VoterSet voters) { +Optional> lastEntry = votersHistory.lastEntry(); +if (lastEntry.isPresent() && lastEntry.get().offset() >= 0) { +// If the last voter set comes from the replicated log then the majorities must overlap. +// This ignores the static voter set and the bootstrapped voter set since they come from +// the configuration and the KRaft leader never guaranteed that they are the same across +// all replicas. +VoterSet lastVoterSet = lastEntry.get().value(); +if (!lastVoterSet.hasOverlappingMajority(voters)) { +throw new IllegalArgumentException( +String.format( +"Last voter set %s doesn't have an overlapping majority with the new voter set %s", +lastVoterSet, +voters +) +); +} +} + +
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1595915596 ## clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java: ## @@ -728,83 +730,114 @@ public static MemoryRecords withLeaderChangeMessage( ByteBuffer buffer, LeaderChangeMessage leaderChangeMessage ) { -writeLeaderChangeMessage(buffer, initialOffset, timestamp, leaderEpoch, leaderChangeMessage); -buffer.flip(); -return MemoryRecords.readableRecords(buffer); +try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder( +initialOffset, +timestamp, +leaderEpoch, +buffer +) +) { +builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage); +return builder.build(); +} } -private static void writeLeaderChangeMessage( -ByteBuffer buffer, +public static MemoryRecords withSnapshotHeaderRecord( long initialOffset, long timestamp, int leaderEpoch, -LeaderChangeMessage leaderChangeMessage +ByteBuffer buffer, +SnapshotHeaderRecord snapshotHeaderRecord ) { -try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder( -buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, -TimestampType.CREATE_TIME, initialOffset, timestamp, -RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, -false, true, leaderEpoch, buffer.capacity()) +try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder( +initialOffset, +timestamp, +leaderEpoch, +buffer +) ) { -builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage); +builder.appendSnapshotHeaderMessage(timestamp, snapshotHeaderRecord); +return builder.build(); } } -public static MemoryRecords withSnapshotHeaderRecord( +public static MemoryRecords withSnapshotFooterRecord( long initialOffset, long timestamp, int leaderEpoch, ByteBuffer buffer, -SnapshotHeaderRecord snapshotHeaderRecord +SnapshotFooterRecord snapshotFooterRecord ) { -writeSnapshotHeaderRecord(buffer, initialOffset, timestamp, leaderEpoch, snapshotHeaderRecord); -buffer.flip(); -return MemoryRecords.readableRecords(buffer); +try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder( +initialOffset, +timestamp, +leaderEpoch, +buffer +) +) { +builder.appendSnapshotFooterMessage(timestamp, snapshotFooterRecord); +return builder.build(); +} } -private static void writeSnapshotHeaderRecord( -ByteBuffer buffer, +public static MemoryRecords withKRaftVersionRecord( long initialOffset, long timestamp, int leaderEpoch, -SnapshotHeaderRecord snapshotHeaderRecord +ByteBuffer buffer, +KRaftVersionRecord kraftVersionRecord ) { -try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder( -buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, -TimestampType.CREATE_TIME, initialOffset, timestamp, -RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, -false, true, leaderEpoch, buffer.capacity()) +try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder( +initialOffset, +timestamp, +leaderEpoch, +buffer +) ) { -builder.appendSnapshotHeaderMessage(timestamp, snapshotHeaderRecord); +builder.appendKRaftVersionMessage(timestamp, kraftVersionRecord); +return builder.build(); } } -public static MemoryRecords withSnapshotFooterRecord( +public static MemoryRecords withVotersRecord( long initialOffset, long timestamp, int leaderEpoch, ByteBuffer buffer, -SnapshotFooterRecord snapshotFooterRecord +VotersRecord votersRecord ) { -writeSnapshotFooterRecord(buffer, initialOffset, timestamp, leaderEpoch, snapshotFooterRecord); -buffer.flip(); -return 
MemoryRecords.readableRecords(buffer); +try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder( +initialOffset, +timestamp, +leaderEpoch, +buffer +) +) { +builder.appendVotersMessage(timestamp, votersRecord); +return builder.build(); +} } -private static void
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1595916619 ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java: ## @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.feature.SupportedVersionRange; +import org.apache.kafka.common.message.VotersRecord; +import org.apache.kafka.common.utils.Utils; + +/** + * A type for representing the set of voters for a topic partition. + * + * It encapsulates static information like a voter's endpoint and their supported kraft.version. + * + * It providees functionality for converting to and from {@code VotersRecord} and for converting Review Comment: Fixed. ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java: ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.util.Optional; + +/** + * A type for storing the historical value of the set of voters. + * + * This type can be use to keep track in-memory the sets for voters stored in the latest snapshot Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1595916355 ## raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java: ## @@ -245,6 +255,42 @@ private void appendControlMessage(Function valueCreat } } +private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) { +// Confirm that it is at most one batch and it is a control record Review Comment: Yeah. I fixed the comment and the implementation. ## raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java: ## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.util.Optional; +import java.util.OptionalLong; +import org.apache.kafka.common.message.KRaftVersionRecord; +import org.apache.kafka.common.message.VotersRecord; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.ControlRecord; +import org.apache.kafka.raft.Isolation; +import org.apache.kafka.raft.LogFetchInfo; +import org.apache.kafka.raft.ReplicatedLog; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.snapshot.RawSnapshotReader; +import org.apache.kafka.snapshot.RecordsSnapshotReader; +import org.apache.kafka.snapshot.SnapshotReader; +import org.slf4j.Logger; + +/** + * The KRaft state machine for tracking control records in the topic partition. + * + * This type keeps track of changes to the finalized kraft.version and the sets of voters between + * the latest snasphot and the log end offset. + * + * The are two actors/threads for this type. One is the KRaft driver which indirectly call a lot of + * the public methods. The other are the callers of {@code RaftClient::createSnapshot} which + * indirectly call {@code voterSetAtOffset} and {@code kraftVersionAtOffset} when freezing a snapshot. + */ +final public class KRaftControlRecordStateMachine { +private final ReplicatedLog log; +private final RecordSerde serde; +private final BufferSupplier bufferSupplier; +private final Logger logger; +private final int maxBatchSizeBytes; + +// These objects are synchronized using their respective object monitor. The two actors +// are the KRaft driver when calling updateState and the RaftClient callers when freezing +// snapshots +private final VoterSetHistory voterSetHistory; +private final LogHistory kraftVersionHistory = new TreeMapLogHistory<>(); + +// This synchronization is enough because +// 1. The write operation updateState only sets the value without reading it and updates to +// voterSetHistory or kraftVersionHistory are done before setting the nextOffset +// +// 2. 
The read operations lastVoterSet, voterSetAtOffset and kraftVersionAtOffset read +// the nextOffset first before reading voterSetHistory or kraftVersionHistory +private volatile long nextOffset = 0; + +/** + * Constructs an internal log listener + * + * @param staticVoterSet the set of voter statically configured + * @param log the on disk topic partition + * @param serde the record decoder for data records + * @param bufferSupplier the supplier of byte buffers + * @param maxBatchSizeBytes the maximum size of record batch + * @param logContext the log context + */ +public KRaftControlRecordStateMachine( +Optional staticVoterSet, +ReplicatedLog log, +RecordSerde serde, +BufferSupplier bufferSupplier, +int maxBatchSizeBytes, +LogContext logContext +) { +this.log = log; +this.voterSetHistory = new VoterSetHistory(staticVoterSet); +this.serde = serde; +this.bufferSupplier = bufferSupplier; +this.maxBatchSizeBytes = maxBatchSizeBytes; +this.logger = logContext.logger(this.getClass()); +} + +/** + * Must be called whenever the {@code log} has changed. + */ +public void updateState() { +maybeLoadSnapshot(); +maybeLoadLog(); +
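The synchronization comment in the diff quoted above describes a write-then-publish / read-then-load ordering; a generic standalone sketch of that pattern (not the actual KRaft class):

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Generic sketch of the ordering described in the quoted comment: the writer
// updates the guarded history first and publishes via a volatile offset;
// readers load the volatile offset before touching the history.
public class PublishedHistorySketch<T> {
    private final TreeMap<Long, T> history = new TreeMap<>(); // guarded by its monitor
    private volatile long nextOffset = 0;

    public void updateTo(long offset, T value) {
        synchronized (history) {
            history.put(offset, value);   // 1. update guarded state
        }
        nextOffset = offset + 1;          // 2. publish (volatile write)
    }

    public Optional<T> valueAtOrBefore(long offset) {
        long published = nextOffset;      // 1. volatile read first
        if (offset >= published) {
            return Optional.empty();      // not yet published to readers
        }
        synchronized (history) {          // 2. then read guarded state
            return Optional.ofNullable(history.floorEntry(offset)).map(Map.Entry::getValue);
        }
    }
}
```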
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1595916070 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -370,8 +368,52 @@ private void maybeFireLeaderChange() { } } -@Override -public void initialize() { +public void initialize( +Map voterAddresses, +String listenerName, +QuorumStateStore quorumStateStore, +Metrics metrics +) { +partitionState = new KRaftControlRecordStateMachine( +Optional.of(VoterSet.fromInetSocketAddresses(listenerName, voterAddresses)), +log, +serde, +BufferSupplier.create(), +MAX_BATCH_SIZE_BYTES, +logContext +); +// Read the entire log +logger.info("Reading KRaft snapshot and log as part of the initialization"); +partitionState.updateState(); + +requestManager = new RequestManager( +partitionState.lastVoterSet().voterIds(), +quorumConfig.retryBackoffMs(), +quorumConfig.requestTimeoutMs(), +random +); + +quorum = new QuorumState( +nodeId, +partitionState.lastVoterSet().voterIds(), +quorumConfig.electionTimeoutMs(), +quorumConfig.fetchTimeoutMs(), +quorumStateStore, +time, +logContext, +random +); + +kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", quorum); +// All Raft voters are statically configured and known at startup +// so there are no unknown voter connections. Report this metric as 0. +kafkaRaftMetrics.updateNumUnknownVoterConnections(0); + +VoterSet lastVoterSet = partitionState.lastVoterSet(); Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1595915829 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -1470,6 +1524,10 @@ private boolean handleFetchSnapshotResponse( quorum.leaderIdOrSentinel() ); +// This will aways reload the snapshot because the internal next offset Review Comment: Fixed. ## raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java: ## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.util.Optional; +import java.util.OptionalLong; +import org.apache.kafka.common.message.KRaftVersionRecord; +import org.apache.kafka.common.message.VotersRecord; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.ControlRecord; +import org.apache.kafka.raft.Isolation; +import org.apache.kafka.raft.LogFetchInfo; +import org.apache.kafka.raft.ReplicatedLog; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.snapshot.RawSnapshotReader; +import org.apache.kafka.snapshot.RecordsSnapshotReader; +import org.apache.kafka.snapshot.SnapshotReader; +import org.slf4j.Logger; + +/** + * The KRaft state machine for tracking control records in the topic partition. + * + * This type keeps track of changes to the finalized kraft.version and the sets of voters between + * the latest snasphot and the log end offset. + * + * The are two actors/threads for this type. One is the KRaft driver which indirectly call a lot of Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Add demo template for transactional client [kafka]
k-raina opened a new pull request, #15913: URL: https://github.com/apache/kafka/pull/15913 This is an example code template for a transactional client. The code assumes that the new exception types have already been implemented. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
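The PR body does not include the template itself; for reference, a minimal transactional-producer loop against the standard producer API looks roughly like this (topic name and bootstrap address are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-tx");

        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
                producer.commitTransaction();
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                // Fatal errors: the producer must be closed (try-with-resources does it).
                throw e;
            } catch (KafkaException e) {
                // Abortable errors: abort and, in a real application, retry.
                producer.abortTransaction();
            }
        }
    }
}
```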
[PR] MINOR; Validate at least one control record [kafka]
jsancio opened a new pull request, #15912: URL: https://github.com/apache/kafka/pull/15912 Validate that a control batch in the batch accumulator has at least one control record. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
C0urante merged PR #6934: URL: https://github.com/apache/kafka/pull/6934 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1595910078 ## raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java: ## @@ -145,9 +147,7 @@ private Optional> nextBatch() { ); } -if (!batch.records().isEmpty()) { -return Optional.of(batch); -} +return Optional.of(batch); Review Comment: Yes. Good catch. We need an `if` statement. I updated the javadoc too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16584) Make log processing summary configurable or debug
[ https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845111#comment-17845111 ] Matthias J. Sax commented on KAFKA-16584: - I am on the PMC and can help you. :) After your wiki account is created, please share your wiki id and we can give you write access to the Kafka wiki space, so you can prepare a KIP. The goal of this ticket is to add a new config for the logging interval, so it should not be controversial. An example of another already-approved KIP that also added a new config is [https://cwiki.apache.org/confluence/display/KAFKA/KIP-811%3A+Add+config+repartition.purge.interval.ms+to+Kafka+Streams] – This should help you to write your KIP for this ticket. > Make log processing summary configurable or debug > - > > Key: KAFKA-16584 > URL: https://issues.apache.org/jira/browse/KAFKA-16584 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.2 >Reporter: Andras Hatvani >Assignee: dujian >Priority: Major > Labels: needs-kip, newbie > > Currently *every two minutes for every stream thread* statistics will be > logged on INFO log level. > {code} > 2024-04-18T09:18:23.790+02:00 INFO 33178 --- [service] [-StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread > [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 > total records, ran 0 punctuators, and committed 0 total tasks since the last > update {code} > This is absolutely unnecessary and even harmful since it fills the logs and > thus storage space with unwanted and useless data. Otherwise the INFO logs > are useful and helpful, therefore it's not an option to raise the log level > to WARN. > Please make the logProcessingSummary > * either to a DEBUG level log or > * make it configurable so that it can be disabled. > This is the relevant code: > https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073 -- This message was sent by Atlassian Jira (v8.20.10#820010)
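A hedged sketch of what such a KIP might define; the config name, default, and description are illustrative only, not a decided API:

```java
import org.apache.kafka.common.config.ConfigDef;

public class LogSummaryConfigSketch {
    // Hypothetical config name; the KIP would pick the real one.
    public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms";

    public static ConfigDef define(ConfigDef configDef) {
        return configDef.define(
            LOG_SUMMARY_INTERVAL_MS_CONFIG,
            ConfigDef.Type.LONG,
            2 * 60 * 1000L,                 // today's hard-coded two minutes
            ConfigDef.Importance.LOW,
            "Interval in milliseconds at which the stream thread logs its processing summary."
        );
    }
}
```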
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1595898159 ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java: ## @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.feature.SupportedVersionRange; +import org.apache.kafka.common.message.VotersRecord; +import org.apache.kafka.common.utils.Utils; + +/** + * A type for representing the set of voters for a topic partition. + * + * It encapsulates static information like a voter's endpoint and their supported kraft.version. + * + * It providees functionality for converting to and from {@code VotersRecord} and for converting + * from the static configuration. + */ +final public class VoterSet { +private final Map voters; Review Comment: Yes. That is a sufficient condition, but I decided to implement a stricter condition: the ids need to be unique. This means that with this implementation, if they want to replace a directory id, they need to first remove the failed replica key (id, uuid) and then add the new replica key (id, uuid'). There are two reasons why I decided to keep the ids unique: 1. It makes it easier and safer to implement the feature for automatically having new controllers join the quorum. I was concerned that the set of voters would become unavailable if there was a race where new directory ids kept joining the cluster. In this example the cluster would become unavailable and the user would not be able to mitigate it: [(1, uuid1), (2, uuid2), (3, uuid3), (3, uuid3'), (3, uuid3''), (3, uuid3''')]. 2. Connection management is easier to implement. A lot of code in Kafka (e.g. `o.a.k.c.Node`) assumes that ids are unique and that they can be used, along with the listener name, to identify an endpoint. I think it would be a big effort to extend this to identify endpoints by the replica key. Another example is `NodeEndpoints` in `FetchResponse`. That map is indexed by replica id. The main disadvantage of this implementation is that it would make it difficult to design and implement the ability to dynamically "alter the metadata/kraft log directory" like Kafka does for regular topic partitions. But I think we can discuss that if we ever want to implement that feature in the future. 
## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java: ## @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.feature.SupportedVersionRange; +import
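A toy illustration of the stricter invariant described in the review comment above; all names are invented:

```java
import java.util.HashMap;
import java.util.Map;

// Invented names: keying the voter map by the integer id (not the (id, uuid)
// pair) makes duplicate ids unrepresentable, matching the rationale above.
public class UniqueVoterIdsSketch {
    private final Map<Integer, String> votersById = new HashMap<>(); // id -> directory id

    // Rejects (3, uuid3') while (3, uuid3) is present; the failed replica key
    // must be removed before the replacement directory id can join.
    public boolean addVoter(int id, String directoryId) {
        return votersById.putIfAbsent(id, directoryId) == null;
    }

    // Removes the voter only if both the id and the directory id match.
    public boolean removeVoter(int id, String directoryId) {
        return votersById.remove(id, directoryId);
    }
}
```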
Re: [PR] KAFKA-16677: Replace ClusterType#ALL and ClusterType#DEFAULT by Array [kafka]
lianetm commented on PR #15897: URL: https://github.com/apache/kafka/pull/15897#issuecomment-2103291571 Thanks for the patch @FrankYang0529, nice twist. Left some comments. Also there are examples for the `ClusterTest` annotation in the [core README file](https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/junit/README.md) so it should be updated too to reflect the changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16677: Replace ClusterType#ALL and ClusterType#DEFAULT by Array [kafka]
lianetm commented on code in PR #15897: URL: https://github.com/apache/kafka/pull/15897#discussion_r1595878217 ## core/src/test/java/kafka/test/annotation/ClusterTest.java: ## @@ -33,7 +33,7 @@ @Retention(RUNTIME) @TestTemplate public @interface ClusterTest { -Type clusterType() default Type.DEFAULT; +Type[] clusterTypes() default {}; Review Comment: this bit is the one I'm still going back and forth on. I totally like the direction of the PR, removing the ALL and DEFAULT types, but it brings the need to specify the full list of cluster types on every test that needs them all, so I wonder if having the full list as the default here would be a good complementary change? It would mean that we have well-defined `ClusterTypes` as introduced by this PR, but every test running on `ClusterTest` would run for all types unless specified. That removes lots of `clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}` boilerplate; it's probably the default behaviour we want for a test so that no one accidentally forgets to run on any specific cluster type, and it makes it easier to maintain (keeping the full list only in this single place, not on every test). Thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
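A sketch of the complementary change floated here, assuming the `Type` enum from this PR; whether this default is desirable is exactly the open question:

```java
// Sketch only: ClusterTest defaulting to every known cluster type unless a
// test narrows the list. Assumes kafka.test.annotation.Type with the ZK,
// KRAFT and CO_KRAFT constants introduced by this PR.
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import kafka.test.annotation.Type;
import org.junit.jupiter.api.TestTemplate;

@Retention(RUNTIME)
@TestTemplate
public @interface ClusterTest {
    Type[] clusterTypes() default {Type.ZK, Type.KRAFT, Type.CO_KRAFT};
}
```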
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1595871101 ## raft/src/main/java/org/apache/kafka/raft/internals/TreeMapLogHistory.java: ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.util.NavigableMap; +import java.util.Optional; +import java.util.TreeMap; +import java.util.Map; + +/** + * A implementation for {@code LogHistory} which uses a red-black tree to store values sorted by offset. + */ +final public class TreeMapLogHistory implements LogHistory { +private final NavigableMap history = new TreeMap<>(); + +@Override +public void addAt(long offset, T value) { +if (offset < 0) { +throw new IllegalArgumentException( +String.format("Next offset %d must be greater than or equal to 0", offset) +); +} + +Map.Entry lastEntry = history.lastEntry); +if (lastEntry != null && offset <= lastEntry.getKey()) { +throw new IllegalArgumentException( +String.format("Next offset %d must be greater than the last offset %d", offset, lastEntry.getKey()) +); +} + +history.put(offset, value); +} + +@Override +public Optional valueAtOrBefore(long offset) { +return Optional.ofNullable(history.floorEntry(offset)).map(Map.Entry::getValue); +} + +@Override +public Optional> lastEntry() { +return Optional.ofNullable(history.lastEntry()).map(entry -> new Entry<>(entry.getKey(), entry.getValue())); +} + +@Override +public void truncateNewEntries(long endOffset) { +history.tailMap(endOffset, true).clear(); +} + +@Override +public void truncateOldEntries(long startOffset) { +NavigableMap lesserValues = history.headMap(startOffset, true); +while (lesserValues.size() > 1) { Review Comment: It should be implemented. Take a look at this test: ```java @Test void testTrimPrefixTo() { TreeMapLogHistory history = new TreeMapLogHistory<>(); history.addAt(100, "100"); history.addAt(200, "200"); ... history.truncateOldEntries(101); assertEquals(Optional.empty(), history.valueAtOrBefore(99)); assertEquals(Optional.of("100"), history.valueAtOrBefore(100)); ... history.truncateOldEntries(200); assertEquals(Optional.empty(), history.valueAtOrBefore(199)); assertEquals(Optional.of("200"), history.valueAtOrBefore(200)); } ``` For `truncateOldEntries(101)` there is one entry between `(0, 101)` so the entry at 100 is not deleted. For `truncateOldEntries(200)` there are two entries between `(0, 200)` so the oldest one at 100 is deleted but the newest one at 200 is kept. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16677: Replace ClusterType#ALL and ClusterType#DEFAULT by Array [kafka]
lianetm commented on code in PR #15897: URL: https://github.com/apache/kafka/pull/15897#discussion_r1595849504 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -219,8 +221,8 @@ public static class Builder { private Builder() {} -public Builder setType(Type type) { -this.type = type; +public Builder setTypes(Set types) { +this.types = Collections.unmodifiableSet(new HashSet<>(types)); Review Comment: `Collections.unmodifiableSet` will create a new object from the provided argument, so why create a new `HashSet` instead of simply `Collections.unmodifiableSet(types)`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
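For context on the question, a standalone snippet showing the behavior at play: `unmodifiableSet` wraps rather than copies, which is the usual motivation for the extra `HashSet`:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class UnmodifiableSetSketch {
    public static void main(String[] args) {
        Set<String> original = new HashSet<>(Arrays.asList("ZK"));

        Set<String> view = Collections.unmodifiableSet(original);                // wraps, no copy
        Set<String> copy = Collections.unmodifiableSet(new HashSet<>(original)); // defensive copy

        original.add("KRAFT"); // caller mutates the set it passed in

        System.out.println(view); // [ZK, KRAFT] -> the view sees the mutation
        System.out.println(copy); // [ZK]        -> the copy is truly frozen
    }
}
```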
[jira] [Assigned] (KAFKA-15905) Restarts of MirrorCheckpointTask should not permanently interrupt offset translation
[ https://issues.apache.org/jira/browse/KAFKA-15905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar reassigned KAFKA-15905: - Assignee: Edoardo Comar > Restarts of MirrorCheckpointTask should not permanently interrupt offset > translation > > > Key: KAFKA-15905 > URL: https://issues.apache.org/jira/browse/KAFKA-15905 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Edoardo Comar >Priority: Major > > Executive summary: When the MirrorCheckpointTask restarts, it loses the state > of checkpointsPerConsumerGroup, which limits offset translation to records > mirrored after the latest restart. > For example, if 1000 records are mirrored and the OffsetSyncs are read by > MirrorCheckpointTask, the emitted checkpoints are cached, and translation can > happen at the ~500th record. If MirrorCheckpointTask restarts, and 1000 more > records are mirrored, translation can happen at the ~1500th record, but no > longer at the ~500th record. > Context: > Before KAFKA-13659, MM2 made translation decisions based on the > incompletely-initialized OffsetSyncStore, and the checkpoint could appear to > go backwards temporarily during restarts. To fix this, we forced the > OffsetSyncStore to initialize completely before translation could take place, > ensuring that the latest OffsetSync had been read, and thus providing the > most accurate translation. > Before KAFKA-14666, MM2 translated offsets only off of the latest OffsetSync. > Afterwards, an in-memory sparse cache of historical OffsetSyncs was kept, to > allow for translation of earlier offsets. This came with the caveat that the > cache's sparseness allowed translations to go backwards permanently. To > prevent this behavior, a cache of the latest Checkpoints was kept in the > MirrorCheckpointTask#checkpointsPerConsumerGroup variable, and offset > translation remained restricted to the fully-initialized OffsetSyncStore. > Effectively, the MirrorCheckpointTask ensures that it translates based on an > OffsetSync emitted during it's lifetime, to ensure that no previous > MirrorCheckpointTask emitted a later sync. If we can read the checkpoints > emitted by previous generations of MirrorCheckpointTask, we can still ensure > that checkpoints are monotonic, while allowing translation further back in > history. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
gharris1727 commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2103188026 Hey @edoardocomar and @prestona thanks for the PR! One of the reasons I thought this might require a KIP is that it requires additional permissions that the current MM2 doesn't need: if an operator has already configured ACLs such that MM2 has write permissions for the checkpoints topic but no read permissions, it could be working today and then fail after an upgrade with this change. I don't know if that is a common configuration or even a recommended one, but it does seem possible in the wild. Perhaps this can be configuration-less and backwards-compatible if we fall back to the old behavior when reading the checkpoints fails for any reason, including insufficient permissions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
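A rough sketch of the configuration-less fallback being proposed; the helper names are hypothetical:

```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical shape of the fallback: attempt to read prior checkpoints and
// revert to the pre-change (empty) state on any failure, e.g. a missing READ ACL.
public class CheckpointLoaderSketch {
    private static final Logger log = LoggerFactory.getLogger(CheckpointLoaderSketch.class);

    Map<TopicPartition, Long> loadInitialCheckpoints() {
        try {
            return readCheckpointsTopic(); // hypothetical helper that consumes the topic
        } catch (Exception e) {
            log.warn("Could not read checkpoints topic; falling back to previous behavior", e);
            return Collections.emptyMap(); // same state a restarted task had before this change
        }
    }

    Map<TopicPartition, Long> readCheckpointsTopic() {
        throw new UnsupportedOperationException("stand-in for the real consumer logic");
    }
}
```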
Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]
jsancio commented on code in PR #15859: URL: https://github.com/apache/kafka/pull/15859#discussion_r1595796530 ## raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java: ## @@ -850,14 +846,10 @@ public String toString() { } private static class InflightRequest { -final int correlationId; final int sourceId; -final int destinationId; private InflightRequest(int correlationId, int sourceId, int destinationId) { -this.correlationId = correlationId; Review Comment: I agree. Removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]
jsancio commented on code in PR #15859: URL: https://github.com/apache/kafka/pull/15859#discussion_r1595793667 ## raft/src/test/java/org/apache/kafka/raft/ElectionStateTest.java: ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.raft.generated.QuorumStateData; +import org.apache.kafka.raft.internals.ReplicaKey; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +final class ElectionStateTest { +@Test +void testVotedCandidateWithoutVotedId() { +ElectionState electionState = ElectionState.withUnknownLeader(5, Collections.emptySet()); +assertFalse(electionState.isVotedCandidate(ReplicaKey.of(1, Optional.empty(; +} + +@Test +void testVotedCandidateWithoutVotedDirectoryId() { +ElectionState electionState = ElectionState.withVotedCandidate( +5, +ReplicaKey.of(1, Optional.empty()), +Collections.emptySet() +); +assertTrue(electionState.isVotedCandidate(ReplicaKey.of(1, Optional.empty(; +assertTrue( +electionState.isVotedCandidate(ReplicaKey.of(1, Optional.of(Uuid.randomUuid( +); +} + +@Test +void testVotedCandidateWithVotedDirectoryId() { +ReplicaKey votedKey = ReplicaKey.of(1, Optional.of(Uuid.randomUuid())); +ElectionState electionState = ElectionState.withVotedCandidate( +5, +votedKey, +Collections.emptySet() +); +assertFalse(electionState.isVotedCandidate(ReplicaKey.of(1, Optional.empty(; +assertTrue(electionState.isVotedCandidate(votedKey)); +} + +@ParameterizedTest +@ValueSource(shorts = {0, 1}) +void testQuorumStateDataRoundTrip(short version) { +ReplicaKey votedKey = ReplicaKey.of(1, Optional.of(Uuid.randomUuid())); +List electionStates = Arrays.asList( +ElectionState.withUnknownLeader(5, Utils.mkSet(1, 2, 3)), +ElectionState.withElectedLeader(5, 1, Utils.mkSet(1, 2, 3)), +ElectionState.withVotedCandidate(5, votedKey, Utils.mkSet(1, 2, 3)) +); + +final List expected; +if (version == 0) { +expected = Arrays.asList( +ElectionState.withUnknownLeader(5, Utils.mkSet(1, 2, 3)), +ElectionState.withElectedLeader(5, 1, Utils.mkSet(1, 2, 3)), +ElectionState.withVotedCandidate( +5, +ReplicaKey.of(1, Optional.empty()), +Utils.mkSet(1, 2, 3) +) +); +} else { +expected = Arrays.asList( +ElectionState.withUnknownLeader(5, Collections.emptySet()), +ElectionState.withElectedLeader(5, 1, Collections.emptySet()), 
+ElectionState.withVotedCandidate(5, votedKey, Collections.emptySet()) Review Comment: `ElectionState` is the in-memory representation of the `quorum-state` file or `QuorumStateData`. When version 1 is used, `votedDirectoryId` is persisted but `voters` is not. When version 0 is used, `voters` is persisted but `votedDirectoryId` is not. The reason we need to keep persisting `voters` in version 0 is that I want to allow the user to upgrade to the 3.8 JAR without upgrading to kraft.version 1 and then downgrade to the 3.7 JAR. To allow this, Kafka 3.8 needs to keep persisting the `voters` even though that version of Kafka doesn't use the `voters` field. If you notice, there is no method `ElectionState::voters()`. So there is no way for the latest version of Kafka to use those values. They are just there for backwards compatibility if they don't
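A purely illustrative sketch of the compatibility rule described; field and method names are invented and do not match the generated `QuorumStateData` code:

```java
import java.util.List;
import java.util.Optional;

// Invented names throughout: version 0 keeps writing the voters list so a
// downgraded 3.7 JAR can still read the quorum-state file; version 1 writes
// the voted directory id instead and drops the unused voters.
public class QuorumStateSketch {
    static final short VERSION_0 = 0;
    static final short VERSION_1 = 1;

    static String serialize(short version, int votedId, Optional<String> votedDirectoryId, List<Integer> voters) {
        StringBuilder out = new StringBuilder("votedId=" + votedId);
        if (version == VERSION_0) {
            out.append(", voters=").append(voters);      // kept only for downgrade
        } else {
            votedDirectoryId.ifPresent(id -> out.append(", votedDirectoryId=").append(id));
        }
        return out.toString();
    }
}
```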
Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]
jsancio commented on code in PR #15859: URL: https://github.com/apache/kafka/pull/15859#discussion_r1595784042 ## raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java: ## @@ -79,20 +81,79 @@ void testRemoveVoter() {
         );
     }
 
+    @Test
+    void testIsVoterWithDirectoryId() {
+        Map<Integer, VoterSet.VoterNode> aVoterMap = voterMap(Arrays.asList(1, 2, 3), true);
+        VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+        assertTrue(voterSet.isVoter(aVoterMap.get(1).voterKey()));
+        assertFalse(voterSet.isVoter(ReplicaKey.of(1, Optional.of(Uuid.randomUuid()))));
+        assertFalse(voterSet.isVoter(ReplicaKey.of(1, Optional.empty())));
+        assertFalse(
+            voterSet.isVoter(ReplicaKey.of(2, aVoterMap.get(1).voterKey().directoryId()))
+        );
+        assertFalse(
+            voterSet.isVoter(ReplicaKey.of(4, aVoterMap.get(1).voterKey().directoryId()))
+        );
+        assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty())));
+    }
+
+    @Test
+    void testIsVoterWithoutDirectoryId() {
+        Map<Integer, VoterSet.VoterNode> aVoterMap = voterMap(Arrays.asList(1, 2, 3), false);
+        VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+        assertTrue(voterSet.isVoter(ReplicaKey.of(1, Optional.empty())));
+        assertTrue(voterSet.isVoter(ReplicaKey.of(1, Optional.of(Uuid.randomUuid()))));
+        assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.of(Uuid.randomUuid()))));
+        assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty())));
+    }
+
+    @Test
+    void testStandaloneAndOnlyVoter() {
+        Map<Integer, VoterSet.VoterNode> aVoterMap = voterMap(Arrays.asList(1), true);
+        VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+        assertTrue(voterSet.isOnlyVoter(aVoterMap.get(1).voterKey()));
+        assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(1, Optional.of(Uuid.randomUuid()))));
+        assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(1, Optional.empty())));
+        assertFalse(
+            voterSet.isOnlyVoter(ReplicaKey.of(4, aVoterMap.get(1).voterKey().directoryId()))
+        );
+        assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(4, Optional.empty())));
+    }
+
+    @Test
+    void testOnlyVoter() {

Review Comment: Fair. I renamed it to `testNotStandaloneAndIsOnlyVoter`. I meant the behavior of `isOnlyVoter` when there is more than one voter in the set of voters.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]
jsancio commented on code in PR #15859: URL: https://github.com/apache/kafka/pull/15859#discussion_r1595780757 ## raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java: ## @@ -63,7 +66,9 @@ public void tearDown() {
     private QuorumState buildQuorumState(Set<Integer> voters) {
         return new QuorumState(
             OptionalInt.of(localId),
-            voters,
+            localDirectoryId,
+            () -> VoterSetTest.voterSet(VoterSetTest.voterMap(voters, false)),
+            () -> (short) 0,

Review Comment: I made them parameterized and configured them to run with `kraft.version` 0 and 1. In general we didn't really need to test both cases because `kraft.version` affects the persisted state, not the in-memory state. Since those tests do not recreate a `QuorumState` object, they don't affect the directory id used.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
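For readers following along, the parameterization described above typically takes this JUnit 5 shape; the test body is illustrative, and the extra `kraftVersion` argument to `buildQuorumState` is an assumption, not the PR's exact signature:

```java
// Sketch: run the same metrics test against both persisted-state versions.
@ParameterizedTest
@ValueSource(shorts = {0, 1})
public void shouldRecordVoterQuorumState(short kraftVersion) {
    QuorumState state = buildQuorumState(Utils.mkSet(localId, 1, 2), kraftVersion);
    state.initialize(new OffsetAndEpoch(0L, 0));
    // assertions on the reported metrics follow, identical for both versions
}
```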
[PR] KAFKA-16696: Removed the in-memory implementation of RSM and RLMM [kafka]
kamalcph opened a new pull request, #15911: URL: https://github.com/apache/kafka/pull/15911 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16696) Remove the in-memory implementation of RSM and RLMM
[ https://issues.apache.org/jira/browse/KAFKA-16696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kamal Chandraprakash updated KAFKA-16696:
-
Description:
The in-memory implementation of RSM and RLMM were written to support the unit/integration tests: [https://github.com/apache/kafka/pull/10218]

This is not used by any of the tests and is superseded by the LocalTieredStorage framework which uses local-disk as secondary storage and topic as RLMM. Using the LocalTieredStorage framework is the preferred way to write the integration tests to capture any regression as it uses the internal topic as storage for RLMM which is the default implementation.

was:
The in-memory implementation of RSM and RLMM were written to support the unit/integration tests: [https://github.com/apache/kafka/pull/10218]

This is not used by any of the tests and is superseded by the LocalTieredStorage framework which uses local-disk as secondary storage and topic as RLMM. Using the LocalTieredStorage framework is the preferred way to write the integration tests to capture any regression as it uses the official topic as storage for RLMM.

> Remove the in-memory implementation of RSM and RLMM
> ---
>
> Key: KAFKA-16696
> URL: https://issues.apache.org/jira/browse/KAFKA-16696
> Project: Kafka
> Issue Type: Task
> Reporter: Kamal Chandraprakash
> Assignee: Kamal Chandraprakash
> Priority: Minor
>
> The in-memory implementation of RSM and RLMM were written to support the
> unit/integration tests: [https://github.com/apache/kafka/pull/10218]
> This is not used by any of the tests and is superseded by the LocalTieredStorage
> framework which uses local-disk as secondary storage and topic as RLMM. Using
> the LocalTieredStorage framework is the preferred way to write the
> integration tests to capture any regression as it uses the internal topic as
> storage for RLMM which is the default implementation.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16696) Remove the in-memory implementation of RSM and RLMM
[ https://issues.apache.org/jira/browse/KAFKA-16696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kamal Chandraprakash updated KAFKA-16696:
-
Description:
The in-memory implementation of RSM and RLMM were written to support the unit/integration tests: [https://github.com/apache/kafka/pull/10218]

This is not used by any of the tests and is superseded by the LocalTieredStorage framework which uses local-disk as secondary storage and topic as RLMM. Using the LocalTieredStorage framework is the preferred way to write the integration tests to capture any regression as it uses the official topic as storage for RLMM.

was:
The in-memory implementation of RSM and RLMM were written to support the unit/integration tests: [https://github.com/apache/kafka/pull/10218]

This is not used by any of the tests and is superseded by the LocalTieredStorage framework which uses local-disk as secondary storage and topic as RLMM.

> Remove the in-memory implementation of RSM and RLMM
> ---
>
> Key: KAFKA-16696
> URL: https://issues.apache.org/jira/browse/KAFKA-16696
> Project: Kafka
> Issue Type: Task
> Reporter: Kamal Chandraprakash
> Assignee: Kamal Chandraprakash
> Priority: Minor
>
> The in-memory implementation of RSM and RLMM were written to support the
> unit/integration tests: [https://github.com/apache/kafka/pull/10218]
> This is not used by any of the tests and is superseded by the LocalTieredStorage
> framework which uses local-disk as secondary storage and topic as RLMM. Using
> the LocalTieredStorage framework is the preferred way to write the
> integration tests to capture any regression as it uses the official topic as
> storage for RLMM.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16696) Remove the in-memory implementation of RSM and RLMM
Kamal Chandraprakash created KAFKA-16696: Summary: Remove the in-memory implementation of RSM and RLMM Key: KAFKA-16696 URL: https://issues.apache.org/jira/browse/KAFKA-16696 Project: Kafka Issue Type: Task Reporter: Kamal Chandraprakash Assignee: Kamal Chandraprakash

The in-memory implementation of RSM and RLMM were written to support the unit/integration tests: [https://github.com/apache/kafka/pull/10218]

This is not used by any of the tests and is superseded by the LocalTieredStorage framework which uses local-disk as secondary storage and topic as RLMM.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16693) Kafka Users are created with ACL entries and during performing operations allowed by ACL we see Denied Operation
[ https://issues.apache.org/jira/browse/KAFKA-16693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Janardhana Gopalachar updated KAFKA-16693:
--
Description:
Hi

We have created 2 KafkaUsers from the Strimzi operator for 2 different clusters with ACL entries. We have observed that the ACL entries will not be present in the Kafka cluster for approximately 4 minutes, and the ACL entries will be available after that. The clients which are using the KafkaUser to perform operations will not be able to perform operations until the ACL is available in the Kafka cluster.

Below are the messages we see in the Kafka logs; the user and cluster details in the messages are not included since they are proprietary:

Processing notification(s) to /config/change
Processing override for entityPath
Removing PRODUCE quota for user
Removing FETCH quota for user
Removing REQUEST quota for user
Removing CONTROLLER_MUTATION quota for user
Processing notification(s) to /kafka-acl-changes
Processing Acl change notification for
Processing notification(s) to /config/changes
Processing notification(s) to /kafka-acl-changes
Processing Acl change notification for ResourcePattern(resourceType=GROU

The same behavior is observed even if we create a single KafkaUser.

was:
Hi

We have created 2 KafkaUsers from the Strimzi operator for 2 different clusters with ACL entries. We have observed that the ACL entries will not be present in the Kafka cluster for approximately 4 minutes, and the ACL entries will be available after that. The clients which are using the KafkaUser to perform operations will not be able to perform operations until the ACL is available in the Kafka cluster.

Below are the messages we see in the Kafka logs; the user and cluster details in the messages are not included since they are proprietary:

Processing notification(s) to /config/change
Processing override for entityPath
Removing PRODUCE quota for user
Removing FETCH quota for user
Removing REQUEST quota for user
Removing CONTROLLER_MUTATION quota for user
Processing notification(s) to /kafka-acl-changes
Processing Acl change notification for
Processing notification(s) to /config/changes
Processing notification(s) to /kafka-acl-changes
Processing Acl change notification for ResourcePattern(resourceType=GROU

> Kafka Users are created with ACL entries and during performing operations
> allowed by ACL we see Denied Operation
>
> Key: KAFKA-16693
> URL: https://issues.apache.org/jira/browse/KAFKA-16693
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 3.5.1
> Reporter: Janardhana Gopalachar
> Priority: Blocker
>
> Hi
> We have created 2 KafkaUsers from the Strimzi operator for 2 different clusters
> with ACL entries. We have observed that the ACL entries will not be present in the
> Kafka cluster for approximately 4 minutes, and the ACL entries will be
> available after that. The clients which are using the KafkaUser to perform
> operations will not be able to perform operations until the ACL is available in
> the Kafka cluster.
> Below are the messages we see in the Kafka logs; the user and cluster details
> in the messages are not included since they are proprietary:
>
> Processing notification(s) to /config/change
> Processing override for entityPath
> Removing PRODUCE quota for user
> Removing FETCH quota for user
> Removing REQUEST quota for user
> Removing CONTROLLER_MUTATION quota for user
> Processing notification(s) to /kafka-acl-changes
> Processing Acl change notification for
> Processing notification(s) to /config/changes
> Processing notification(s) to /kafka-acl-changes
> Processing Acl change notification for ResourcePattern(resourceType=GROU
>
> The same behavior is observed even if we create a single KafkaUser.
>

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-7300) Add KafkaConsumer fetch-error-rate and fetch-error-total metrics
[ https://issues.apache.org/jira/browse/KAFKA-7300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee reassigned KAFKA-7300: - Assignee: Philip Nee (was: Kevin Lu) > Add KafkaConsumer fetch-error-rate and fetch-error-total metrics > - > > Key: KAFKA-7300 > URL: https://issues.apache.org/jira/browse/KAFKA-7300 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer, metrics >Reporter: Kevin Lu >Assignee: Philip Nee >Priority: Minor > > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-356%3A+Add+KafkaConsumer+fetch-error-rate+and+fetch-error-total+metrics] > > The KafkaConsumer is a complex client that requires many different components > to function properly. When a consumer is not operating properly, it can be > difficult to identify the root cause and which component is causing issues > (ConsumerCoordinator, Fetcher, ConsumerNetworkClient, etc). > > This aims to improve the monitoring and detection of KafkaConsumer’s Fetcher > component. > > Fetcher will send a fetch request for each node that the consumer has > assigned partitions for. > > This fetch request may fail under the following cases: > * Intermittent network issues (goes to onFailure) > * Node sent an invalid full/incremental fetch response > (FetchSessionHandler’s handleResponse returns false) > * FetchSessionIdNotFound > * InvalidFetchSessionEpochException > > These cases are logged, but it would be valuable to provide a corresponding > metric that allows for monitoring and alerting. -- This message was sent by Atlassian Jira (v8.20.10#820010)
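Since the KIP describes a rate/total pair, a minimal sketch of how such a pair is registered with the client `Metrics` API may be useful; the sensor, group, and description strings below are illustrative, not the KIP's final names:

```java
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Meter;

// Register a paired rate/total metric, then record one event per failed fetch.
Metrics metrics = new Metrics();
Sensor fetchErrorSensor = metrics.sensor("fetch-error");
fetchErrorSensor.add(new Meter(
    metrics.metricName("fetch-error-rate", "consumer-fetch-manager-metrics",
        "The average number of failed fetch requests per second"),
    metrics.metricName("fetch-error-total", "consumer-fetch-manager-metrics",
        "The total number of failed fetch requests")));

// On each failure case listed above (network error, invalid response, etc.):
fetchErrorSensor.record();
```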
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2103078761 Test failures are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16686: Wait for given offset in TopicBasedRemoteLogMetadataManagerTest [kafka]
satishd commented on code in PR #15885: URL: https://github.com/apache/kafka/pull/15885#discussion_r1595738359 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java: ## @@ -152,15 +152,17 @@ public void testNewPartitionUpdates() throws Exception {
         // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
         // fetching those events and build the cache.
-        waitUntilConsumerCatchesUp(newLeaderTopicIdPartition, newFollowerTopicIdPartition, 30_000L);
-
+        waitUntilConsumerCatchesUp(newLeaderTopicIdPartition, newFollowerTopicIdPartition, 30_000L, 0, 0);
         Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext());
         Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext());
     }
 
     private void waitUntilConsumerCatchesUp(TopicIdPartition newLeaderTopicIdPartition,
                                             TopicIdPartition newFollowerTopicIdPartition,
-                                            long timeoutMs) throws TimeoutException {
+                                            long timeoutMs,
+                                            long targetLeaderMetadataPartitionOffset,

Review Comment: These parameters will not help much here, as this method was written for `testNewPartitionUpdates`, while other tests in this class use the functionality with gaps. It is better to revisit those use cases and refactor this method accordingly.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
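One way to follow that suggestion, sketched under the assumption of a per-partition target map and a hypothetical `readOffsetForPartition` accessor on the harness:

```java
// Sketch: wait on explicit per-partition target offsets instead of overloading
// the original two-partition helper.
private void waitUntilConsumerCatchesUp(Map<TopicIdPartition, Long> targetOffsets,
                                        long timeoutMs) throws InterruptedException {
    TestUtils.waitForCondition(
        () -> targetOffsets.entrySet().stream().allMatch(entry ->
            topicBasedRlmm().readOffsetForPartition(entry.getKey()) >= entry.getValue()),
        timeoutMs,
        "Consumer did not catch up to the target metadata partition offsets");
}
```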
Re: [PR] KAFKA-15045: (KIP-924) New interfaces and stubbed utility classes for pluggable TaskAssignors. [kafka]
apourchet commented on code in PR #15887: URL: https://github.com/apache/kafka/pull/15887#discussion_r1595700958 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsState.java: ## @@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedSet;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.HostInfo;
+
+/**
+ * A read-only metadata class representing the current state of each KafkaStreams client with at least one StreamThread participating in this rebalance
+ */
+public interface KafkaStreamsState {
+    /**
+     * @return the processId of the application instance running on this KafkaStreams client
+     */
+    ProcessID processId();
+
+    /**
+     * Returns the number of processing threads available to work on tasks for this KafkaStreams client,
+     * which represents its overall capacity for work relative to other KafkaStreams clients.
+     *
+     * @return the number of processing threads on this KafkaStreams client
+     */
+    int numProcessingThreads();
+
+    /**
+     * @return the set of consumer client ids for this KafkaStreams client
+     */
+    SortedSet<String> consumerClientIds();
+
+    /**
+     * @return the set of all active tasks owned by consumers on this KafkaStreams client since the previous rebalance
+     */
+    SortedSet<TaskId> previousActiveTasks();
+
+    /**
+     * @return the set of all standby tasks owned by consumers on this KafkaStreams client since the previous rebalance
+     */
+    SortedSet<TaskId> previousStandbyTasks();
+
+    /**
+     * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client
+     * did not have any state for this task on disk.
+     *
+     * @return end offset sum - offset sum
+     *         Task.LATEST_OFFSET if this was previously an active running task on this client

Review Comment: IllegalArgumentException usually refers to an argument that was passed in, as opposed to some global misconfiguration, so I'll go for UnsupportedOperationException unless someone else feels strongly about it.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
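For context on the exception choice, a hedged sketch of how the documented contract might read on the interface; the `lagFor` signature matches the method whose Javadoc is quoted above, but the `@throws` wording is an assumption, not the merged text:

```java
/**
 * Returns the total lag across all logged stores in the task.
 *
 * @return end offset sum - offset sum, or Task.LATEST_OFFSET if this was
 *         previously an active running task on this client
 *
 * @throws UnsupportedOperationException if lag information was not requested
 *         for this rebalance, since that is a configuration-level condition
 *         rather than a problem with the {@code task} argument itself
 */
long lagFor(final TaskId task);
```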
Re: [PR] MINOR: fix LogValidatorTest#checkNonCompressed [kafka]
junrao commented on code in PR #15904: URL: https://github.com/apache/kafka/pull/15904#discussion_r1595670550 ## core/src/test/scala/unit/kafka/log/LogValidatorTest.scala: ## @@ -414,7 +414,15 @@ class LogValidatorTest { assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") -assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp) +// V2: Only one batch is in the records, so the shallow OffsetOfMaxTimestamp is the last offset of the single batch +// V1: 3 batches are in the records, so the shallow OffsetOfMaxTimestamp is the timestamp of branch-1 Review Comment: Hmm, what is branch? ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1177,6 +1177,8 @@ class UnifiedLog(@volatile var logStartOffset: Long, validBytesCount += batchSize val batchCompression = CompressionType.forId(batch.compressionType.id) + // V2: only one batch regardless of compression Review Comment: This is not very accurate. `analyzeAndValidateRecords()` is called in the follower append too, which could include more than one batch. It's just that sourceCompression is only used on the leader path, which only contains one batch currently. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar opened a new pull request, #15910: URL: https://github.com/apache/kafka/pull/15910 KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently interrupt offset translation. MirrorCheckpointTask reloads the last checkpoint at start; OffsetSyncStore stores OffsetSyncs before reading to the log end. Adds a test case simulating a restarted task where the store is reinitialized with later OffsetSyncs, and checks that emitted Checkpoints do not rewind. Also addresses KAFKA-16622 (MirrorMaker2's first Checkpoint not emitted until the consumer group fully catches up once), because the OffsetSyncStore is populated before reading to the log end. Co-Authored-By: Adrian Preston -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
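The "do not rewind" property can be sketched as a small guard in the task; `previousCheckpoints`, `shouldEmit`, and the `Checkpoint` accessors are used here for illustration and should not be read as MirrorMaker's actual implementation:

```java
// Sketch: only emit a checkpoint if it does not move the translated offset backwards.
private final Map<TopicPartition, Long> previousCheckpoints = new HashMap<>();

boolean shouldEmit(Checkpoint checkpoint) {
    long previous = previousCheckpoints.getOrDefault(checkpoint.topicPartition(), -1L);
    if (checkpoint.downstreamOffset() < previous) {
        return false; // a restarted task reloaded later OffsetSyncs; emitting would rewind
    }
    previousCheckpoints.put(checkpoint.topicPartition(), checkpoint.downstreamOffset());
    return true;
}
```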
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845045#comment-17845045 ]

Philip Nee commented on KAFKA-16687: Since you mentioned that disabling the JMX reporter resolves the issue, I was wondering if this is caused by leaking network connections. I have also been wondering if it was caused by the sticky node used by the telemetry sender: [https://github.com/apache/kafka/commit/2b99d0e45027c88ae2347fa8f7d1ff4b2b919089]

> Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.7.0
> Reporter: FTR
> Assignee: Philip Nee
> Priority: Major
>
> I am building a Java project which uses the Maven dependency kafka-clients,
> version 3.7.0.
> My Java application logic is to use a Kafka consumer to poll a Kafka broker
> topic continuously.
> I have configured my Java application with the JVM options -Xms8G -Xmx8G
> -XX:MaxMetaspaceSize=4G, and then run it.
> Also, there are 16G of physical memory on my virtual machine.
> After my Java application had been running for a long time, I found that the
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate swap space.
> I checked it with jmap -heap pid, and found that heap memory usage is OK.
> Also, with Native Memory Tracking [jcmd pid Native.memory detail.diff], I
> found that it's caused by [NMT Internal] memory, created by
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate
> memory.
> And checking the kafka-clients source code, it has code that uses
> "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it.
>
> Could you help to check it? How could I stop this growing native memory to
> avoid my system hanging?

-- This message was sent by Atlassian Jira (v8.20.10#820010)
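To test that hypothesis in isolation, one can disable both suspect paths on a plain consumer; `auto.include.jmx.reporter` (KIP-830) and `enable.metrics.push` (KIP-714) are real client configs, but whether they influence this leak is exactly the open question here:

```java
// Sketch: reproduce the workload with the JMX reporter and telemetry push disabled.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "leak-repro");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put("auto.include.jmx.reporter", "false");
props.put("enable.metrics.push", "false");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    // run the same poll loop as the reporter's application and watch NMT output
}
```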
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845042#comment-17845042 ]

Philip Nee commented on KAFKA-16687: Thank you.

> Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.7.0
> Reporter: FTR
> Assignee: Philip Nee
> Priority: Major
>
> I am building a Java project which uses the Maven dependency kafka-clients,
> version 3.7.0.
> My Java application logic is to use a Kafka consumer to poll a Kafka broker
> topic continuously.
> I have configured my Java application with the JVM options -Xms8G -Xmx8G
> -XX:MaxMetaspaceSize=4G, and then run it.
> Also, there are 16G of physical memory on my virtual machine.
> After my Java application had been running for a long time, I found that the
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate swap space.
> I checked it with jmap -heap pid, and found that heap memory usage is OK.
> Also, with Native Memory Tracking [jcmd pid Native.memory detail.diff], I
> found that it's caused by [NMT Internal] memory, created by
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate
> memory.
> And checking the kafka-clients source code, it has code that uses
> "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it.
>
> Could you help to check it? How could I stop this growing native memory to
> avoid my system hanging?

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16695) Improve expired poll interval logging by showing exceeded time
[ https://issues.apache.org/jira/browse/KAFKA-16695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans updated KAFKA-16695: --- Fix Version/s: 3.8.0

> Improve expired poll interval logging by showing exceeded time
> --
>
> Key: KAFKA-16695
> URL: https://issues.apache.org/jira/browse/KAFKA-16695
> Project: Kafka
> Issue Type: Task
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Lianet Magrans
> Priority: Major
> Labels: kip-848-client-support
> Fix For: 3.8.0
>
> When a consumer poll iteration takes longer than the max.poll.interval, the
> consumer logs a warning suggesting that the max.poll.interval config was
> exceeded, and proactively leaves the group. The log suggests considering
> adjusting the max.poll.interval config, which should help in cases of long
> processing times. We should consider adding the info of how much time the
> interval was exceeded by, since it could be helpful in guiding the user to
> effectively adjust the config. This is done in other clients, which log this
> kind of message in this situation:
> {quote}Application maximum poll interval (30ms) exceeded by 255ms (adjust
> max.poll.interval.ms for long-running message processing): leaving
> group{quote}
>

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16695) Improve expired poll interval logging by showing exceeded time
[ https://issues.apache.org/jira/browse/KAFKA-16695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans updated KAFKA-16695: --- Labels: kip-848-client-support (was: )

> Improve expired poll interval logging by showing exceeded time
> --
>
> Key: KAFKA-16695
> URL: https://issues.apache.org/jira/browse/KAFKA-16695
> Project: Kafka
> Issue Type: Task
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Lianet Magrans
> Priority: Major
> Labels: kip-848-client-support
>
> When a consumer poll iteration takes longer than the max.poll.interval, the
> consumer logs a warning suggesting that the max.poll.interval config was
> exceeded, and proactively leaves the group. The log suggests considering
> adjusting the max.poll.interval config, which should help in cases of long
> processing times. We should consider adding the info of how much time the
> interval was exceeded by, since it could be helpful in guiding the user to
> effectively adjust the config. This is done in other clients, which log this
> kind of message in this situation:
> {quote}Application maximum poll interval (30ms) exceeded by 255ms (adjust
> max.poll.interval.ms for long-running message processing): leaving
> group{quote}
>

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]
lianetm commented on PR #15909: URL: https://github.com/apache/kafka/pull/15909#issuecomment-2102939146 Hey @mjsax, here is the improved logging following your suggestion; I expect it will indeed be helpful. Would you have a chance to take a look? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16691: [connect:transform] Support nested field paths on TimestampConverter [kafka]
C0urante commented on PR #15893: URL: https://github.com/apache/kafka/pull/15893#issuecomment-2102926693 I think this leads to a change in behavior. Right now this test (surprisingly!) passes on trunk:

```java
public class TimestampConverterTest {

    // ...

    @Test
    public void testWithSchemaFieldWithDefaultValue() {
        Map<String, Object> config = new HashMap<>();
        config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
        config.put(TimestampConverter.FIELD_CONFIG, "timestamp_field");
        xformValue.configure(config);

        java.util.Date defaultFieldValue = new java.util.Date();
        Schema schema = SchemaBuilder.struct()
            .field(
                "timestamp_field",
                Timestamp.builder()
                    .defaultValue(defaultFieldValue)
                    .build()
            );
        Struct value = new Struct(schema)
            .put("timestamp_field", DATE_PLUS_TIME.getTime());

        SourceRecord transformed = xformValue.apply(createRecordWithSchema(schema, value));

        assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type());
        Struct transformedValue = (Struct) transformed.value();
        assertEquals(DATE_PLUS_TIME.getTime(), transformedValue.get("timestamp_field"));
        assertNull(transformedValue.schema().field("timestamp_field").schema().defaultValue());
    }
}
```

We don't propagate default values for the fields we convert. Instead, we automatically substitute in the default value if none is found. This is surprising behavior, and has led to things like [KIP-581](https://cwiki.apache.org/confluence/display/KAFKA/KIP-581%3A+Value+of+optional+null+field+which+has+default+value) and [KIP-1040](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=303794677), but I don't think we should change it in this KIP because it's out of scope and, if necessary, can be touched on in KIP-1040 (which is in discussion at the moment).

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]
jsancio commented on code in PR #15859: URL: https://github.com/apache/kafka/pull/15859#discussion_r1595620870 ## raft/src/main/java/org/apache/kafka/raft/QuorumState.java: ## @@ -481,7 +517,7 @@ private void durableTransitionTo(EpochState state) {
         }
     }
 
-    this.store.writeElectionState(state.election());
+    this.store.writeElectionState(state.election(), latestKraftVersion.get());

Review Comment: Yeah. I am also not a fan of this inconsistency. I try to change them as I modify related lines. I'll go through this file and fix all of the ones I can find. I think the convention is to not use `this` unless it is needed to disambiguate from a locally scoped variable.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]
jsancio commented on code in PR #15859: URL: https://github.com/apache/kafka/pull/15859#discussion_r1595616944 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -1700,16 +1710,16 @@ private void handleResponse(RaftResponse.Inbound response, long currentTimeMs) { } /** - * Validate a request which is only valid between voters. If an error is - * present in the returned value, it should be returned in the response. + * Validate common state for requests to establish leadership. + * + * These include the Vote, BeginQuorumEpoch rnd EndQuorumEpoch RPCs. If an error is present in Review Comment: Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]
jsancio commented on code in PR #15859: URL: https://github.com/apache/kafka/pull/15859#discussion_r1595615609 ## raft/src/main/java/org/apache/kafka/raft/FileQuorumStateStore.java: ## @@ -59,17 +54,18 @@ * "data_version":0} * * */ -public class FileBasedStateStore implements QuorumStateStore { -private static final Logger log = LoggerFactory.getLogger(FileBasedStateStore.class); +public class FileQuorumStateStore implements QuorumStateStore { Review Comment: Fixed. It looks like we are not building the Java Doc HTML pages any more so I can't test the HTML generated by this change. I need to investigate this after this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16307: fix coordinator thread idle ratio [kafka]
dajac commented on PR #15835: URL: https://github.com/apache/kafka/pull/15835#issuecomment-2102880603 @jeffkbkim The unit test seems to be flaky. Could you please check? https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15785/19/tests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: migrate DescribeConsumerGroupTest to use ClusterTestExtensions [kafka]
FrankYang0529 opened a new pull request, #15908: URL: https://github.com/apache/kafka/pull/15908 By using ClusterTestExtensions, DescribeConsumerGroupTest can get away from the KafkaServerTestHarness dependency. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
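For readers unfamiliar with the new infra, the migrated tests follow roughly this shape; the test body is illustrative, while the extension, constructor injection, and `bootstrapServers()` accessor follow the existing `ClusterTestExtensions` pattern in the Kafka test sources:

```java
@ExtendWith(ClusterTestExtensions.class)
public class DescribeConsumerGroupTest {
    private final ClusterInstance cluster;

    DescribeConsumerGroupTest(ClusterInstance cluster) {
        this.cluster = cluster; // injected per test, no KafkaServerTestHarness needed
    }

    @ClusterTest
    public void testDescribeNonExistingGroup() {
        String[] args = new String[]{
            "--bootstrap-server", cluster.bootstrapServers(),
            "--describe", "--group", "missing.group"
        };
        // run ConsumerGroupCommand with args against the embedded cluster
    }
}
```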
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15821: URL: https://github.com/apache/kafka/pull/15821#issuecomment-2102879454 > > This is used by ListConsumerGroupTest and DescribeConsumerGroupTest. I think we can create a new class SimpleConsumerGroupExecutorTestUtils for it. WDYT? Thank you. > > That is addressed already. see https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java#L92 Thanks. The `SimpleConsumerGroupExecutor` subscribes to topic partitions. I have updated `ConsumerGroupCommandTestUtils` to support it. https://github.com/apache/kafka/blob/f4fdaa702a2e718bdb44b9c5fec254f32a33f0d8/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java#L274-L287 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16660: reduce the check interval to speedup DelegationTokenRequestsTest [kafka]
brandboat commented on PR #15907: URL: https://github.com/apache/kafka/pull/15907#issuecomment-2102870594 Looped 131 times and all passed: `I=0; while ./gradlew core:test --tests DelegationTokenRequestsTest --rerun --fail-fast; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done`. But let's see if there are any surprises in Jenkins. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]
jsancio commented on code in PR #15859: URL: https://github.com/apache/kafka/pull/15859#discussion_r1595591034 ## raft/src/main/java/org/apache/kafka/raft/EpochState.java: ## @@ -26,15 +27,16 @@ default Optional<LogOffsetMetadata> highWatermark() {
     }
 
     /**
-     * Decide whether to grant a vote to a candidate, it is the responsibility of the caller to invoke
+     * Decide whether to grant a vote to a candidate.
+     *
+     * It is the responsibility of the caller to invoke

Review Comment: This is the [method](https://github.com/apache/kafka/pull/15859/files#diff-095b9e60b0227d41f24b49923afddbe813d5c5754cd0ef4801741f1d275319b6R347-R350). I changed the signature in this PR so I just changed the documentation to match the new signature.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16660: reduce the check interval to speedup DelegationTokenRequestsTest [kafka]
brandboat opened a new pull request, #15907: URL: https://github.com/apache/kafka/pull/15907 related to https://issues.apache.org/jira/browse/KAFKA-16660 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
clolov commented on code in PR #14716: URL: https://github.com/apache/kafka/pull/14716#discussion_r1595590794 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -411,25 +394,32 @@ public void seek(final TopicPartition partition, final long offset) {
         shouldNotSeek.set(new AssertionError("Should not seek"));
 
+        @SuppressWarnings("unchecked")
         final java.util.function.Consumer<Set<TopicPartition>> resetter =
-            EasyMock.mock(java.util.function.Consumer.class);
-        resetter.accept(Collections.singleton(partition1));
-        EasyMock.expectLastCall();
-        EasyMock.replay(resetter);
+            mock(java.util.function.Consumer.class);

Review Comment: Heya @chia7712 and thank you for the review and the good suggestions! Apologies for not responding sooner, I somehow missed it. The newest commits ought to address both of your suggestions.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
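The general EasyMock-to-Mockito translation at play here: record-replay-style expectations become plain mocks plus post-hoc verification. A minimal sketch, assuming static imports of `mock` and `verify` from `org.mockito.Mockito`:

```java
// EasyMock recorded the expectation before replay():
//   resetter.accept(Collections.singleton(partition1));
//   EasyMock.expectLastCall();
//   EasyMock.replay(resetter);
// Mockito instead creates the mock, runs the code under test, then verifies.
@SuppressWarnings("unchecked")
final java.util.function.Consumer<Set<TopicPartition>> resetter =
    mock(java.util.function.Consumer.class);

// ... exercise the task so that it triggers the offset reset ...

verify(resetter).accept(Collections.singleton(partition1));
```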
[jira] [Updated] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Johnson Okorie updated KAFKA-16692: --- Description: We have a kafka cluster running on version 3.5.2 that we are upgrading to 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled and hence creating transactions. As we replaced brokers with the new binaries, we observed lots of clients in the cluster experiencing the following error: {code:java} 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, transactionalId=] Got error produce response with correlation id 6402937 on topic-partition , retrying (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The server disconnected before a response was received.{code} On inspecting the broker, we saw the following errors on brokers still running Kafka version 3.5.2: {code:java} message: Closing socket for because of error exception_exception_class: org.apache.kafka.common.errors.InvalidRequestException exception_exception_message: Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled exception_stacktrace: org.apache.kafka.common.errors.InvalidRequestException: Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled {code} On the new brokers running 3.6.1 we saw the following errors: {code:java} [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for node 1043 with a network exception.{code} I can also see this: {code:java} [AddPartitionsToTxnManager broker=1055]Cancelled in-flight ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 being disconnected (elapsed time since creation: 11ms, elapsed time since send: 4ms, request timeout: 3ms){code} We started investigating this issue and, digging through the changes in 3.6, we came across some changes introduced as part of KAFKA-14402 that we thought might lead to this behaviour. First we could see that _transaction.partition.verification.enable_ is enabled by default and enables a new code path that culminates in us sending version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269]. From a [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] on the mailing list, [~jolshan] pointed out that this scenario shouldn't be possible, as the following code paths should prevent version 4 ADD_PARTITIONS_TO_TXN requests being sent to other brokers: [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] However, these requests are still sent to other brokers in our environment. On further inspection of the code, I am wondering if the following code path could lead to this issue: [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] In this scenario, we don't have any _NodeApiVersions_ available for the specified nodeId and potentially skip the _latestUsableVersion_ check.
I am wondering if it is possible that, because _discoverBrokerVersions_ is set to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it skips fetching {_}NodeApiVersions{_}? I can see that we create the network client here: [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641] The _NetworkUtils.buildNetworkClient_ method seems to create a network client that has _discoverBrokerVersions_ set to {_}false{_}. I was hoping I could get some assistance debugging this issue. Happy to provide any additional information needed. was: We have a kafka cluster running on version 3.5.2 that we are upgrading to 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled and hence creating transactions. As we replaced brokers with the new binaries, we observed lots of clients in the cluster experiencing the following error: {code:java} 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, transactionalId=] Got error produce response with correlation id 6402937 on topic-partition , retrying (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The server disconnected before a response was received.{code} On inspecting the broker, we saw the following errors on brokers still running Kafka version 3.5.2: {code:java} message: Closing socket for because of error exception_exception_class:
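The suspicion above centers on the client-side version guard. A hedged sketch of that guard follows; `ApiVersions`, `NodeApiVersions`, and `latestUsableVersion` are real client classes, but the control flow is a simplification of what `NetworkClient.doSend` does, written to illustrate the failure mode:

```java
// Sketch: when apiVersions has no entry for the node -- e.g. because
// discoverBrokerVersions is false -- the client can fall back to the latest
// version it supports, which a 3.5 broker rejects as "not enabled".
NodeApiVersions nodeVersions = apiVersions.get(node.idString());
short versionToUse;
if (nodeVersions == null) {
    versionToUse = ApiKeys.ADD_PARTITIONS_TO_TXN.latestVersion(); // e.g. version 4
} else {
    versionToUse = nodeVersions.latestUsableVersion(ApiKeys.ADD_PARTITIONS_TO_TXN);
}
```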
[jira] [Updated] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Johnson Okorie updated KAFKA-16692: --- Description: We have a kafka cluster running on version 3.5.2 that we are upgrading to 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled and hence creating transactions. As we replaced brokers with the new binaries, we observed lots of clients in the cluster experiencing the following error: {code:java} 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, transactionalId=] Got error produce response with correlation id 6402937 on topic-partition , retrying (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The server disconnected before a response was received.{code} On inspecting the broker, we saw the following errors on brokers still running Kafka version 3.5.2: {code:java} message: Closing socket for because of error exception_exception_class: org.apache.kafka.common.errors.InvalidRequestException exception_exception_message: Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled exception_stacktrace: org.apache.kafka.common.errors.InvalidRequestException: Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled {code} On the new brokers running 3.6.1 we saw the following errors: {code:java} [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for node 1043 with a network exception.{code} I can also see this: {code:java} [AddPartitionsToTxnManager broker=1055]Cancelled in-flight ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 being disconnected (elapsed time since creation: 11ms, elapsed time since send: 4ms, request timeout: 3ms){code} We started investigating this issue and, digging through the changes in 3.6, we came across some changes introduced as part of KAFKA-14402 that we thought might lead to this behaviour. First we could see that _transaction.partition.verification.enable_ is enabled by default and enables a new code path that culminates in us sending version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269]. From a [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] on the mailing list, [~jolshan] pointed out that this scenario shouldn't be possible, as the following code paths should prevent version 4 ADD_PARTITIONS_TO_TXN requests being sent to other brokers: [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] However, these requests are still sent to other brokers in our environment. On further inspection of the code, I am wondering if the following code path could lead to this issue: [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] In this scenario, we don't have any _NodeApiVersions_ available for the specified nodeId and potentially skip _latestUsableVersion_ check.
I am wondering if it is possible that, because _discoverBrokerVersions_ is set to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it skips fetching {_}NodeApiVersions{_}? I can see that we create the network client here: [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641] The _NetworkUtils.buildNetworkClient_ method seems to create a network client that has _discoverBrokerVersions_ set to {_}false{_}. I was hoping I could get some assistance debugging this issue. Happy to provide any additional information needed. was: We have a kafka cluster running on version 3.5.2 that we are upgrading to 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled and hence creating transactions. As we replaced brokers with the new binaries, we observed lots of clients in the cluster experiencing the following error: {code:java} 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, transactionalId=] Got error produce response with correlation id 6402937 on topic-partition , retrying (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The server disconnected before a response was received.{code} On inspecting the broker, we saw the following errors on brokers still running Kafka version 3.5.2: {code:java} message: Closing socket for because of error exception_exception_class:
Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]
jsancio commented on code in PR #15859: URL: https://github.com/apache/kafka/pull/15859#discussion_r1595585037 ## raft/src/main/java/org/apache/kafka/raft/ElectionState.java: ## @@ -115,15 +166,51 @@ public boolean equals(Object o) {
         ElectionState that = (ElectionState) o;
 
         if (epoch != that.epoch) return false;
-        if (!leaderIdOpt.equals(that.leaderIdOpt)) return false;
-        return votedIdOpt.equals(that.votedIdOpt);
+        if (!leaderId.equals(that.leaderId)) return false;
+        if (!votedKey.equals(that.votedKey)) return false;
+
+        return voters.equals(that.voters);
     }
 
     @Override
     public int hashCode() {
-        int result = epoch;
-        result = 31 * result + leaderIdOpt.hashCode();
-        result = 31 * result + votedIdOpt.hashCode();
-        return result;
+        return Objects.hash(epoch, leaderId, votedKey, voters);
+    }
+
+    public static ElectionState withVotedCandidate(int epoch, ReplicaKey votedKey, Set<Integer> voters) {
+        if (votedKey.id() < 0) {
+            throw new IllegalArgumentException("Illegal voted Id " + votedKey.id() + ": must be non-negative");
+        }
+
+        return new ElectionState(epoch, OptionalInt.empty(), Optional.of(votedKey), voters);
+    }
+
+    public static ElectionState withElectedLeader(int epoch, int leaderId, Set<Integer> voters) {
+        if (leaderId < 0) {
+            throw new IllegalArgumentException("Illegal leader Id " + leaderId + ": must be non-negative");
+        }
+
+        return new ElectionState(epoch, OptionalInt.of(leaderId), Optional.empty(), voters);
+    }
+
+    public static ElectionState withUnknownLeader(int epoch, Set<Integer> voters) {
+        return new ElectionState(epoch, OptionalInt.empty(), Optional.empty(), voters);
+    }
+
+    public static ElectionState fromQuorumStateData(QuorumStateData data) {
+        Optional<Uuid> votedDirectoryId = data.votedDirectoryId().equals(noVotedDirectoryId) ?
+            Optional.empty() :
+            Optional.of(data.votedDirectoryId());
+
+        Optional<ReplicaKey> voterKey = data.votedId() == notVoted ?

Review Comment: Fixed.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16695) Improve expired poll interval logging by showing exceeded time
Lianet Magrans created KAFKA-16695: -- Summary: Improve expired poll interval logging by showing exceeded time Key: KAFKA-16695 URL: https://issues.apache.org/jira/browse/KAFKA-16695 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans

When a consumer poll iteration takes longer than the max.poll.interval, the consumer logs a warning suggesting that the max.poll.interval config was exceeded, and proactively leaves the group. The log suggests considering adjusting the max.poll.interval config, which should help in cases of long processing times. We should consider adding the info of how much time the interval was exceeded by, since it could be helpful in guiding the user to effectively adjust the config. This is done in other clients, which log this kind of message in this situation:

{quote}Application maximum poll interval (30ms) exceeded by 255ms (adjust max.poll.interval.ms for long-running message processing): leaving group{quote}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
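A sketch of what the improved warning could compute and log; the field names (`time`, `lastPollMs`, `maxPollIntervalMs`) and the message wording are illustrative, not the final patch:

```java
// Sketch: include how far past max.poll.interval.ms the application went.
long exceededMs = time.milliseconds() - lastPollMs - maxPollIntervalMs;
log.warn("Consumer poll timeout has expired: exceeded max.poll.interval.ms ({} ms) by {} ms. "
        + "This usually means the time between subsequent calls to poll() was longer than the "
        + "configured interval; consider adjusting max.poll.interval.ms for long-running "
        + "message processing. Proactively leaving the group.",
    maxPollIntervalMs, exceededMs);
```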
Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]
jsancio commented on code in PR #15859: URL: https://github.com/apache/kafka/pull/15859#discussion_r1595581356 ## raft/src/main/java/org/apache/kafka/raft/ElectionState.java: ## @@ -64,47 +62,100 @@ public boolean isLeader(int nodeId) { return leaderIdOrSentinel() == nodeId; } -public boolean isVotedCandidate(int nodeId) { -if (nodeId < 0) -throw new IllegalArgumentException("Invalid negative nodeId: " + nodeId); -return votedIdOpt.orElse(-1) == nodeId; +/** + * Return if the replica has voted for the given candidate. + * + * A replica has voted for a candidate if all of the following are true: + * 1. the node's id and voted id match and + * 2. if the voted directory id is set, it matches the node's directory id + * + * @param nodeKey the id and directory id of the replica + * @return true when the arguments match, otherwise false + */ +public boolean isVotedCandidate(ReplicaKey nodeKey) { +if (nodeKey.id() < 0) { +throw new IllegalArgumentException("Invalid node key " + nodeKey); +} else if (!votedKey.isPresent()) { +return false; +} else if (votedKey.get().id() != nodeKey.id()) { +return false; +} else if (!votedKey.get().directoryId().isPresent()) { +// when the persisted voted uuid is not present assume that we voted for this candidate; +// this happends when the kraft version is 0. Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]
jsancio commented on code in PR #15859: URL: https://github.com/apache/kafka/pull/15859#discussion_r1595578267 ## raft/src/main/java/org/apache/kafka/raft/ElectionState.java: ## @@ -16,46 +16,44 @@ */ package org.apache.kafka.raft;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.raft.generated.QuorumStateData;
import org.apache.kafka.raft.internals.ReplicaKey;
/**
 * Encapsulate election state stored on disk after every state change.
 */
-public class ElectionState {
-public final int epoch;
-public final OptionalInt leaderIdOpt;
-public final OptionalInt votedIdOpt;
+final public class ElectionState {
+private static int unknownLeaderId = -1;
Review Comment: Yeah, I had this using capital letters and underscores, but our style guide complained about the name pattern. E.g.
```
[ant:checkstyle] [ERROR] kafka/raft/src/main/java/org/apache/kafka/raft/ElectionState.java:33:24: Name 'UNKNOWN_LEADER_ID' must match pattern '^[a-z][a-zA-Z0-9]*$'. [StaticVariableName]
```
I suspect that the style guideline is different if the field is `private`. I haven't investigated this in detail. Do you mind if I address this in a different PR?
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
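For context on the checkstyle failure: the stock `StaticVariableName` check governs non-final static fields (expecting lowerCamelCase), while `static final` fields fall under `ConstantName` (expecting UPPER_SNAKE_CASE), so the distinction is finality rather than visibility. Marking the sentinel `final` would likely allow the constant-style name. A small illustration, assuming the default check configuration:
```java
final class NamingCheckExample {
    // Non-final static: StaticVariableName applies, so UNKNOWN_LEADER_ID would be rejected here.
    private static int unknownLeaderId = -1;

    // Static final: ConstantName applies instead, which expects UPPER_SNAKE_CASE.
    private static final int UNKNOWN_LEADER_ID = -1;
}
```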
Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]
jsancio commented on code in PR #15859: URL: https://github.com/apache/kafka/pull/15859#discussion_r1595568368 ## raft/src/main/java/org/apache/kafka/raft/CandidateState.java: ## @@ -51,14 +54,27 @@ public class CandidateState implements EpochState {
protected CandidateState(
Time time,
int localId,
+Uuid localDirectoryId,
Review Comment: Yes. Thanks for catching this.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]
jsancio commented on code in PR #15859: URL: https://github.com/apache/kafka/pull/15859#discussion_r1595568006 ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java: ## @@ -64,6 +64,43 @@ public Optional<InetSocketAddress> voterAddress(int voter, String listener) { .flatMap(voterNode -> voterNode.address(listener)); }
+/**
+ * Returns true if the node is a voter in the set of voters.
+ *
+ * If the voter set includes the directory id, the {@code nodeKey} directory id must match the
+ * directory id specified by the voter set.
+ *
+ * If the voter set doesn't include the directory id ({@code Optional.empty()}), a node is in
+ * the voter set as long as the node id matches. The directory id is not checked.
+ *
+ * @param nodeKey the node's id and directory id
+ * @return true if the node is a voter in the voter set, otherwise false
+ */
+public boolean isVoter(ReplicaKey nodeKey) {
+VoterNode node = voters.get(nodeKey.id());
+if (node != null) {
+if (node.voterKey().directoryId().isPresent()) {
+return node.voterKey().directoryId().equals(nodeKey.directoryId());
+} else {
+// configured voter set doesn't include an uuid so it is a voter as long as the node id matches
Review Comment: Fixed.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
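A short behavioral sketch of the two matching modes described above, with made-up ids; `matches` mirrors the directory-id rule from the quoted `isVoter` body once the id lookup has already succeeded.
```java
import java.util.Optional;
import java.util.UUID;

final class IsVoterExamples {
    public static void main(String[] args) {
        UUID dir = UUID.randomUUID();
        // Voter configured WITH a directory id: both id and directory id must match.
        System.out.println(matches(Optional.of(dir), Optional.of(dir)));               // true
        System.out.println(matches(Optional.of(dir), Optional.of(UUID.randomUUID()))); // false
        // Voter configured WITHOUT a directory id: the id match alone decides.
        System.out.println(matches(Optional.empty(), Optional.of(dir)));               // true
    }

    // Same rule as the quoted isVoter body: an absent configured directory id always matches.
    static boolean matches(Optional<UUID> configured, Optional<UUID> candidate) {
        return !configured.isPresent() || configured.equals(candidate);
    }
}
```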
[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845004#comment-17845004 ] Justine Olshan commented on KAFKA-16692: I'm working on a test to recreate the issue, and then seeing if the proposed fix helps.
> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not
> enabled when upgrading from kafka 3.5 to 3.6
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 3.6.1
> Reporter: Johnson Okorie
> Assignee: Justine Olshan
> Priority: Major
>
> We have a Kafka cluster running on version 3.5.2 that we are upgrading to
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled
> and hence creating transactions. As we replaced brokers with the new
> binaries, we observed lots of clients in the cluster experiencing the
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=,
> transactionalId=] Got error produce response with
> correlation id 6402937 on topic-partition , retrying
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still
> running Kafka version 3.5.2:
>
> {code:java}
> message:
> Closing socket for because of error
> exception_exception_class:
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not
> enabled
> exception_stacktrace:
> org.apache.kafka.common.errors.InvalidRequestException: Received request api
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for
> node 1043 with a network exception.{code}
>
> I can also see this:
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043
> being disconnected (elapsed time since creation: 11ms, elapsed time since
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and, digging through the changes in 3.6,
> we came across some changes introduced as part of KAFKA-14402 that we thought
> might lead to this behaviour.
> First, we could see that _transaction.partition.verification.enable_ is
> enabled by default and enables a new code path that culminates in us sending
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x]
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be
> possible, as the following code paths should prevent version 4
> ADD_PARTITIONS_TO_TXN requests from being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the
> specified nodeId and so potentially skip the expected _latestUsableVersion_
> check. I am wondering if it is possible that, because
> _discoverBrokerVersions_ is set to _false_ for the network client of the
> {_}AddPartitionsToTxnManager{_}, it skips fetching {_}NodeApiVersions{_}? I
> can see that we create the network client here:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]
> The _NetworkUtils.buildNetworkClient_ method seems to create a network client
> that has _discoverBrokerVersions_ set to {_}false{_}.
> I was hoping I could get some assistance debugging this issue. Happy to
> provide any additional information needed.
>
>
-- This
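To illustrate the suspected failure mode: if no `NodeApiVersions` entry exists for a node (as would happen with `discoverBrokerVersions` disabled), the client has nothing to negotiate against and falls back to the newest request version it knows. The following is a simplified model of that code path, not Kafka's actual `NetworkClient` logic; all names and the version numbers are illustrative.
```java
import java.util.HashMap;
import java.util.Map;

final class VersionSelectionSketch {
    // When no negotiated maximum is recorded for the node, the latest known
    // version is used rather than the version the peer actually enables.
    static short chooseAddPartitionsToTxnVersion(Map<String, Short> maxVersionByNode,
                                                 String nodeId,
                                                 short latestKnownVersion) {
        Short negotiated = maxVersionByNode.get(nodeId);
        if (negotiated == null) {
            // No NodeApiVersions recorded: the latestUsableVersion check never runs,
            // so a 3.6 broker can send v4 to a 3.5 broker that only enables v3.
            return latestKnownVersion;
        }
        return (short) Math.min(latestKnownVersion, negotiated);
    }

    public static void main(String[] args) {
        Map<String, Short> known = new HashMap<>();
        known.put("1043", (short) 3); // a 3.5 broker supporting up to v3
        System.out.println(chooseAddPartitionsToTxnVersion(known, "1043", (short) 4));          // 3
        System.out.println(chooseAddPartitionsToTxnVersion(new HashMap<>(), "1043", (short) 4)); // 4: the suspected bug
    }
}
```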