[jira] [Updated] (KAFKA-15143) MockFixedKeyProcessorContext is missing from test-utils
[ https://issues.apache.org/jira/browse/KAFKA-15143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15143: Labels: needs-kip (was: ) > MockFixedKeyProcessorContext is missing from test-utils > --- > > Key: KAFKA-15143 > URL: https://issues.apache.org/jira/browse/KAFKA-15143 > Project: Kafka > Issue Type: Bug > Components: streams-test-utils >Affects Versions: 3.5.0 >Reporter: Tomasz Kaszuba >Assignee: Shashwat Pandey >Priority: Major > Labels: needs-kip > > I am trying to test a ContextualFixedKeyProcessor but it is not possible to > call the init method from within a unit test since the MockProcessorContext > doesn't implement > {code:java} > FixedKeyProcessorContext {code} > but only > {code:java} > ProcessorContext > {code} > Shouldn't there also be a *MockFixedKeyProcessorContext* in the test utils? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15143) MockFixedKeyProcessorContext is missing from test-utils
[ https://issues.apache.org/jira/browse/KAFKA-15143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-15143: --- Assignee: Shashwat Pandey > MockFixedKeyProcessorContext is missing from test-utils > --- > > Key: KAFKA-15143 > URL: https://issues.apache.org/jira/browse/KAFKA-15143 > Project: Kafka > Issue Type: Bug > Components: streams-test-utils >Affects Versions: 3.5.0 >Reporter: Tomasz Kaszuba >Assignee: Shashwat Pandey >Priority: Major > > I am trying to test a ContextualFixedKeyProcessor but it is not possible to > call the init method from within a unit test since the MockProcessorContext > doesn't implement > {code:java} > FixedKeyProcessorContext {code} > but only > {code:java} > ProcessorContext > {code} > Shouldn't there also be a *MockFixedKeyProcessorContext* in the test utils? -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1493075382 ## streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java: ## @@ -71,11 +71,21 @@ void afterEach() { @Test void testRelaxedLeftStreamStreamJoin() { leftStream -.leftJoin(rightStream, JOINER, WINDOW) +.leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS) .to(OUT); initTopology(); -left.pipeInput(null, "leftValue", 1); -assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList()); +left.pipeInput(null, "leftValue1", 1); +left.pipeInput(null, "leftValue2", 90); +left.pipeInput(null, "lateArrival-Dropped", 19); +left.pipeInput(null, "lateArrivalWithinGrace", 20); +assertEquals( +Arrays.asList( +new KeyValue<>(null, "leftValue1|null"), +new KeyValue<>(null, "leftValue2|null"), +new KeyValue<>(null, "lateArrivalWithinGrace|null") +), +out.readKeyValuesToList() +); } @Test Review Comment: Yes, I will check. Similarily, I realized `KStreamKStreamSelfJoin` shoud probably also drop 'too late' records? I guess this would also be a separate Jira ticket & PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on PR #15189: URL: https://github.com/apache/kafka/pull/15189#issuecomment-1949473525 > Thanks for the fix! Overall LGTM. Couple of comments. @mjsax Thank you for all the good points. I agree with all of them. However, first I would like to align on whether we really want to extend the window bound tests to assert the correct grace period behavior. See my reply on 'Why + 1'. Personally, I would say I just write separate TestClasses/Cases to assert the grace period behavior (including sensor checks). Plus, as you suggested, the window bound tests will just be updated with large grace periods (E.g. Long.MAX - some constant). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]
jolshan commented on code in PR #15382: URL: https://github.com/apache/kafka/pull/15382#discussion_r1492801866 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws Exception { String simpleGroup = "simple-group"; addSimpleGroupExecutor(simpleGroup); addConsumerGroupExecutor(1); -final AtomicReference out = new AtomicReference<>(""); -String[] cgcArgs1 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs1); -return null; -})); -return !out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, but found " + out.get()); - -String[] cgcArgs2 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs2); -return null; -})); -return out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and the header, but found " + out.get()); - -String[] cgcArgs3 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs3); -return null; -})); -return out.get().contains("STATE") && out.get().contains(GROUP) && out.get().contains("Stable"); -}, "Expected to find " + GROUP + " in state Stable and the header, but found " + out.get()); - -String[] cgcArgs4 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"}; +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list"), +Collections.emptyList(), +mkSet( +Collections.singletonList(GROUP), +Collections.singletonList(simpleGroup) +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable"), +Arrays.asList(simpleGroup, "Empty") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); +} + +/** + * Validates that the output of the list command corresponds to the expected values. + * + * @param args The arguments for the command line tool. + * @param expectedHeaderThe expected header as a list of strings; or an empty list + * if a header is not expected. + * @param expectedGroupsThe expected groups as a set of list of strings. The list + * of strings corresponds to the columns. + * @throws InterruptedException + */ +private static void validateListOutput( Review Comment: The method itself does the asserts, so I think it is ok to be void. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1493064669 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java: ## @@ -816,10 +816,12 @@ public void testWindowing() { stream1 = builder.stream(topic1, consumed); stream2 = builder.stream(topic2, consumed); +final Duration timeDifference = ofMillis(100L); +final Duration grace = ofMillis(104); Review Comment: On second thought, note my coment below on 'Why + 1 '. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1493064669 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java: ## @@ -816,10 +816,12 @@ public void testWindowing() { stream1 = builder.stream(topic1, consumed); stream2 = builder.stream(topic2, consumed); +final Duration timeDifference = ofMillis(100L); +final Duration grace = ofMillis(104); Review Comment: On second thought, note my coment on 'Why + 1 '. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on code in PR #15384: URL: https://github.com/apache/kafka/pull/15384#discussion_r1493062297 ## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ## @@ -61,6 +62,11 @@ public ListTransactionsOptions filterProducerIds(Collection producerIdFilt return this; } +public ListTransactionsOptions durationFilter(Long durationMs) { Review Comment: nit: these can all be primitive longs. (lowercase) There is usage in this file and the Admin file -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1493061626 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java: ## @@ -1363,6 +1365,16 @@ public void testWindowing() { new KeyValueTimestamp<>(2, "L2+l2", 2002L), new KeyValueTimestamp<>(3, "L3+l3", 2003L) ); + +//push two items with timestamp at grace edge; this should produce one join item, M0 is 'too late' +final long currentStreamTime = 2104; +final long lowerBound = currentStreamTime - timeDifference.toMillis() - grace.toMillis(); +inputTopic1.pipeInput(0, "M0", lowerBound - 1); +inputTopic1.pipeInput(1, "M1", lowerBound + 1); Review Comment: I can see now that the naming is misleading. The lowerbound is with regards to the grace period, 1900 However the lowerbound of the winow `1:l1` is 1901 So the +1 was there to make sure it is still within the window. In general I start to wonder whether it wouldn't make more sense to test these two concerns (grace & windowing) separatley. E.g. with grace 150, `M0` is just a test case to assert that late records get dropped and `M1` is just another window bound test. With grace 104 we get the 'grace bound' and the 'l0 lower window bound' to overlap but it might be confusing. In other words, as you said 'this test aims to test window bounds'. So maybe I should move grace period tests into a separate test class? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 commented on code in PR #15384: URL: https://github.com/apache/kafka/pull/15384#discussion_r1493050657 ## clients/src/main/resources/common/message/ListTransactionsResponse.json: ## @@ -17,7 +17,8 @@ "apiKey": 66, "type": "response", "name": "ListTransactionsResponse", - "validVersions": "0", + // Version 1 is the same as vesion 0 (KIP-994). Review Comment: Nice catch. Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1493044453 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -20,78 +20,225 @@ import kafka.admin.ConsumerGroupCommand; import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupType; import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.MethodSource; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; import java.util.Objects; import java.util.Optional; +import java.util.Properties; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; public class ListConsumerGroupTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testListConsumerGroups(String quorum) throws Exception { +@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) +@MethodSource("getTestQuorumAndGroupProtocolParametersAll") +public void testListConsumerGroupsWithoutFilters(String quorum, String groupProtocol) throws Exception { String simpleGroup = "simple-group"; + +createOffsetsTopic(listenerName(), new Properties()); + addSimpleGroupExecutor(simpleGroup); addConsumerGroupExecutor(1); +addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol); String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); -scala.collection.Set expectedGroups = set(Arrays.asList(GROUP, simpleGroup)); + +scala.collection.Set expectedGroups = set(Arrays.asList(GROUP, simpleGroup, PROTOCOL_GROUP)); final AtomicReference foundGroups = new AtomicReference<>(); + TestUtils.waitForCondition(() -> { foundGroups.set(service.listConsumerGroups().toSet()); return Objects.equals(expectedGroups, foundGroups.get()); }, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups.get() + "."); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testListWithUnrecognizedNewConsumerOption() { +@Test +public void testListWithUnrecognizedNewConsumerOption() throws Exception { String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--list"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testListConsumerGroupsWithStates() throws Exception { +@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) +@MethodSource("getTestQuorumAndGroupProtocolParametersAll") +public void testListConsumerGroupsWithStates(String quorum, String groupProtocol) throws Exception { Review Comment: okay thanks, I created two tests, we can't change the assertions without if statements and we didn't want those right -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 commented on code in PR #15384: URL: https://github.com/apache/kafka/pull/15384#discussion_r1493041438 ## clients/src/main/java/org/apache/kafka/clients/admin/Admin.java: ## @@ -1633,7 +1633,8 @@ default ListTransactionsResult listTransactions() { * coordinators in the cluster and collect the state of all transactions. Users * should typically attempt to reduce the size of the result set using * {@link ListTransactionsOptions#filterProducerIds(Collection)} or - * {@link ListTransactionsOptions#filterStates(Collection)} + * {@link ListTransactionsOptions#filterStates(Collection)} or Review Comment: The ListTransactions api itself is able to filter by producer ids and states (see `handleListTransactions` in TransactionCoordinator). However, the cli tool does not support those filters currently. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on code in PR #15384: URL: https://github.com/apache/kafka/pull/15384#discussion_r1493039266 ## clients/src/main/resources/common/message/ListTransactionsRequest.json: ## @@ -18,14 +18,18 @@ "type": "request", "listeners": ["zkBroker", "broker"], "name": "ListTransactionsRequest", - "validVersions": "0", + // version 1: adds DurationFilter to list transactions older than specified duration Review Comment: nit: most specs capitalize V here. ## clients/src/main/resources/common/message/ListTransactionsResponse.json: ## @@ -17,7 +17,8 @@ "apiKey": 66, "type": "response", "name": "ListTransactionsResponse", - "validVersions": "0", + // Version 1 is the same as vesion 0 (KIP-994). Review Comment: nit: version -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on code in PR #15384: URL: https://github.com/apache/kafka/pull/15384#discussion_r1493024798 ## clients/src/main/java/org/apache/kafka/clients/admin/Admin.java: ## @@ -1633,7 +1633,8 @@ default ListTransactionsResult listTransactions() { * coordinators in the cluster and collect the state of all transactions. Users * should typically attempt to reduce the size of the result set using * {@link ListTransactionsOptions#filterProducerIds(Collection)} or - * {@link ListTransactionsOptions#filterStates(Collection)} + * {@link ListTransactionsOptions#filterStates(Collection)} or Review Comment: Maybe a silly question, but I noticed this comment mentions filtering by producer ID or state. Do you know where that is done? It looks like durationFilter is the only subparser argument for list in TransactionCommand -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1493006495 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java: ## @@ -816,10 +816,12 @@ public void testWindowing() { stream1 = builder.stream(topic1, consumed); stream2 = builder.stream(topic2, consumed); +final Duration timeDifference = ofMillis(100L); +final Duration grace = ofMillis(104); Review Comment: Good idea with the sensor. Just for me to understand, is 150 an arbitrarily chosen value or how did you come up with it? Wouldn't it make sense to set it to at least (max_timestamp in test case - min_timestamp in test case) e.g. 1104 - 899? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1493006495 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java: ## @@ -816,10 +816,12 @@ public void testWindowing() { stream1 = builder.stream(topic1, consumed); stream2 = builder.stream(topic2, consumed); +final Duration timeDifference = ofMillis(100L); +final Duration grace = ofMillis(104); Review Comment: Good idea with the sensor. Just for me to understand, is 150 an arbitrarily chosen value or how did you come up with it? Wouldn't it make sense to set it to (max_timestamp in test case - min_timestamp in test case) e.g. 1104 - 899? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1493006495 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java: ## @@ -816,10 +816,12 @@ public void testWindowing() { stream1 = builder.stream(topic1, consumed); stream2 = builder.stream(topic2, consumed); +final Duration timeDifference = ofMillis(100L); +final Duration grace = ofMillis(104); Review Comment: Good idea with the sensor. Just for me to understand, is 150 an arbitrarily chosen value or how did you come up with it? Wouldn't it make sense to set it to (max_timestamp in test case - min_timestamp in test case) e.g. 1104 - 899 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16202: Extra dot in error message in producer [kafka]
infantlikesprogramming commented on code in PR #15296: URL: https://github.com/apache/kafka/pull/15296#discussion_r1492998071 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -704,7 +704,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons "topic-partition may not exist or the user may not have Describe access to it", batch.topicPartition); } else { -log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " + +log.warn("Received invalid metadata error in produce request on partition {} due to {} Going " + "to request metadata update now", batch.topicPartition, Review Comment: This makes a lot of sense. Thank you so much @appchemist. It took me a while to understand the code. I will fix this in my next commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]
dajac commented on code in PR #15382: URL: https://github.com/apache/kafka/pull/15382#discussion_r1492992453 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws Exception { String simpleGroup = "simple-group"; addSimpleGroupExecutor(simpleGroup); addConsumerGroupExecutor(1); -final AtomicReference out = new AtomicReference<>(""); -String[] cgcArgs1 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs1); -return null; -})); -return !out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, but found " + out.get()); - -String[] cgcArgs2 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs2); -return null; -})); -return out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and the header, but found " + out.get()); - -String[] cgcArgs3 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs3); -return null; -})); -return out.get().contains("STATE") && out.get().contains(GROUP) && out.get().contains("Stable"); -}, "Expected to find " + GROUP + " in state Stable and the header, but found " + out.get()); - -String[] cgcArgs4 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"}; +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list"), +Collections.emptyList(), +mkSet( +Collections.singletonList(GROUP), +Collections.singletonList(simpleGroup) +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable"), +Arrays.asList(simpleGroup, "Empty") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); +} + +/** + * Validates that the output of the list command corresponds to the expected values. + * + * @param args The arguments for the command line tool. + * @param expectedHeaderThe expected header as a list of strings; or an empty list + * if a header is not expected. + * @param expectedGroupsThe expected groups as a set of list of strings. The list + * of strings corresponds to the columns. Review Comment: I used `expectedRows`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15770: IQv2 must return immutable position [kafka]
mjsax commented on PR #15219: URL: https://github.com/apache/kafka/pull/15219#issuecomment-1949323481 @lucasbru Updates this PR. -- When I started to work on this PR, I thought that using `synchronized` would not work, because locking/unlocking would not always be method-local. But you are right, turns out it is method local, so I dropped `SynchronizedPosition`, what makes your other comments obsolete. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1492958285 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -191,6 +191,10 @@ public void process(final Record record) { } } +private boolean isActiveWindow(final long timeFrom, final long timeTo) { +return sharedTimeTracker.streamTime >= timeFrom && timeTo + joinGraceMs >= sharedTimeTracker.streamTime; Review Comment: yes, indeed, it is redundant to check for `sharedTimeTracker.streamTime >= timeFrom` adjusted -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [Draft] KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan closed pull request #14731: [Draft] KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest URL: https://github.com/apache/kafka/pull/14731 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [Draft] KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on PR #14731: URL: https://github.com/apache/kafka/pull/14731#issuecomment-1949185760 Closing in favor of https://github.com/apache/kafka/pull/15384 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 commented on code in PR #15384: URL: https://github.com/apache/kafka/pull/15384#discussion_r1492876179 ## tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java: ## @@ -436,16 +436,25 @@ public String name() { @Override public void addSubparser(Subparsers subparsers) { -subparsers.addParser(name()) +Subparser subparser = subparsers.addParser(name()) .help("list transactions"); + +subparser.addArgument("--duration-filter") +.help("filter duration of transaction in ms, only transactions running longer than this duration will be returned") Review Comment: Updated the help message in [81173c8](https://github.com/apache/kafka/pull/15384/commits/81173c89dbf50a98671694f99a051dad1e0b1821) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 commented on code in PR #15384: URL: https://github.com/apache/kafka/pull/15384#discussion_r1492875733 ## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ## @@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig, def handleListTransactions( filteredProducerIds: Set[Long], -filteredStates: Set[String] +filteredStates: Set[String], +durationFilter: Long = -1 Review Comment: Updated default value to -1L in other places for consistency in [81173c8](https://github.com/apache/kafka/pull/15384/commits/81173c89dbf50a98671694f99a051dad1e0b1821) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 commented on code in PR #15384: URL: https://github.com/apache/kafka/pull/15384#discussion_r1492874583 ## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ## @@ -35,6 +35,7 @@ public class ListTransactionsOptions extends AbstractOptions filteredStates = Collections.emptySet(); private Set filteredProducerIds = Collections.emptySet(); +private Long durationFilter = 0L; Review Comment: Updated default value to -1L in [81173c8](https://github.com/apache/kafka/pull/15384/commits/81173c89dbf50a98671694f99a051dad1e0b1821) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on code in PR #15384: URL: https://github.com/apache/kafka/pull/15384#discussion_r1492849366 ## tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java: ## @@ -436,16 +436,25 @@ public String name() { @Override public void addSubparser(Subparsers subparsers) { -subparsers.addParser(name()) +Subparser subparser = subparsers.addParser(name()) .help("list transactions"); + +subparser.addArgument("--duration-filter") +.help("filter duration of transaction in ms, only transactions running longer than this duration will be returned") Review Comment: we should mention the default/value that gives all running transactions. Providing 0 and getting a ton of transactions might be confusing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]
rreddy-22 commented on code in PR #15382: URL: https://github.com/apache/kafka/pull/15382#discussion_r1492848158 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws Exception { String simpleGroup = "simple-group"; addSimpleGroupExecutor(simpleGroup); addConsumerGroupExecutor(1); -final AtomicReference out = new AtomicReference<>(""); -String[] cgcArgs1 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs1); -return null; -})); -return !out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, but found " + out.get()); - -String[] cgcArgs2 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs2); -return null; -})); -return out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and the header, but found " + out.get()); - -String[] cgcArgs3 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs3); -return null; -})); -return out.get().contains("STATE") && out.get().contains(GROUP) && out.get().contains("Stable"); -}, "Expected to find " + GROUP + " in state Stable and the header, but found " + out.get()); - -String[] cgcArgs4 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"}; +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list"), +Collections.emptyList(), +mkSet( +Collections.singletonList(GROUP), +Collections.singletonList(simpleGroup) +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable"), +Arrays.asList(simpleGroup, "Empty") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); +} + +/** + * Validates that the output of the list command corresponds to the expected values. + * + * @param args The arguments for the command line tool. + * @param expectedHeaderThe expected header as a list of strings; or an empty list + * if a header is not expected. + * @param expectedGroupsThe expected groups as a set of list of strings. The list + * of strings corresponds to the columns. + * @throws InterruptedException + */ +private static void validateListOutput( +List args, +List expectedHeader, +Set> expectedGroups +) throws InterruptedException { +final AtomicReference out = new AtomicReference<>(""); TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs4); -return null; -})); -return out.get().contains("STATE") && out.get().contains(GROUP) && out.get().contains("Stable"); -}, "Expected to find " + GROUP + " in state Stable and the header, but found " + out.get()); +String output = runAndGrabConsoleOutput(args); +out.set(output); + +int index = 0; +String[] lines = output.split("\n"); + +// Parse the header if one is expected. +if
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on code in PR #15384: URL: https://github.com/apache/kafka/pull/15384#discussion_r1492847691 ## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ## @@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig, def handleListTransactions( filteredProducerIds: Set[Long], -filteredStates: Set[String] +filteredStates: Set[String], +durationFilter: Long = -1 Review Comment: nit: this is -1 -- is that intended? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on PR #15323: URL: https://github.com/apache/kafka/pull/15323#issuecomment-1949077518 @ijuma thanks for flagging https://github.com/apache/kafka/pull/15376. @hachikuji Looks like this was going to add a test that tested the concurrent update of `Metadata`, and fetching `MetadataSnapshot`/`Cluster`. This is useful, so i have created a follow-up PR https://github.com/apache/kafka/pull/15385 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on code in PR #15384: URL: https://github.com/apache/kafka/pull/15384#discussion_r1492846127 ## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ## @@ -35,6 +35,7 @@ public class ListTransactionsOptions extends AbstractOptions filteredStates = Collections.emptySet(); private Set filteredProducerIds = Collections.emptySet(); +private Long durationFilter = 0L; Review Comment: nit: any reason we chose default to be zero and not -1? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]
rreddy-22 commented on code in PR #15382: URL: https://github.com/apache/kafka/pull/15382#discussion_r1492844519 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws Exception { String simpleGroup = "simple-group"; addSimpleGroupExecutor(simpleGroup); addConsumerGroupExecutor(1); -final AtomicReference out = new AtomicReference<>(""); -String[] cgcArgs1 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs1); -return null; -})); -return !out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, but found " + out.get()); - -String[] cgcArgs2 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs2); -return null; -})); -return out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and the header, but found " + out.get()); - -String[] cgcArgs3 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs3); -return null; -})); -return out.get().contains("STATE") && out.get().contains(GROUP) && out.get().contains("Stable"); -}, "Expected to find " + GROUP + " in state Stable and the header, but found " + out.get()); - -String[] cgcArgs4 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"}; +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list"), +Collections.emptyList(), +mkSet( +Collections.singletonList(GROUP), +Collections.singletonList(simpleGroup) +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable"), +Arrays.asList(simpleGroup, "Empty") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); +} + +/** + * Validates that the output of the list command corresponds to the expected values. + * + * @param args The arguments for the command line tool. + * @param expectedHeaderThe expected header as a list of strings; or an empty list + * if a header is not expected. + * @param expectedGroupsThe expected groups as a set of list of strings. The list + * of strings corresponds to the columns. + * @throws InterruptedException + */ +private static void validateListOutput( +List args, +List expectedHeader, +Set> expectedGroups +) throws InterruptedException { +final AtomicReference out = new AtomicReference<>(""); TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs4); -return null; -})); -return out.get().contains("STATE") && out.get().contains(GROUP) && out.get().contains("Stable"); -}, "Expected to find " + GROUP + " in state Stable and the header, but found " + out.get()); +String output = runAndGrabConsoleOutput(args); +out.set(output); + +int index = 0; +String[] lines = output.split("\n"); + +// Parse the header if one is expected. +if
[PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]
msn-tldr opened a new pull request, #15385: URL: https://github.com/apache/kafka/pull/15385 This is a follow-up to https://github.com/apache/kafka/pull/15323. Metadata is typically updated concurrently in the background thread, and the MetadataSnapshot/Cluster are fetched & used in another thread(typically application thread). Make sure the concurrent update & read works as expected. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15665: Enforce partition reassignment should complete when all target replicas are in ISR [kafka]
jolshan merged PR #15359: URL: https://github.com/apache/kafka/pull/15359 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 opened a new pull request, #15384: URL: https://github.com/apache/kafka/pull/15384 Introduces a new filter in ListTransactionsRequest API. This enables caller to filter on transactions that have been running for longer than a certain duration of time. This PR includes the following changes: 1. bumps version for ListTransactionsRequest API to 1. Set the durationFilter to 0 when communicating with an older broker that does not support version 1. 2. bumps version for ListTransactionsResponse to 1 without changing the response structure. 3. adds durationFilter option to `kafka-transactions.sh --list` Tests: - Client side test to build request with correct combination of duration filter and API version. `testBuildRequestWithDurationFilter` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16264) Expose `producer.id.expiration.check.interval.ms` as dynamic broker configuration
[ https://issues.apache.org/jira/browse/KAFKA-16264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818064#comment-17818064 ] Justine Olshan commented on KAFKA-16264: Hey [~jeqo] thanks for filing. I was also thinking about this after you filed the first ticket. One thing that is interesting is right now the expiration is scheduled on a separate thread on startup. I guess the best course of action is to cancel that scheduled task and create a new one? See UnifiedLog producerExpireCheck. > Expose `producer.id.expiration.check.interval.ms` as dynamic broker > configuration > - > > Key: KAFKA-16264 > URL: https://issues.apache.org/jira/browse/KAFKA-16264 > Project: Kafka > Issue Type: Improvement >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > > Dealing with a scenario where too many producer ids lead to issues (e.g. high > cpu utilization, see KAFKA-16229) put operators in need to flush producer ids > more promptly than usual. > Currently, only the expiration timeout `producer.id.expiration.ms` is exposed > as dynamic config. This is helpful (e.g. by reducing the timeout, less > producer would eventually be kept in memory), but not enough if the > evaluation frequency is not sufficiently short to flush producer ids before > becoming an issue. Only by tuning both, the issue could be workaround. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: WakeupTrigger cleanup [kafka]
kirktrue closed pull request #14752: MINOR: WakeupTrigger cleanup URL: https://github.com/apache/kafka/pull/14752 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]
rreddy-22 commented on code in PR #15382: URL: https://github.com/apache/kafka/pull/15382#discussion_r1492799701 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws Exception { String simpleGroup = "simple-group"; addSimpleGroupExecutor(simpleGroup); addConsumerGroupExecutor(1); -final AtomicReference out = new AtomicReference<>(""); -String[] cgcArgs1 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs1); -return null; -})); -return !out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, but found " + out.get()); - -String[] cgcArgs2 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs2); -return null; -})); -return out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and the header, but found " + out.get()); - -String[] cgcArgs3 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs3); -return null; -})); -return out.get().contains("STATE") && out.get().contains(GROUP) && out.get().contains("Stable"); -}, "Expected to find " + GROUP + " in state Stable and the header, but found " + out.get()); - -String[] cgcArgs4 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"}; +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list"), +Collections.emptyList(), +mkSet( +Collections.singletonList(GROUP), +Collections.singletonList(simpleGroup) +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable"), +Arrays.asList(simpleGroup, "Empty") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); +} + +/** + * Validates that the output of the list command corresponds to the expected values. + * + * @param args The arguments for the command line tool. + * @param expectedHeaderThe expected header as a list of strings; or an empty list + * if a header is not expected. + * @param expectedGroupsThe expected groups as a set of list of strings. The list + * of strings corresponds to the columns. + * @throws InterruptedException + */ +private static void validateListOutput( Review Comment: The return type should be boolean no? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]
jolshan commented on code in PR #15382: URL: https://github.com/apache/kafka/pull/15382#discussion_r1492801866 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws Exception { String simpleGroup = "simple-group"; addSimpleGroupExecutor(simpleGroup); addConsumerGroupExecutor(1); -final AtomicReference out = new AtomicReference<>(""); -String[] cgcArgs1 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs1); -return null; -})); -return !out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, but found " + out.get()); - -String[] cgcArgs2 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs2); -return null; -})); -return out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and the header, but found " + out.get()); - -String[] cgcArgs3 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs3); -return null; -})); -return out.get().contains("STATE") && out.get().contains(GROUP) && out.get().contains("Stable"); -}, "Expected to find " + GROUP + " in state Stable and the header, but found " + out.get()); - -String[] cgcArgs4 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"}; +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list"), +Collections.emptyList(), +mkSet( +Collections.singletonList(GROUP), +Collections.singletonList(simpleGroup) +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable"), +Arrays.asList(simpleGroup, "Empty") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); +} + +/** + * Validates that the output of the list command corresponds to the expected values. + * + * @param args The arguments for the command line tool. + * @param expectedHeaderThe expected header as a list of strings; or an empty list + * if a header is not expected. + * @param expectedGroupsThe expected groups as a set of list of strings. The list + * of strings corresponds to the columns. + * @throws InterruptedException + */ +private static void validateListOutput( Review Comment: The method itself does the asserts, so I think it is ok to be void. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]
rreddy-22 commented on code in PR #15382: URL: https://github.com/apache/kafka/pull/15382#discussion_r1492799701 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws Exception { String simpleGroup = "simple-group"; addSimpleGroupExecutor(simpleGroup); addConsumerGroupExecutor(1); -final AtomicReference out = new AtomicReference<>(""); -String[] cgcArgs1 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs1); -return null; -})); -return !out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, but found " + out.get()); - -String[] cgcArgs2 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs2); -return null; -})); -return out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and the header, but found " + out.get()); - -String[] cgcArgs3 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs3); -return null; -})); -return out.get().contains("STATE") && out.get().contains(GROUP) && out.get().contains("Stable"); -}, "Expected to find " + GROUP + " in state Stable and the header, but found " + out.get()); - -String[] cgcArgs4 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"}; +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list"), +Collections.emptyList(), +mkSet( +Collections.singletonList(GROUP), +Collections.singletonList(simpleGroup) +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable"), +Arrays.asList(simpleGroup, "Empty") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); +} + +/** + * Validates that the output of the list command corresponds to the expected values. + * + * @param args The arguments for the command line tool. + * @param expectedHeaderThe expected header as a list of strings; or an empty list + * if a header is not expected. + * @param expectedGroupsThe expected groups as a set of list of strings. The list + * of strings corresponds to the columns. + * @throws InterruptedException + */ +private static void validateListOutput( Review Comment: The return type should be boolean no? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]
jolshan commented on code in PR #15382: URL: https://github.com/apache/kafka/pull/15382#discussion_r1492798225 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws Exception { String simpleGroup = "simple-group"; addSimpleGroupExecutor(simpleGroup); addConsumerGroupExecutor(1); -final AtomicReference out = new AtomicReference<>(""); -String[] cgcArgs1 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs1); -return null; -})); -return !out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, but found " + out.get()); - -String[] cgcArgs2 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs2); -return null; -})); -return out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and the header, but found " + out.get()); - -String[] cgcArgs3 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs3); -return null; -})); -return out.get().contains("STATE") && out.get().contains(GROUP) && out.get().contains("Stable"); -}, "Expected to find " + GROUP + " in state Stable and the header, but found " + out.get()); - -String[] cgcArgs4 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"}; +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list"), +Collections.emptyList(), +mkSet( +Collections.singletonList(GROUP), +Collections.singletonList(simpleGroup) +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable"), +Arrays.asList(simpleGroup, "Empty") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); +} + +/** + * Validates that the output of the list command corresponds to the expected values. + * + * @param args The arguments for the command line tool. + * @param expectedHeaderThe expected header as a list of strings; or an empty list + * if a header is not expected. + * @param expectedGroupsThe expected groups as a set of list of strings. The list + * of strings corresponds to the columns. Review Comment: as an aside, I like that we are being more thorough with checking the components are in the right rows and columns -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]
rreddy-22 commented on code in PR #15382: URL: https://github.com/apache/kafka/pull/15382#discussion_r1492788275 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws Exception { String simpleGroup = "simple-group"; addSimpleGroupExecutor(simpleGroup); addConsumerGroupExecutor(1); -final AtomicReference out = new AtomicReference<>(""); -String[] cgcArgs1 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs1); -return null; -})); -return !out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, but found " + out.get()); - -String[] cgcArgs2 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs2); -return null; -})); -return out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); -}, "Expected to find " + simpleGroup + ", " + GROUP + " and the header, but found " + out.get()); - -String[] cgcArgs3 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"}; -TestUtils.waitForCondition(() -> { -out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs3); -return null; -})); -return out.get().contains("STATE") && out.get().contains(GROUP) && out.get().contains("Stable"); -}, "Expected to find " + GROUP + " in state Stable and the header, but found " + out.get()); - -String[] cgcArgs4 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"}; +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list"), +Collections.emptyList(), +mkSet( +Collections.singletonList(GROUP), +Collections.singletonList(simpleGroup) +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable"), +Arrays.asList(simpleGroup, "Empty") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); + +validateListOutput( +Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"), +Arrays.asList("GROUP", "STATE"), +mkSet( +Arrays.asList(GROUP, "Stable") +) +); +} + +/** + * Validates that the output of the list command corresponds to the expected values. + * + * @param args The arguments for the command line tool. + * @param expectedHeaderThe expected header as a list of strings; or an empty list + * if a header is not expected. + * @param expectedGroupsThe expected groups as a set of list of strings. The list + * of strings corresponds to the columns. Review Comment: can we say expected* columns? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14588 ZK configuration moved to ZkConfig [kafka]
nizhikov commented on PR #15075: URL: https://github.com/apache/kafka/pull/15075#issuecomment-1948922545 Hello @mimaison @ijuma This PR is ready for review. Please, take a look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16265) Add durationFilter to ListTransactionsRequest
Yang Yu created KAFKA-16265: --- Summary: Add durationFilter to ListTransactionsRequest Key: KAFKA-16265 URL: https://issues.apache.org/jira/browse/KAFKA-16265 Project: Kafka Issue Type: Sub-task Reporter: Yang Yu * Add durationFilter field to ListTransactionsRequest, make corresponding server side changes * Make appropriate version bumps for ListTransactionsRequest and ListTransationsResponse * Add durationFilter option to kafka-transactions.sh --list -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16266) Introduce TransactionLastUpdateTimeMs tagged field to DescribeTransactionsResponse
Yang Yu created KAFKA-16266: --- Summary: Introduce TransactionLastUpdateTimeMs tagged field to DescribeTransactionsResponse Key: KAFKA-16266 URL: https://issues.apache.org/jira/browse/KAFKA-16266 Project: Kafka Issue Type: Sub-task Reporter: Yang Yu Introduce TransactionLastUpdateTimeMs tagged field to DescribeTransactionsResponse. Make broker side changes to send this bit of information. Also, make changes to `kafka-transactions.sh --describe` tooling to display this new piece of information to the output. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1492754199 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -609,17 +842,18 @@ public void testOrdering() { inputTopic1.pipeInput(1, "A1", 100L); processor.checkAndClearProcessResult(); -// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then -// the joined records +// push one item to the other window that has a join; +// this should produce the joined record first; +// then non-joined record with a closed window // by the time they were produced before // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // w2 = { } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // --> w2 = { 1:a1 (ts: 110) } inputTopic2.pipeInput(1, "a1", 110L); processor.checkAndClearProcessResult( -new KeyValueTimestamp<>(0, "A0+null", 0L), -new KeyValueTimestamp<>(1, "A1+a1", 110L) +new KeyValueTimestamp<>(1, "A1+a1", 110L), +new KeyValueTimestamp<>(0, "A0+null", 0L) Review Comment: Seems this PR is basically ready for merging, so it might be faster to go with option (2), and revert changing the order in this PR and we can merge it. -- Of course, I would want to make a final pass after the change, to check the testing code again to verify correctness. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1492754199 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -609,17 +842,18 @@ public void testOrdering() { inputTopic1.pipeInput(1, "A1", 100L); processor.checkAndClearProcessResult(); -// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then -// the joined records +// push one item to the other window that has a join; +// this should produce the joined record first; +// then non-joined record with a closed window // by the time they were produced before // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // w2 = { } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // --> w2 = { 1:a1 (ts: 110) } inputTopic2.pipeInput(1, "a1", 110L); processor.checkAndClearProcessResult( -new KeyValueTimestamp<>(0, "A0+null", 0L), -new KeyValueTimestamp<>(1, "A1+a1", 110L) +new KeyValueTimestamp<>(1, "A1+a1", 110L), +new KeyValueTimestamp<>(0, "A0+null", 0L) Review Comment: Seems this PR is basically ready for merging, to it might be faster to go with option (2), and revert changing the order in this PR and we can merge it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1492754199 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -609,17 +842,18 @@ public void testOrdering() { inputTopic1.pipeInput(1, "A1", 100L); processor.checkAndClearProcessResult(); -// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then -// the joined records +// push one item to the other window that has a join; +// this should produce the joined record first; +// then non-joined record with a closed window // by the time they were produced before // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // w2 = { } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // --> w2 = { 1:a1 (ts: 110) } inputTopic2.pipeInput(1, "a1", 110L); processor.checkAndClearProcessResult( -new KeyValueTimestamp<>(0, "A0+null", 0L), -new KeyValueTimestamp<>(1, "A1+a1", 110L) +new KeyValueTimestamp<>(1, "A1+a1", 110L), +new KeyValueTimestamp<>(0, "A0+null", 0L) Review Comment: Seems this PR is basically ready for merging, so it might be faster to go with option (2), and revert changing the order in this PR and we can merge it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: add note about Kafka Streams feature for 3.7 release [kafka]
mjsax merged PR #15380: URL: https://github.com/apache/kafka/pull/15380 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]
lucasbru commented on PR #15372: URL: https://github.com/apache/kafka/pull/15372#issuecomment-1948876670 Hey @mjsax . This PR is right now waiting for @cadonna's review. If you have time, you could take a look to get into the 848 work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16009: Fix PlaintextConsumerTest. testMaxPollIntervalMsDelayInRevocation [kafka]
lucasbru commented on PR #15383: URL: https://github.com/apache/kafka/pull/15383#issuecomment-1948876369 Hey @mjsax . This PR is right now waiting for @cadonna's review. If you have time, you could take a look to get into the 848 work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1492719703 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -143,22 +312,134 @@ public void testListGroupCommand(String quorum) throws Exception { return out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); }, "Expected to find " + simpleGroup + ", " + GROUP + " and the header, but found " + out.get()); -String[] cgcArgs3 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"}; +String[] cgcArgs3 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--type"}; TestUtils.waitForCondition(() -> { out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { ConsumerGroupCommand.main(cgcArgs3); return null; })); +return out.get().contains("TYPE") && !out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); +}, "Expected to find " + simpleGroup + ", " + GROUP + " and the header, but found " + out.get()); + +String[] cgcArgs4 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "--type"}; +TestUtils.waitForCondition(() -> { +out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { +ConsumerGroupCommand.main(cgcArgs4); +return null; +})); +return out.get().contains("TYPE") && out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); +}, "Expected to find " + simpleGroup + ", " + GROUP + " and the header, but found " + out.get()); + +String[] cgcArgs5 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"}; +TestUtils.waitForCondition(() -> { +out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { +ConsumerGroupCommand.main(cgcArgs5); +return null; +})); return out.get().contains("STATE") && out.get().contains(GROUP) && out.get().contains("Stable"); }, "Expected to find " + GROUP + " in state Stable and the header, but found " + out.get()); -String[] cgcArgs4 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"}; +String[] cgcArgs6 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"}; TestUtils.waitForCondition(() -> { out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { -ConsumerGroupCommand.main(cgcArgs4); +ConsumerGroupCommand.main(cgcArgs6); return null; })); return out.get().contains("STATE") && out.get().contains(GROUP) && out.get().contains("Stable"); }, "Expected to find " + GROUP + " in state Stable and the header, but found " + out.get()); + +String[] cgcArgs7 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--type", "Classic"}; +TestUtils.waitForCondition(() -> { +out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { +ConsumerGroupCommand.main(cgcArgs7); +return null; +})); +return out.get().contains("TYPE") && out.get().contains("Classic") && !out.get().contains("STATE") && +out.get().contains(simpleGroup) && out.get().contains(GROUP); +}, "Expected to find " + GROUP + " and the header, but found " + out.get()); + +String[] cgcArgs8 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--type", "classic"}; +TestUtils.waitForCondition(() -> { +out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { +ConsumerGroupCommand.main(cgcArgs8); +return null; +})); +return out.get().contains("TYPE") && out.get().contains("Classic") && !out.get().contains("STATE") && +out.get().contains(simpleGroup) && out.get().contains(GROUP); +}, "Expected to find " + GROUP + " and the header, but found " + out.get()); Review Comment: Thanks a lot! I'll take a look! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
dajac commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1492698285 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -20,78 +20,225 @@ import kafka.admin.ConsumerGroupCommand; import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupType; import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.MethodSource; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; import java.util.Objects; import java.util.Optional; +import java.util.Properties; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; public class ListConsumerGroupTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testListConsumerGroups(String quorum) throws Exception { +@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) +@MethodSource("getTestQuorumAndGroupProtocolParametersAll") +public void testListConsumerGroupsWithoutFilters(String quorum, String groupProtocol) throws Exception { String simpleGroup = "simple-group"; + +createOffsetsTopic(listenerName(), new Properties()); + addSimpleGroupExecutor(simpleGroup); addConsumerGroupExecutor(1); +addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol); String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); -scala.collection.Set expectedGroups = set(Arrays.asList(GROUP, simpleGroup)); + +scala.collection.Set expectedGroups = set(Arrays.asList(GROUP, simpleGroup, PROTOCOL_GROUP)); final AtomicReference foundGroups = new AtomicReference<>(); + TestUtils.waitForCondition(() -> { foundGroups.set(service.listConsumerGroups().toSet()); return Objects.equals(expectedGroups, foundGroups.get()); }, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups.get() + "."); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testListWithUnrecognizedNewConsumerOption() { +@Test +public void testListWithUnrecognizedNewConsumerOption() throws Exception { String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--list"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testListConsumerGroupsWithStates() throws Exception { +@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) +@MethodSource("getTestQuorumAndGroupProtocolParametersAll") +public void testListConsumerGroupsWithStates(String quorum, String groupProtocol) throws Exception { Review Comment: Sorry, I very likely missed your reply. It would be better to change it in my opinion. We could pass the group protocol to addConsumerGroupExecutor and adapt the assertions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1492695529 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -20,78 +20,225 @@ import kafka.admin.ConsumerGroupCommand; import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupType; import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.MethodSource; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; import java.util.Objects; import java.util.Optional; +import java.util.Properties; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; public class ListConsumerGroupTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testListConsumerGroups(String quorum) throws Exception { +@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) +@MethodSource("getTestQuorumAndGroupProtocolParametersAll") +public void testListConsumerGroupsWithoutFilters(String quorum, String groupProtocol) throws Exception { String simpleGroup = "simple-group"; + +createOffsetsTopic(listenerName(), new Properties()); + addSimpleGroupExecutor(simpleGroup); addConsumerGroupExecutor(1); +addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol); String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); -scala.collection.Set expectedGroups = set(Arrays.asList(GROUP, simpleGroup)); + +scala.collection.Set expectedGroups = set(Arrays.asList(GROUP, simpleGroup, PROTOCOL_GROUP)); final AtomicReference foundGroups = new AtomicReference<>(); + TestUtils.waitForCondition(() -> { foundGroups.set(service.listConsumerGroups().toSet()); return Objects.equals(expectedGroups, foundGroups.get()); }, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups.get() + "."); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testListWithUnrecognizedNewConsumerOption() { +@Test +public void testListWithUnrecognizedNewConsumerOption() throws Exception { String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--list"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testListConsumerGroupsWithStates() throws Exception { +@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) +@MethodSource("getTestQuorumAndGroupProtocolParametersAll") +public void testListConsumerGroupsWithStates(String quorum, String groupProtocol) throws Exception { Review Comment: I think we talked about this before, I thought we didn't want to make changes to the states in this PR so I had left it so we can improve it in another small PR, but I can change it now, I might've missed communicating that -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]
lianetm commented on PR #15372: URL: https://github.com/apache/kafka/pull/15372#issuecomment-1948611367 I like the last commit msg :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16009: Fix PlaintextConsumerTest. testMaxPollIntervalMsDelayInRevocation [kafka]
lucasbru commented on code in PR #15383: URL: https://github.com/apache/kafka/pull/15383#discussion_r1492621621 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -419,6 +422,35 @@ public void testWakeupAfterNonEmptyFetch() { assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO)); } +@Test +public void testCommitInCommitCallback() { Review Comment: oops -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16009: Fix PlaintextConsumerTest. testMaxPollIntervalMsDelayInRevocation [kafka]
lianetm commented on code in PR #15383: URL: https://github.com/apache/kafka/pull/15383#discussion_r1492613876 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -419,6 +422,35 @@ public void testWakeupAfterNonEmptyFetch() { assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO)); } +@Test +public void testCommitInCommitCallback() { Review Comment: We're resting here the commit on rebalance callback, so name maybe `testCommitInRebalanceCallback` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on PR #15364: URL: https://github.com/apache/kafka/pull/15364#issuecomment-1948535272 @jeffkbkim Thanks for your comments. I have addressed them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1492587334 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -251,195 +255,51 @@ private ConsumerGroupMember transitionToNewTargetAssignmentState() { } if (!newPartitionsPendingRevocation.isEmpty()) { -// If the partition pending revocation set is not empty, we transition the -// member to revoking and keep the current epoch. The transition to the new -// state is done when the member is updated. +// If there are partitions to be revoked, the member remains in its current +// epoch and requests the revocation of those partitions. It transitions to +// the UNREVOKED_PARTITIONS state to wait until the client acknowledges the +// revocation of the partitions. return new ConsumerGroupMember.Builder(member) +.setState(MemberState.UNREVOKED_PARTITIONS) +.updateMemberEpoch(memberEpoch) .setAssignedPartitions(newAssignedPartitions) -.setPartitionsPendingRevocation(newPartitionsPendingRevocation) -.setPartitionsPendingAssignment(newPartitionsPendingAssignment) -.setTargetMemberEpoch(targetAssignmentEpoch) +.setRevokedPartitions(newPartitionsPendingRevocation) Review Comment: Alright. Let me change it back. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1492585893 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -251,195 +255,51 @@ private ConsumerGroupMember transitionToNewTargetAssignmentState() { } if (!newPartitionsPendingRevocation.isEmpty()) { -// If the partition pending revocation set is not empty, we transition the -// member to revoking and keep the current epoch. The transition to the new -// state is done when the member is updated. +// If there are partitions to be revoked, the member remains in its current +// epoch and requests the revocation of those partitions. It transitions to +// the UNREVOKED_PARTITIONS state to wait until the client acknowledges the Review Comment: Correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1492583068 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -170,72 +127,119 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions( * @return A new ConsumerGroupMember or the current one. */ public ConsumerGroupMember build() { -// A new target assignment has been installed, we need to restart -// the reconciliation loop from the beginning. -if (targetAssignmentEpoch != member.targetMemberEpoch()) { -return transitionToNewTargetAssignmentState(); -} - switch (member.state()) { -// Check if the partitions have been revoked by the member. -case REVOKING: -return maybeTransitionFromRevokingToAssigningOrStable(); +case STABLE: +// When the member is in the STABLE state, we verify if a newer +// epoch (or target assignment) is available. If it is, we can +// reconcile the member towards it. Otherwise, we return. +if (member.memberEpoch() != targetAssignmentEpoch) { +return computeNextAssignment( +member.memberEpoch(), +member.assignedPartitions() +); +} else { +return member; +} -// Check if pending partitions have been freed up. -case ASSIGNING: -return maybeTransitionFromAssigningToAssigningOrStable(); +case UNREVOKED_PARTITIONS: +// When the member is in the UNREVOKED_PARTITIONS state, we wait +// until the member has revoked the necessary partitions. They are +// considered revoked when they are not anymore reported in the +// owned partitions set in the ConsumerGroupHeartbeat API. -// Nothing to do. -case STABLE: -return member; +// If the member does not provide its owned partitions. We cannot +// progress. +if (ownedTopicPartitions == null) { +return member; +} + +// If the member provides its owned partitions. We verify if it still +// owens any of the revoked partitions. If it does, we cannot progress. +for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) { +for (Integer partitionId : topicPartitions.partitions()) { +boolean stillHasRevokedPartition = member +.revokedPartitions() +.getOrDefault(topicPartitions.topicId(), Collections.emptySet()) +.contains(partitionId); +if (stillHasRevokedPartition) { +return member; +} +} +} + +// When the member has revoked all the pending partitions, it can +// transition to the next epoch (current + 1) and we can reconcile +// its state towards the latest target assignment. +return computeNextAssignment( +member.memberEpoch() + 1, Review Comment: Yes, it does in the code you extracted. Let's take an example: 1) Member A has partitions 1 and 2 in epoch 10. 2) Target assignment changes the assignment of A to 2 in epoch 11. 3) Member A enters the UNREVOKED_PARTITIONS state to revoke 1 and stays in 10. 4) Target assignment changes the assignment of A to 3 in epoch 12 (based an HB from another member). 5) A comes back to acknowledge that 1 is gone. Now A can transitions to epoch 11 because it has revoked the partitions presenting him to do so earlier. However, he cannot transition to epoch 12 yet because it has to revoke 2 first. If we don't do this, A would remain in epoch 10 while revoking 2 and the rebalance timeout would not be reset. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1492574501 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -779,7 +779,7 @@ private void maybeUpdateGroupState() { newState = ASSIGNING; } else { for (ConsumerGroupMember member : members.values()) { -if (member.targetMemberEpoch() != targetAssignmentEpoch.get() || member.state() != ConsumerGroupMember.MemberState.STABLE) { +if (!member.isReconciledTo(targetAssignmentEpoch.get())) { Review Comment: The member epoch has not changed. A member is fully reconciled when it is Stable in the latest epoch (targetAssignmentEpoch). The previous condition was a bit weird, I agree. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ## @@ -369,20 +329,12 @@ public String toString() { /** * The partitions being revoked by this member. */ -private final Map> partitionsPendingRevocation; - -/** - * The partitions waiting to be assigned to this - * member. They will be assigned when they are - * released by their previous owners. - */ -private final Map> partitionsPendingAssignment; +private final Map> revokedPartitions; Review Comment: Nope. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
[ https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16009: -- Assignee: Lucas Brutschy (was: Kirk True) > Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation > > > Key: KAFKA-16009 > URL: https://issues.apache.org/jira/browse/KAFKA-16009 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Lucas Brutschy >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, timeout > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing > when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235) > {code} > The logs include this line: > > {code} > [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1492567552 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -346,10 +346,9 @@ public static Record newCurrentAssignmentRecord( new ConsumerGroupCurrentMemberAssignmentValue() .setMemberEpoch(member.memberEpoch()) .setPreviousMemberEpoch(member.previousMemberEpoch()) -.setTargetMemberEpoch(member.targetMemberEpoch()) +.setState(member.state().value()) Review Comment: It was mainly used to determine whether a new assignment was required if its target epoch was stale. The state with the previous schema was basically determined by the target member epoch, the partitions pending revocation and the partitions pending assignment. It was a bit weakly defined. Having a proper state seems much better. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1492563558 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1211,13 +1192,99 @@ private CoordinatorResult consumerGr // 1. The member reported its owned partitions; // 2. The member just joined or rejoined to group (epoch equals to zero); // 3. The member's assignment has been updated. -if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) { +if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createResponseAssignment(updatedMember)); } return new CoordinatorResult<>(records, response); } +/** + * Reconciles the current assignment of the member if needed. + * + * @param groupId The group id. + * @param memberThe member to reconcile. + * @param currentPartitionEpoch The function returning the current epoch of + * a given partition. + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @param ownedTopicPartitions The list of partitions owned by the member. This + * is reported in the ConsumerGroupHeartbeat API and + * it could be null if not provided. + * @param records The list to accumulate any new records. + * @return The received member if no changes have been made; or a new + * member containing the new assignment. + */ +private ConsumerGroupMember maybeReconcile( +String groupId, +ConsumerGroupMember member, +BiFunction currentPartitionEpoch, +int targetAssignmentEpoch, +Assignment targetAssignment, +List ownedTopicPartitions, +List records +) { +if (member.isReconciledTo(targetAssignmentEpoch)) { +return member; +} + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(targetAssignmentEpoch, targetAssignment) +.withCurrentPartitionEpoch(currentPartitionEpoch) +.withOwnedTopicPartitions(ownedTopicPartitions) +.build(); + +if (!updatedMember.equals(member)) { +records.add(newCurrentAssignmentRecord(groupId, updatedMember)); + +log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, " + + "assignedPartitions={} and revokedPartitions={}.", +groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), +formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions())); + +if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { +scheduleConsumerGroupRebalanceTimeout( +groupId, +updatedMember.memberId(), +updatedMember.memberEpoch(), +updatedMember.rebalanceTimeoutMs() +); +} else { Review Comment: The rebalance is not complete here because the member has to reach Stable for this. However, we only have a timeout on the revocation part. We basically want to member to not block revoked partitions forever. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1492559984 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1211,13 +1192,99 @@ private CoordinatorResult consumerGr // 1. The member reported its owned partitions; // 2. The member just joined or rejoined to group (epoch equals to zero); // 3. The member's assignment has been updated. -if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) { +if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createResponseAssignment(updatedMember)); } return new CoordinatorResult<>(records, response); } +/** + * Reconciles the current assignment of the member if needed. + * + * @param groupId The group id. + * @param memberThe member to reconcile. + * @param currentPartitionEpoch The function returning the current epoch of + * a given partition. + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @param ownedTopicPartitions The list of partitions owned by the member. This + * is reported in the ConsumerGroupHeartbeat API and + * it could be null if not provided. + * @param records The list to accumulate any new records. + * @return The received member if no changes have been made; or a new + * member containing the new assignment. + */ +private ConsumerGroupMember maybeReconcile( +String groupId, +ConsumerGroupMember member, +BiFunction currentPartitionEpoch, +int targetAssignmentEpoch, +Assignment targetAssignment, +List ownedTopicPartitions, +List records +) { +if (member.isReconciledTo(targetAssignmentEpoch)) { +return member; +} + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(targetAssignmentEpoch, targetAssignment) +.withCurrentPartitionEpoch(currentPartitionEpoch) +.withOwnedTopicPartitions(ownedTopicPartitions) +.build(); + +if (!updatedMember.equals(member)) { +records.add(newCurrentAssignmentRecord(groupId, updatedMember)); + +log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, " + + "assignedPartitions={} and revokedPartitions={}.", +groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), +formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions())); + +if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { +scheduleConsumerGroupRebalanceTimeout( +groupId, +updatedMember.memberId(), +updatedMember.memberEpoch(), +updatedMember.rebalanceTimeoutMs() +); +} else { +cancelConsumerGroupRebalanceTimeout(groupId, updatedMember.memberId()); +} +} + +return updatedMember; +} + +private String formatAssignment( Review Comment: Would replacing `-` by `:` help? Or do you have another option in mind? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1492557819 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1211,13 +1192,99 @@ private CoordinatorResult consumerGr // 1. The member reported its owned partitions; // 2. The member just joined or rejoined to group (epoch equals to zero); // 3. The member's assignment has been updated. -if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) { +if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createResponseAssignment(updatedMember)); } return new CoordinatorResult<>(records, response); } +/** + * Reconciles the current assignment of the member if needed. + * + * @param groupId The group id. + * @param memberThe member to reconcile. + * @param currentPartitionEpoch The function returning the current epoch of + * a given partition. + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @param ownedTopicPartitions The list of partitions owned by the member. This + * is reported in the ConsumerGroupHeartbeat API and + * it could be null if not provided. + * @param records The list to accumulate any new records. + * @return The received member if no changes have been made; or a new + * member containing the new assignment. + */ +private ConsumerGroupMember maybeReconcile( +String groupId, +ConsumerGroupMember member, +BiFunction currentPartitionEpoch, +int targetAssignmentEpoch, +Assignment targetAssignment, +List ownedTopicPartitions, +List records +) { +if (member.isReconciledTo(targetAssignmentEpoch)) { +return member; +} + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(targetAssignmentEpoch, targetAssignment) +.withCurrentPartitionEpoch(currentPartitionEpoch) +.withOwnedTopicPartitions(ownedTopicPartitions) +.build(); + +if (!updatedMember.equals(member)) { Review Comment: The reference check only works if the `CurrentAssignmentBuilder` guarantees that a new instance is returned when there is a change. At the moment, the contract is weak here so it may be better to use `equals` here. An alternative would be to delegate this decision to `CurrentAssignmentBuilder`. For instance, the builder could return an `Optional`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1492554402 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1211,13 +1192,99 @@ private CoordinatorResult consumerGr // 1. The member reported its owned partitions; // 2. The member just joined or rejoined to group (epoch equals to zero); // 3. The member's assignment has been updated. -if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) { +if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { Review Comment: My understanding is that `equals` starts by verifying the reference. Hence, we really compare the two maps if the assignment of the new member has been updated. This should not be an issue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16009: Fix PlaintextConsumerTest. testMaxPollIntervalMsDelayInRevocation [kafka]
lucasbru commented on code in PR #15383: URL: https://github.com/apache/kafka/pull/15383#discussion_r1492547506 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -384,7 +387,7 @@ public void testWakeupAfterEmptyFetch() { doAnswer(invocation -> { consumer.wakeup(); return Fetch.empty(); -}).when(fetchCollector).collectFetch(any(FetchBuffer.class)); +}).doAnswer(invocation -> Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); Review Comment: It seems that this was just a bit of a lazy test setup. We only want to wake up the consumer once for this test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16009: Fix PlaintextConsumerTest. testMaxPollIntervalMsDelayInRevocation [kafka]
lucasbru commented on PR #15383: URL: https://github.com/apache/kafka/pull/15383#issuecomment-1948483411 @cadonna Could you please have a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16009: Fix PlaintextConsumerTest. testMaxPollIntervalMsDelayInRevocation [kafka]
lucasbru opened a new pull request, #15383: URL: https://github.com/apache/kafka/pull/15383 The wake-up mechanism in the new consumer is preventing from committing within a rebalance listener callback. The reason is that we are trying to register two wake-uppable actions at the same time. The fix is to register the wake-uppable action more closely to where we are in fact blocking on it, so that the action is not registered when we execute rebalance listeneners and callback listeners. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]
Phuc-Hong-Tran commented on code in PR #15188: URL: https://github.com/apache/kafka/pull/15188#discussion_r1492453933 ## clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json: ## @@ -35,6 +35,8 @@ "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." }, { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "topicName", "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }, +{ "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" }, Review Comment: I understand -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]
dajac commented on PR #15188: URL: https://github.com/apache/kafka/pull/15188#issuecomment-1948347606 I was wondering whether we should introduce a new error code to signal to the user that the regular expression is invalid. Otherwise, we would have to use the invalid request exception and it does not seem user friendly. @cadonna @lianetm What do you think about this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]
dajac commented on code in PR #15188: URL: https://github.com/apache/kafka/pull/15188#discussion_r1492422212 ## clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json: ## @@ -35,6 +35,8 @@ "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." }, { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "topicName", "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }, +{ "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" }, Review Comment: This is incorrect. We cannot add a field to a released version. I would suggest to bump the version of the RPC and use the correct version here. We should also mark the last version as unstable with `latestVersionUnstable` set to `true`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]
dajac commented on PR #15188: URL: https://github.com/apache/kafka/pull/15188#issuecomment-1948333168 > > @cadonna @lianetm, since we're supporting for both subscribe method using java.util.regex.Pattern and SubscriptionPattern, I think we should throw a illegal heartbeat exeption when user try to use both method at the same time and inform the user to use once at a time, since the field SubscribedRegex is used for java.util.regex.Pattern as well as SubscriptionPattern. What do you guys think? > > IMO, we must support the deprecated pattern subscriptions with `java.util.regex.Pattern` to ensure backwards compatibility, but we do not need to support mixed usage of `java.util.regex.Pattern` and Google regex patterns. I think this is a blind spot in the KIP. I propose to throw an `IllegalStateException` if `subscribe(java.util.regex.Pattern)` is called after `subscribe(SubscriptionPattern)` (and vice versa) without calling `unsubscribe()` in between. That is similar to the restrictions between pattern, topic, and partition subscriptions @lianetm linked above. I do not think it is worth to consider the edge case of mixed usage of the two pattern types. Does this make sense to you? \cc @dajac What do you as the original author of the KIP think? Should we update the KIP to make this clear? @cadonna I would rather follow what we already do with `subscribe` today. The last one called takes precedence. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-6579) Consolidate window store and session store unit tests into a single class
[ https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmed Sobeh reassigned KAFKA-6579: -- Assignee: (was: Ahmed Sobeh) > Consolidate window store and session store unit tests into a single class > - > > Key: KAFKA-6579 > URL: https://issues.apache.org/jira/browse/KAFKA-6579 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: newbie, unit-test > > For key value store, we have a {{AbstractKeyValueStoreTest}} that is shared > among all its implementations; however for window and session stores, each > class has its own independent unit test classes that do not share the test > coverage. In fact, many of these test classes share the same unit test > functions (e.g. {{RocksDBWindowStoreTest}}, > {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}). > It is better to use the same pattern as for key value stores to consolidate > these test functions into a shared base class. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15770: IQv2 must return immutable position [kafka]
lucasbru commented on code in PR #15219: URL: https://github.com/apache/kafka/pull/15219#discussion_r1492366369 ## streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java: ## @@ -193,17 +194,22 @@ public void put(final Bytes rawBaseKey, expiredRecordSensor.record(1.0d, context.currentSystemTimeMs()); LOG.warn("Skipping record for expired segment."); } else { -StoreQueryUtils.updatePosition(position, stateStoreContext); - -// Put to index first so that if put to base failed, when we iterate index, we will -// find no base value. If put to base first but putting to index fails, when we iterate -// index, we can't find the key but if we iterate over base store, we can find the key -// which lead to inconsistency. -if (hasIndex()) { -final KeyValue indexKeyValue = getIndexKeyValue(rawBaseKey, value); -segment.put(indexKeyValue.key, indexKeyValue.value); +position.lock(); Review Comment: Have you considered doing the same without `SynchronizedPosition` and instead just using the object monitor of position ``` synchronized(position) { } ``` ? ## streams/src/test/java/org/apache/kafka/streams/state/internals/StoreQueryUtilsTest.java: ## @@ -70,7 +71,7 @@ public void shouldReturnErrorOnBoundViolation() { PositionBound.at(Position.emptyPosition().withComponent("topic", 0, 1)), new QueryConfig(false), store, -Position.emptyPosition().withComponent("topic", 0, 0), +(SynchronizedPosition) Position.emptyPosition().withComponent("topic", 0, 0), Review Comment: That cast won't work ## streams/src/main/java/org/apache/kafka/streams/query/internals/SynchronizedPosition.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query.internals; + +import org.apache.kafka.streams.query.Position; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +public class SynchronizedPosition extends Position { +private final ReentrantLock lock = new ReentrantLock(); + +public SynchronizedPosition(final ConcurrentHashMap> position) { +super(position); +} + +public static SynchronizedPosition emptyPosition() { +return new SynchronizedPosition(new ConcurrentHashMap<>()); +} + +public static SynchronizedPosition fromMap(final Map> map) { +return new SynchronizedPosition(deepCopy(map)); +} + +public static ConcurrentHashMap> deepCopy( Review Comment: Why did you move this static method? Since it's also accessed by `Position`, I'd rather leave it there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]
nizhikov commented on PR #15382: URL: https://github.com/apache/kafka/pull/15382#issuecomment-1948181817 Hello, @dajac I will take a look, shortly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Added ACLs authorizer change during migration [kafka]
showuon merged PR #15333: URL: https://github.com/apache/kafka/pull/15333 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15670: add "inter.broker.listener.name" config in KRaft controller config [kafka]
showuon merged PR #14631: URL: https://github.com/apache/kafka/pull/14631 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
dajac commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1492178089 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -20,78 +20,225 @@ import kafka.admin.ConsumerGroupCommand; import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupType; import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.MethodSource; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; import java.util.Objects; import java.util.Optional; +import java.util.Properties; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; public class ListConsumerGroupTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testListConsumerGroups(String quorum) throws Exception { +@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) +@MethodSource("getTestQuorumAndGroupProtocolParametersAll") +public void testListConsumerGroupsWithoutFilters(String quorum, String groupProtocol) throws Exception { String simpleGroup = "simple-group"; + +createOffsetsTopic(listenerName(), new Properties()); + addSimpleGroupExecutor(simpleGroup); addConsumerGroupExecutor(1); +addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol); String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); -scala.collection.Set expectedGroups = set(Arrays.asList(GROUP, simpleGroup)); + +scala.collection.Set expectedGroups = set(Arrays.asList(GROUP, simpleGroup, PROTOCOL_GROUP)); final AtomicReference foundGroups = new AtomicReference<>(); + TestUtils.waitForCondition(() -> { foundGroups.set(service.listConsumerGroups().toSet()); return Objects.equals(expectedGroups, foundGroups.get()); }, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups.get() + "."); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testListWithUnrecognizedNewConsumerOption() { +@Test +public void testListWithUnrecognizedNewConsumerOption() throws Exception { String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--list"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testListConsumerGroupsWithStates() throws Exception { +@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) +@MethodSource("getTestQuorumAndGroupProtocolParametersAll") +public void testListConsumerGroupsWithStates(String quorum, String groupProtocol) throws Exception { Review Comment: `groupProtocol` is never used in this test and we run it with all consumer types. I suppose that we either want to use `groupProtocol` or only run it with the classic type. If we do the latter, do we have a test listing new consumer groups too? ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -143,22 +312,134 @@ public void testListGroupCommand(String quorum) throws Exception { return out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); }, "Expected to find " + simpleGroup + ", " + GROUP + " and the header, but found " + out.get()); -String[] cgcArgs3 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"}; +String[] cgcArgs3 = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--type"}; TestUtils.waitForCondition(() -> { out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> { ConsumerGroupCommand.main(cgcArgs3); return null; })); +return out.get().contains("TYPE") && !out.get().contains("STATE") && out.get().contains(simpleGroup) && out.get().contains(GROUP); +}, "Expected to find " + simpleGroup + ", " + GROUP + " and the header, but found " + out.get()); + +String[]
Re: [PR] KAFKA-16230: Update verifiable_consumer.py to support KIP-848’s group protocol config [kafka]
lucasbru merged PR #15328: URL: https://github.com/apache/kafka/pull/15328 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]
dajac opened a new pull request, #15382: URL: https://github.com/apache/kafka/pull/15382 While reviewing https://github.com/apache/kafka/pull/15150, I found that our tests verifying the console output are really hard to read. Here is my proposal to make it better. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]
dajac commented on PR #15382: URL: https://github.com/apache/kafka/pull/15382#issuecomment-1948168236 @mimaison @nizhikov @rreddy-22 @jolshan Could you take a look when you get a chance? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on PR #15323: URL: https://github.com/apache/kafka/pull/15323#issuecomment-1948142511 @hachikuji thanks for merging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]
lucasbru merged PR #15375: URL: https://github.com/apache/kafka/pull/15375 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]
lucasbru commented on code in PR #15372: URL: https://github.com/apache/kafka/pull/15372#discussion_r1492266247 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -265,6 +265,32 @@ class PlaintextConsumerTest extends BaseConsumerTest { ensureNoRebalance(consumer, listener) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, groupProtocol: String): Unit = { + this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString) + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString) + +val consumer = createConsumer() +val listener = new TestConsumerReassignmentListener +consumer.subscribe(List(topic).asJava, listener) + +// rebalance to get the initial assignment +awaitRebalance(consumer, listener) + +val initialAssignedCalls = listener.callsToAssigned + +consumer.poll(Duration.ofMillis(2000)); Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16264) Expose `producer.id.expiration.check.interval.ms` as dynamic broker configuration
[ https://issues.apache.org/jira/browse/KAFKA-16264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817867#comment-17817867 ] Jorge Esteban Quilcate Otoya commented on KAFKA-16264: -- cc [~jolshan] – as related to https://issues.apache.org/jira/browse/KAFKA-16229 > Expose `producer.id.expiration.check.interval.ms` as dynamic broker > configuration > - > > Key: KAFKA-16264 > URL: https://issues.apache.org/jira/browse/KAFKA-16264 > Project: Kafka > Issue Type: Improvement >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > > Dealing with a scenario where too many producer ids lead to issues (e.g. high > cpu utilization, see KAFKA-16229) put operators in need to flush producer ids > more promptly than usual. > Currently, only the expiration timeout `producer.id.expiration.ms` is exposed > as dynamic config. This is helpful (e.g. by reducing the timeout, less > producer would eventually be kept in memory), but not enough if the > evaluation frequency is not sufficiently short to flush producer ids before > becoming an issue. Only by tuning both, the issue could be workaround. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16264) Expose `producer.id.expiration.check.interval.ms` as dynamic broker configuration
Jorge Esteban Quilcate Otoya created KAFKA-16264: Summary: Expose `producer.id.expiration.check.interval.ms` as dynamic broker configuration Key: KAFKA-16264 URL: https://issues.apache.org/jira/browse/KAFKA-16264 Project: Kafka Issue Type: Improvement Reporter: Jorge Esteban Quilcate Otoya Assignee: Jorge Esteban Quilcate Otoya Dealing with a scenario where too many producer ids lead to issues (e.g. high cpu utilization, see KAFKA-16229) put operators in need to flush producer ids more promptly than usual. Currently, only the expiration timeout `producer.id.expiration.ms` is exposed as dynamic config. This is helpful (e.g. by reducing the timeout, less producer would eventually be kept in memory), but not enough if the evaluation frequency is not sufficiently short to flush producer ids before becoming an issue. Only by tuning both, the issue could be workaround. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1492090658 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -609,17 +842,18 @@ public void testOrdering() { inputTopic1.pipeInput(1, "A1", 100L); processor.checkAndClearProcessResult(); -// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then -// the joined records +// push one item to the other window that has a join; +// this should produce the joined record first; +// then non-joined record with a closed window // by the time they were produced before // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // w2 = { } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // --> w2 = { 1:a1 (ts: 110) } inputTopic2.pipeInput(1, "a1", 110L); processor.checkAndClearProcessResult( -new KeyValueTimestamp<>(0, "A0+null", 0L), -new KeyValueTimestamp<>(1, "A1+a1", 110L) +new KeyValueTimestamp<>(1, "A1+a1", 110L), +new KeyValueTimestamp<>(0, "A0+null", 0L) Review Comment: When we change the order in which we check back to as it was, only the ordering tests will fail. So there seems to be no bug (anymore). I think that in a previous version of this PR (before checking both the left and right in the outerjoin) we saw that it did matter and a left-join test failed. But this has been solved now with the getOuterJoinLookBackTimeMs(). I think option 2 and option 3 will do fine. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16167: re-enable PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup [kafka]
lucasbru merged PR #15358: URL: https://github.com/apache/kafka/pull/15358 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org