[jira] [Updated] (KAFKA-15143) MockFixedKeyProcessorContext is missing from test-utils

2024-02-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15143:

Labels: needs-kip  (was: )

> MockFixedKeyProcessorContext is missing from test-utils
> ---
>
> Key: KAFKA-15143
> URL: https://issues.apache.org/jira/browse/KAFKA-15143
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 3.5.0
>Reporter: Tomasz Kaszuba
>Assignee: Shashwat Pandey
>Priority: Major
>  Labels: needs-kip
>
> I am trying to test a ContextualFixedKeyProcessor but it is not possible to 
> call the init method from within a unit test since the MockProcessorContext 
> doesn't implement  
> {code:java}
> FixedKeyProcessorContext {code}
> but only
> {code:java}
> ProcessorContext
> {code}
> Shouldn't there also be a *MockFixedKeyProcessorContext* in the test utils?
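For context, the gap the reporter describes can be sketched with stdlib-only Java. The interfaces below are simplified stand-ins for the Kafka Streams API, not the real classes: a `FixedKeyProcessor`'s `init` requires the narrower context interface, which the existing mock does not implement.

```java
// Simplified stand-ins for the Kafka Streams interfaces (illustrative only).
interface ProcessorContext { }
interface FixedKeyProcessorContext extends ProcessorContext { }

// test-utils today provides only this:
class MockProcessorContext implements ProcessorContext { }

// The mock the issue asks for would implement the narrower interface:
class MockFixedKeyProcessorContext implements FixedKeyProcessorContext { }

public class ContextGapDemo {
    // A FixedKeyProcessor's init method requires a FixedKeyProcessorContext.
    static String init(FixedKeyProcessorContext context) {
        return context.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        // init(new MockProcessorContext());  // would not compile: wrong interface
        System.out.println(init(new MockFixedKeyProcessorContext()));
    }
}
```

Because the mismatch is at the type level, no amount of casting a `MockProcessorContext` makes the unit test compile; a dedicated mock class is the natural fix.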



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15143) MockFixedKeyProcessorContext is missing from test-utils

2024-02-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15143:
---

Assignee: Shashwat Pandey

> MockFixedKeyProcessorContext is missing from test-utils
> ---
>
> Key: KAFKA-15143
> URL: https://issues.apache.org/jira/browse/KAFKA-15143
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 3.5.0
>Reporter: Tomasz Kaszuba
>Assignee: Shashwat Pandey
>Priority: Major
>
> I am trying to test a ContextualFixedKeyProcessor but it is not possible to 
> call the init method from within a unit test since the MockProcessorContext 
> doesn't implement  
> {code:java}
> FixedKeyProcessorContext {code}
> but only
> {code:java}
> ProcessorContext
> {code}
> Shouldn't there also be a *MockFixedKeyProcessorContext* in the test utils?





Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-02-16 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1493075382


##
streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java:
##
@@ -71,11 +71,21 @@ void afterEach() {
 @Test
 void testRelaxedLeftStreamStreamJoin() {
 leftStream
-.leftJoin(rightStream, JOINER, WINDOW)
+.leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
 .to(OUT);
 initTopology();
-left.pipeInput(null, "leftValue", 1);
-assertEquals(Collections.singletonList(new KeyValue<>(null, 
"leftValue|null")), out.readKeyValuesToList());
+left.pipeInput(null, "leftValue1", 1);
+left.pipeInput(null, "leftValue2", 90);
+left.pipeInput(null, "lateArrival-Dropped", 19);
+left.pipeInput(null, "lateArrivalWithinGrace", 20);
+assertEquals(
+Arrays.asList(
+new KeyValue<>(null, "leftValue1|null"),
+new KeyValue<>(null, "leftValue2|null"),
+new KeyValue<>(null, "lateArrivalWithinGrace|null")
+),
+out.readKeyValuesToList()
+);
 }
 
 @Test

Review Comment:
   Yes, I will check.
   
   Similarly, I realized `KStreamKStreamSelfJoin` should probably also drop 
'too late' records? I guess this would also be a separate Jira ticket & PR?
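The drop rule the updated test exercises can be sketched in plain Java. The bound below is my reading of the windowed-join semantics (a record is dropped once it lags observed stream time by more than window + grace), not the actual `KStreamKStreamJoin` code:

```java
public class LateRecordCheck {
    // Assumed rule: with window size w and grace g, a record with timestamp ts
    // is dropped as 'too late' when ts < streamTime - w - g.
    static boolean isDropped(long ts, long streamTime, long windowMs, long graceMs) {
        return ts < streamTime - windowMs - graceMs;
    }

    public static void main(String[] args) {
        long window = 60, grace = 10;
        long streamTime = 90;  // advanced by "leftValue2" at t=90
        // cutoff is 90 - 60 - 10 = 20
        System.out.println(isDropped(19, streamTime, window, grace)); // true: dropped
        System.out.println(isDropped(20, streamTime, window, grace)); // false: within grace
    }
}
```

This matches the test inputs above: "lateArrival-Dropped" at t=19 falls below the cutoff, while "lateArrivalWithinGrace" at t=20 still joins.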
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-02-16 Thread via GitHub


florin-akermann commented on PR #15189:
URL: https://github.com/apache/kafka/pull/15189#issuecomment-1949473525

   > Thanks for the fix! Overall LGTM. Couple of comments.
   
   @mjsax Thank you for all the good points. I agree with all of them. However, 
first I would like to align on whether we really want to extend the window 
bound tests to assert the correct grace period behavior. See my reply on 'Why + 
1'.
   
   Personally, I would just write separate test classes/cases to assert the 
grace period behavior (including sensor checks).
   
   Plus, as you suggested, the window bound tests will just be updated with 
large grace periods (e.g. Long.MAX - some constant).





Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]

2024-02-16 Thread via GitHub


jolshan commented on code in PR #15382:
URL: https://github.com/apache/kafka/pull/15382#discussion_r1492801866


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws 
Exception {
 String simpleGroup = "simple-group";
 addSimpleGroupExecutor(simpleGroup);
 addConsumerGroupExecutor(1);
-final AtomicReference out = new AtomicReference<>("");
 
-String[] cgcArgs1 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs1);
-return null;
-}));
-return !out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, 
but found " + out.get());
-
-String[] cgcArgs2 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs2);
-return null;
-}));
-return out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
-
-String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs3);
-return null;
-}));
-return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
-}, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
-
-String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"};
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list"),
+Collections.emptyList(),
+mkSet(
+Collections.singletonList(GROUP),
+Collections.singletonList(simpleGroup)
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable"),
+Arrays.asList(simpleGroup, "Empty")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+}
+
+/**
+ * Validates that the output of the list command corresponds to the 
expected values.
+ *
+ * @param args  The arguments for the command line tool.
+ * @param expectedHeaderThe expected header as a list of strings; or 
an empty list
+ *  if a header is not expected.
+ * @param expectedGroupsThe expected groups as a set of list of 
strings. The list
+ *  of strings corresponds to the columns.
+ * @throws InterruptedException
+ */
+private static void validateListOutput(

Review Comment:
   The method itself does the asserts, so I think it is ok to be void.






Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-02-16 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1493064669


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##
@@ -816,10 +816,12 @@ public void testWindowing() {
 stream1 = builder.stream(topic1, consumed);
 stream2 = builder.stream(topic2, consumed);
 
+final Duration timeDifference = ofMillis(100L);
+final Duration grace = ofMillis(104);

Review Comment:
   On second thought, note my comment below on 'Why + 1'.









Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1493062297


##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -61,6 +62,11 @@ public ListTransactionsOptions 
filterProducerIds(Collection producerIdFilt
 return this;
 }
 
+public ListTransactionsOptions durationFilter(Long durationMs) {

Review Comment:
   nit: these can all be primitive longs. (lowercase) There is usage in this 
file and the Admin file
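The nit, sketched with a toy options class (illustrative, not the real `ListTransactionsOptions`): a boxed `Long` parameter admits `null` and forces unboxing on every use, while a primitive `long` rules both out at compile time.

```java
public class OptionsDemo {
    // Toy stand-in for a fluent admin-options class (not the Kafka admin API).
    static final class Options {
        private long durationMs = -1L;             // sentinel: no filter set

        Options durationFilter(long durationMs) {  // primitive: null is impossible
            this.durationMs = durationMs;
            return this;
        }

        long durationMs() { return durationMs; }
    }

    public static void main(String[] args) {
        System.out.println(new Options().durationFilter(500L).durationMs()); // 500
    }
}
```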






Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-02-16 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1493061626


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##
@@ -1363,6 +1365,16 @@ public void testWindowing() {
 new KeyValueTimestamp<>(2, "L2+l2", 2002L),
 new KeyValueTimestamp<>(3, "L3+l3", 2003L)
 );
+
+//push two items with timestamp at grace edge; this should produce 
one join item, M0 is 'too late'
+final long currentStreamTime = 2104;
+final long lowerBound = currentStreamTime - 
timeDifference.toMillis() - grace.toMillis();
+inputTopic1.pipeInput(0, "M0", lowerBound - 1);
+inputTopic1.pipeInput(1, "M1", lowerBound + 1);

Review Comment:
   I can see now that the naming is misleading.
   
   The lower bound with regard to the grace period is 1900.
   However, the lower bound of the window `1:l1` is 1901.
   
   So the +1 was there to make sure it is still within the window.
   
   In general, I start to wonder whether it wouldn't make more sense to test 
these two concerns (grace & windowing) separately. E.g. with grace 150, `M0` is 
just a test case to assert that late records get dropped and `M1` is just 
another window bound test. With grace 104 the 'grace bound' and the 'l0 lower 
window bound' overlap, but it might be confusing. In other words, as you said, 
'this test aims to test window bounds'. So maybe I should move the grace 
period tests into a separate test class?
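The arithmetic being discussed, spelled out with the values from the test. Treating `streamTime - timeDifference - grace` as the drop cutoff is my assumption about the join internals:

```java
public class BoundArithmetic {
    public static void main(String[] args) {
        long currentStreamTime = 2104;
        long timeDifference = 100;  // window time difference
        long grace = 104;

        // Cutoff below which records are dropped as 'too late':
        long lowerBound = currentStreamTime - timeDifference - grace;
        System.out.println(lowerBound);     // 1900

        System.out.println(lowerBound - 1); // 1899: M0, dropped
        System.out.println(lowerBound + 1); // 1901: M1, also the window lower bound of `1:l1`
    }
}
```

With grace 104 the two bounds sit one apart, which is exactly the overlap the comment calls confusing.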






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1493050657


##
clients/src/main/resources/common/message/ListTransactionsResponse.json:
##
@@ -17,7 +17,8 @@
   "apiKey": 66,
   "type": "response",
   "name": "ListTransactionsResponse",
-  "validVersions": "0",
+  // Version 1 is the same as vesion 0 (KIP-994).

Review Comment:
   Nice catch. Fixed.






Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-16 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1493044453


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -20,78 +20,225 @@
 import kafka.admin.ConsumerGroupCommand;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testListConsumerGroups(String quorum) throws Exception {
+@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+@MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+public void testListConsumerGroupsWithoutFilters(String quorum, String 
groupProtocol) throws Exception {
 String simpleGroup = "simple-group";
+
+createOffsetsTopic(listenerName(), new Properties());
+
 addSimpleGroupExecutor(simpleGroup);
 addConsumerGroupExecutor(1);
+addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
 
 String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
 ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-scala.collection.Set expectedGroups = set(Arrays.asList(GROUP, 
simpleGroup));
+
+scala.collection.Set expectedGroups = set(Arrays.asList(GROUP, 
simpleGroup, PROTOCOL_GROUP));
 final AtomicReference foundGroups = new 
AtomicReference<>();
+
 TestUtils.waitForCondition(() -> {
 foundGroups.set(service.listConsumerGroups().toSet());
 return Objects.equals(expectedGroups, foundGroups.get());
 }, "Expected --list to show groups " + expectedGroups + ", but found " 
+ foundGroups.get() + ".");
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testListWithUnrecognizedNewConsumerOption() {
+@Test
+public void testListWithUnrecognizedNewConsumerOption() throws Exception {
 String[] cgcArgs = new String[]{"--new-consumer", 
"--bootstrap-server", bootstrapServers(listenerName()), "--list"};
 assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testListConsumerGroupsWithStates() throws Exception {
+@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+@MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+public void testListConsumerGroupsWithStates(String quorum, String 
groupProtocol) throws Exception {

Review Comment:
   Okay, thanks. I created two tests; we can't change the assertions without 
if statements, and we didn't want those, right?






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1493041438


##
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##
@@ -1633,7 +1633,8 @@ default ListTransactionsResult listTransactions() {
  * coordinators in the cluster and collect the state of all transactions. 
Users
  * should typically attempt to reduce the size of the result set using
  * {@link ListTransactionsOptions#filterProducerIds(Collection)} or
- * {@link ListTransactionsOptions#filterStates(Collection)}
+ * {@link ListTransactionsOptions#filterStates(Collection)} or

Review Comment:
   The ListTransactions API itself is able to filter by producer IDs and states 
(see `handleListTransactions` in TransactionCoordinator). However, the CLI tool 
does not currently support those filters. 






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1493039266


##
clients/src/main/resources/common/message/ListTransactionsRequest.json:
##
@@ -18,14 +18,18 @@
   "type": "request",
   "listeners": ["zkBroker", "broker"],
   "name": "ListTransactionsRequest",
-  "validVersions": "0",
+  // version 1: adds DurationFilter to list transactions older than specified 
duration

Review Comment:
   nit: most specs capitalize V here.



##
clients/src/main/resources/common/message/ListTransactionsResponse.json:
##
@@ -17,7 +17,8 @@
   "apiKey": 66,
   "type": "response",
   "name": "ListTransactionsResponse",
-  "validVersions": "0",
+  // Version 1 is the same as vesion 0 (KIP-994).

Review Comment:
   nit: version






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1493024798


##
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##
@@ -1633,7 +1633,8 @@ default ListTransactionsResult listTransactions() {
  * coordinators in the cluster and collect the state of all transactions. 
Users
  * should typically attempt to reduce the size of the result set using
  * {@link ListTransactionsOptions#filterProducerIds(Collection)} or
- * {@link ListTransactionsOptions#filterStates(Collection)}
+ * {@link ListTransactionsOptions#filterStates(Collection)} or

Review Comment:
   Maybe a silly question, but I noticed this comment mentions filtering by 
producer ID or state. Do you know where that is done? It looks like 
durationFilter is the only subparser argument for list in TransactionCommand






Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-02-16 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1493006495


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##
@@ -816,10 +816,12 @@ public void testWindowing() {
 stream1 = builder.stream(topic1, consumed);
 stream2 = builder.stream(topic2, consumed);
 
+final Duration timeDifference = ofMillis(100L);
+final Duration grace = ofMillis(104);

Review Comment:
   Good idea with the sensor.
   
   Just for me to understand, is 150 an arbitrarily chosen value or how did you 
come up with it?
   Wouldn't it make sense to set it to at least (max_timestamp in test case - 
min_timestamp in test case), e.g. 1104 - 899?
   
   












Re: [PR] KAFKA-16202: Extra dot in error message in producer [kafka]

2024-02-16 Thread via GitHub


infantlikesprogramming commented on code in PR #15296:
URL: https://github.com/apache/kafka/pull/15296#discussion_r1492998071


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -704,7 +704,7 @@ private void completeBatch(ProducerBatch batch, 
ProduceResponse.PartitionRespons
 "topic-partition may not exist or the user may not 
have Describe access to it",
 batch.topicPartition);
 } else {
-log.warn("Received invalid metadata error in produce 
request on partition {} due to {}. Going " +
+log.warn("Received invalid metadata error in produce 
request on partition {} due to {} Going " +
 "to request metadata update now", 
batch.topicPartition,

Review Comment:
   This makes a lot of sense. Thank you so much @appchemist. It took me a while 
to understand the code. I will fix this in my next commit.






Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]

2024-02-16 Thread via GitHub


dajac commented on code in PR #15382:
URL: https://github.com/apache/kafka/pull/15382#discussion_r1492992453


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws 
Exception {
 String simpleGroup = "simple-group";
 addSimpleGroupExecutor(simpleGroup);
 addConsumerGroupExecutor(1);
-final AtomicReference out = new AtomicReference<>("");
 
-String[] cgcArgs1 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs1);
-return null;
-}));
-return !out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, 
but found " + out.get());
-
-String[] cgcArgs2 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs2);
-return null;
-}));
-return out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
-
-String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs3);
-return null;
-}));
-return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
-}, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
-
-String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"};
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list"),
+Collections.emptyList(),
+mkSet(
+Collections.singletonList(GROUP),
+Collections.singletonList(simpleGroup)
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable"),
+Arrays.asList(simpleGroup, "Empty")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+}
+
+/**
+ * Validates that the output of the list command corresponds to the 
expected values.
+ *
+ * @param args  The arguments for the command line tool.
+ * @param expectedHeaderThe expected header as a list of strings; or 
an empty list
+ *  if a header is not expected.
+ * @param expectedGroupsThe expected groups as a set of list of 
strings. The list
+ *  of strings corresponds to the columns.

Review Comment:
   I used `expectedRows`.






Re: [PR] KAFKA-15770: IQv2 must return immutable position [kafka]

2024-02-16 Thread via GitHub


mjsax commented on PR #15219:
URL: https://github.com/apache/kafka/pull/15219#issuecomment-1949323481

   @lucasbru Updated this PR. -- When I started to work on this PR, I thought 
that using `synchronized` would not work, because locking/unlocking would not 
always be method-local. But you are right; it turns out it is method-local, so I 
dropped `SynchronizedPosition`, which makes your other comments obsolete.
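A minimal sketch of the approach described: method-local `synchronized` guards both mutation and copying, and callers receive an immutable snapshot. Names are illustrative; this is not the actual Kafka Streams `Position` class.

```java
import java.util.HashMap;
import java.util.Map;

public class PositionDemo {
    static final class Position {
        private final Map<String, Long> offsets = new HashMap<>();

        // Lock acquisition and release both stay inside one method...
        synchronized void update(String topicPartition, long offset) {
            offsets.merge(topicPartition, offset, Math::max);
        }

        // ...so no separate SynchronizedPosition wrapper is needed; queries
        // get an immutable copy that cannot observe later mutations.
        synchronized Map<String, Long> snapshot() {
            return Map.copyOf(offsets);
        }
    }

    public static void main(String[] args) {
        Position p = new Position();
        p.update("topic-0", 5L);
        p.update("topic-0", 3L);          // older offset ignored (merge keeps the max)
        System.out.println(p.snapshot()); // {topic-0=5}
    }
}
```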





Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-02-16 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1492958285


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -191,6 +191,10 @@ public void process(final Record record) {
 }
 }
 
+private boolean isActiveWindow(final long timeFrom, final long timeTo) 
{
+return sharedTimeTracker.streamTime >= timeFrom && timeTo + 
joinGraceMs >= sharedTimeTracker.streamTime;

Review Comment:
   Yes, indeed, it is redundant to check for `sharedTimeTracker.streamTime >= timeFrom`.
   
   Adjusted.
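As a quick illustration of the simplified condition (a hedged sketch — `WindowCheck` is a made-up holder, and `streamTime` and `joinGraceMs` are passed in explicitly rather than read from a shared tracker as in the real processor):

```java
// A window ending at timeTo is still "active" as long as stream time has not
// advanced past its end plus the grace period; the lower-bound comparison
// against timeFrom is redundant and therefore dropped.
public class WindowCheck {
    public static boolean isActiveWindow(long streamTime, long timeTo, long joinGraceMs) {
        return timeTo + joinGraceMs >= streamTime;
    }
}
```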






Re: [PR] [Draft] KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


jolshan closed pull request #14731: [Draft] KIP-994 (Part 1) Minor Enhancements 
to ListTransactionsRequest
URL: https://github.com/apache/kafka/pull/14731





Re: [PR] [Draft] KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


jolshan commented on PR #14731:
URL: https://github.com/apache/kafka/pull/14731#issuecomment-1949185760

   Closing in favor of https://github.com/apache/kafka/pull/15384





Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492876179


##
tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java:
##
@@ -436,16 +436,25 @@ public String name() {
 
 @Override
 public void addSubparser(Subparsers subparsers) {
-subparsers.addParser(name())
+Subparser subparser = subparsers.addParser(name())
 .help("list transactions");
+
+subparser.addArgument("--duration-filter")
+.help("filter duration of transaction in ms, only transactions running longer than this duration will be returned")

Review Comment:
   Updated the help message in 
[81173c8](https://github.com/apache/kafka/pull/15384/commits/81173c89dbf50a98671694f99a051dad1e0b1821)






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492875733


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 
   def handleListTransactions(
 filteredProducerIds: Set[Long],
-filteredStates: Set[String]
+filteredStates: Set[String],
+durationFilter: Long = -1

Review Comment:
   Updated default value to -1L in other places for consistency in 
[81173c8](https://github.com/apache/kafka/pull/15384/commits/81173c89dbf50a98671694f99a051dad1e0b1821)






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492874583


##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -35,6 +35,7 @@ public class ListTransactionsOptions extends AbstractOptions<ListTransactionsOptions> {
 private Set<TransactionState> filteredStates = Collections.emptySet();
 private Set<Long> filteredProducerIds = Collections.emptySet();
 
+private Long durationFilter = 0L;

Review Comment:
   Updated default value to -1L in 
[81173c8](https://github.com/apache/kafka/pull/15384/commits/81173c89dbf50a98671694f99a051dad1e0b1821)






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492849366


##
tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java:
##
@@ -436,16 +436,25 @@ public String name() {
 
 @Override
 public void addSubparser(Subparsers subparsers) {
-subparsers.addParser(name())
+Subparser subparser = subparsers.addParser(name())
 .help("list transactions");
+
+subparser.addArgument("--duration-filter")
+.help("filter duration of transaction in ms, only transactions running longer than this duration will be returned")

Review Comment:
   we should mention the default/value that gives all running transactions. 
Providing 0 and getting a ton of transactions might be confusing.






Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]

2024-02-16 Thread via GitHub


rreddy-22 commented on code in PR #15382:
URL: https://github.com/apache/kafka/pull/15382#discussion_r1492848158


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws 
Exception {
 String simpleGroup = "simple-group";
 addSimpleGroupExecutor(simpleGroup);
 addConsumerGroupExecutor(1);
-final AtomicReference<String> out = new AtomicReference<>("");
 
-String[] cgcArgs1 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs1);
-return null;
-}));
-return !out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, 
but found " + out.get());
-
-String[] cgcArgs2 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs2);
-return null;
-}));
-return out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
-
-String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs3);
-return null;
-}));
-return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
-}, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
-
-String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"};
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list"),
+Collections.emptyList(),
+mkSet(
+Collections.singletonList(GROUP),
+Collections.singletonList(simpleGroup)
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable"),
+Arrays.asList(simpleGroup, "Empty")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+}
+
+/**
+ * Validates that the output of the list command corresponds to the 
expected values.
+ *
+ * @param args  The arguments for the command line tool.
+ * @param expectedHeader    The expected header as a list of strings; or an empty list
+ *                          if a header is not expected.
+ * @param expectedGroups    The expected groups as a set of list of strings. The list
+ *                          of strings corresponds to the columns.
+ * @throws InterruptedException
+ */
+private static void validateListOutput(
+List<String> args,
+List<String> expectedHeader,
+Set<List<String>> expectedGroups
+) throws InterruptedException {
+final AtomicReference<String> out = new AtomicReference<>("");
 TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs4);
-return null;
-}));
-return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
-}, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
+String output = runAndGrabConsoleOutput(args);
+out.set(output);
+
+int index = 0;
+String[] lines = output.split("\n");
+
+// Parse the header if one is expected.
+if 

Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492847691


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 
   def handleListTransactions(
 filteredProducerIds: Set[Long],
-filteredStates: Set[String]
+filteredStates: Set[String],
+durationFilter: Long = -1

Review Comment:
   nit: this is -1 -- is that intended?






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-16 Thread via GitHub


msn-tldr commented on PR #15323:
URL: https://github.com/apache/kafka/pull/15323#issuecomment-1949077518

   @ijuma thanks for flagging https://github.com/apache/kafka/pull/15376. 
   
   @hachikuji Looks like this was going to add a test that exercised the 
concurrent update of `Metadata` and fetching of `MetadataSnapshot`/`Cluster`. 
This is useful, so I have created a follow-up PR: 
https://github.com/apache/kafka/pull/15385





Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492846127


##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -35,6 +35,7 @@ public class ListTransactionsOptions extends AbstractOptions<ListTransactionsOptions> {
 private Set<TransactionState> filteredStates = Collections.emptySet();
 private Set<Long> filteredProducerIds = Collections.emptySet();
 
+private Long durationFilter = 0L;

Review Comment:
   nit: any reason we chose default to be zero and not -1?
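The sentinel semantics being settled here can be sketched as follows (a hypothetical helper, not the PR's actual code): a negative default means "no filtering at all", which keeps 0 available as a meaningful "all currently running transactions" value.

```java
// Hypothetical sketch of the filter predicate: a negative value (the -1
// default) disables the filter entirely, while a non-negative value keeps
// only transactions that have been running longer than that many ms.
public class DurationFilter {
    public static boolean matches(long durationFilterMs, long runningMs) {
        return durationFilterMs < 0 || runningMs > durationFilterMs;
    }
}
```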






Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]

2024-02-16 Thread via GitHub


rreddy-22 commented on code in PR #15382:
URL: https://github.com/apache/kafka/pull/15382#discussion_r1492844519


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws 
Exception {
 String simpleGroup = "simple-group";
 addSimpleGroupExecutor(simpleGroup);
 addConsumerGroupExecutor(1);
-final AtomicReference<String> out = new AtomicReference<>("");
 
-String[] cgcArgs1 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs1);
-return null;
-}));
-return !out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, 
but found " + out.get());
-
-String[] cgcArgs2 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs2);
-return null;
-}));
-return out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
-
-String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs3);
-return null;
-}));
-return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
-}, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
-
-String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"};
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list"),
+Collections.emptyList(),
+mkSet(
+Collections.singletonList(GROUP),
+Collections.singletonList(simpleGroup)
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable"),
+Arrays.asList(simpleGroup, "Empty")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+}
+
+/**
+ * Validates that the output of the list command corresponds to the 
expected values.
+ *
+ * @param args  The arguments for the command line tool.
+ * @param expectedHeader    The expected header as a list of strings; or an empty list
+ *                          if a header is not expected.
+ * @param expectedGroups    The expected groups as a set of list of strings. The list
+ *                          of strings corresponds to the columns.
+ * @throws InterruptedException
+ */
+private static void validateListOutput(
+List<String> args,
+List<String> expectedHeader,
+Set<List<String>> expectedGroups
+) throws InterruptedException {
+final AtomicReference<String> out = new AtomicReference<>("");
 TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs4);
-return null;
-}));
-return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
-}, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
+String output = runAndGrabConsoleOutput(args);
+out.set(output);
+
+int index = 0;
+String[] lines = output.split("\n");
+
+// Parse the header if one is expected.
+if 

[PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]

2024-02-16 Thread via GitHub


msn-tldr opened a new pull request, #15385:
URL: https://github.com/apache/kafka/pull/15385

   This is a follow-up to https://github.com/apache/kafka/pull/15323. 
   
   Metadata is typically updated concurrently in the background thread, while the 
`MetadataSnapshot`/`Cluster` are fetched and used in another thread (typically the 
application thread). This test makes sure the concurrent update and read work as expected.
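The shape of such a test can be sketched like this (an illustrative stand-in, not the follow-up PR's actual code: `SnapshotRace` and its fields are invented, with a checksum standing in for a snapshot's internal consistency):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// One thread publishes immutable snapshots while another reads them; every
// snapshot the reader observes must be internally consistent, because updates
// replace the whole object rather than mutating it in place.
public class SnapshotRace {
    static final class Snapshot {
        final int version;
        final int checksum;
        Snapshot(int version) { this.version = version; this.checksum = version * 31; }
    }

    public static boolean run() {
        AtomicReference<Snapshot> ref = new AtomicReference<>(new Snapshot(0));
        AtomicBoolean consistent = new AtomicBoolean(true);
        Thread writer = new Thread(() -> {
            for (int v = 1; v <= 100_000; v++) ref.set(new Snapshot(v));
        });
        Thread reader = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) {
                Snapshot s = ref.get();  // read a complete snapshot, never a partial one
                if (s.checksum != s.version * 31) consistent.set(false);
            }
        });
        writer.start(); reader.start();
        try {
            writer.join(); reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consistent.get();
    }
}
```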
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15665: Enforce partition reassignment should complete when all target replicas are in ISR [kafka]

2024-02-16 Thread via GitHub


jolshan merged PR #15359:
URL: https://github.com/apache/kafka/pull/15359





[PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


yyu1993 opened a new pull request, #15384:
URL: https://github.com/apache/kafka/pull/15384

   Introduces a new filter in the ListTransactionsRequest API. This enables the 
caller to filter on transactions that have been running for longer than a 
certain duration.
   This PR includes the following changes:
   1. bumps the version of the ListTransactionsRequest API to 1, and sets the 
durationFilter to 0 when communicating with an older broker that does not 
support version 1.
   2. bumps the version of ListTransactionsResponse to 1 without changing the 
response structure.
   3. adds a durationFilter option to `kafka-transactions.sh --list`
   
   Tests:
   
   - Client side test to build request with correct combination of duration 
filter and API version. `testBuildRequestWithDurationFilter`
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-16264) Expose `producer.id.expiration.check.interval.ms` as dynamic broker configuration

2024-02-16 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818064#comment-17818064
 ] 

Justine Olshan commented on KAFKA-16264:


Hey [~jeqo] thanks for filing. I was also thinking about this after you filed 
the first ticket.


One thing that is interesting is right now the expiration is scheduled on a 
separate thread on startup. I guess the best course of action is to cancel that 
scheduled task and create a new one?
See UnifiedLog producerExpireCheck.
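One possible shape for that cancel-and-reschedule approach (an assumption about how it might be done, sketched with a plain `ScheduledExecutorService` rather than Kafka's scheduler; `Rescheduler` is a made-up name):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// When the dynamic config changes, cancel the currently scheduled check and
// schedule a fresh one at the new interval.
public class Rescheduler {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> task;

    public synchronized ScheduledFuture<?> schedule(Runnable check, long intervalMs) {
        if (task != null) {
            task.cancel(false);  // don't interrupt an in-flight run
        }
        task = scheduler.scheduleAtFixedRate(check, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        return task;
    }

    public void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Calling `schedule` a second time cancels the first task before its initial delay elapses, so only the latest interval remains in effect.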

> Expose `producer.id.expiration.check.interval.ms` as dynamic broker 
> configuration
> -
>
> Key: KAFKA-16264
> URL: https://issues.apache.org/jira/browse/KAFKA-16264
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> Dealing with a scenario where too many producer ids lead to issues (e.g. high 
> cpu utilization, see KAFKA-16229) puts operators in need of flushing producer ids 
> more promptly than usual.
> Currently, only the expiration timeout `producer.id.expiration.ms` is exposed 
> as a dynamic config. This is helpful (e.g. by reducing the timeout, fewer 
> producers would eventually be kept in memory), but not enough if the 
> evaluation frequency is not short enough to flush producer ids before 
> they become an issue. Only by tuning both can the issue be worked around.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: WakeupTrigger cleanup [kafka]

2024-02-16 Thread via GitHub


kirktrue closed pull request #14752: MINOR: WakeupTrigger cleanup
URL: https://github.com/apache/kafka/pull/14752





Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]

2024-02-16 Thread via GitHub


rreddy-22 commented on code in PR #15382:
URL: https://github.com/apache/kafka/pull/15382#discussion_r1492799701


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws 
Exception {
 String simpleGroup = "simple-group";
 addSimpleGroupExecutor(simpleGroup);
 addConsumerGroupExecutor(1);
-final AtomicReference<String> out = new AtomicReference<>("");
 
-String[] cgcArgs1 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs1);
-return null;
-}));
-return !out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, 
but found " + out.get());
-
-String[] cgcArgs2 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs2);
-return null;
-}));
-return out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
-
-String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs3);
-return null;
-}));
-return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
-}, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
-
-String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"};
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list"),
+Collections.emptyList(),
+mkSet(
+Collections.singletonList(GROUP),
+Collections.singletonList(simpleGroup)
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable"),
+Arrays.asList(simpleGroup, "Empty")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+}
+
+/**
+ * Validates that the output of the list command corresponds to the 
expected values.
+ *
+ * @param args  The arguments for the command line tool.
+ * @param expectedHeader    The expected header as a list of strings; or an empty list
+ *                          if a header is not expected.
+ * @param expectedGroups    The expected groups as a set of list of strings. The list
+ *                          of strings corresponds to the columns.
+ * @throws InterruptedException
+ */
+private static void validateListOutput(

Review Comment:
   The return type should be boolean no?






Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]

2024-02-16 Thread via GitHub


jolshan commented on code in PR #15382:
URL: https://github.com/apache/kafka/pull/15382#discussion_r1492801866


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws 
Exception {
 String simpleGroup = "simple-group";
 addSimpleGroupExecutor(simpleGroup);
 addConsumerGroupExecutor(1);
-final AtomicReference<String> out = new AtomicReference<>("");
 
-String[] cgcArgs1 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs1);
-return null;
-}));
-return !out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, 
but found " + out.get());
-
-String[] cgcArgs2 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs2);
-return null;
-}));
-return out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
-
-String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs3);
-return null;
-}));
-return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
-}, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
-
-String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"};
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list"),
+Collections.emptyList(),
+mkSet(
+Collections.singletonList(GROUP),
+Collections.singletonList(simpleGroup)
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable"),
+Arrays.asList(simpleGroup, "Empty")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+}
+
+/**
+ * Validates that the output of the list command corresponds to the 
expected values.
+ *
+ * @param args  The arguments for the command line tool.
+ * @param expectedHeader    The expected header as a list of strings; or an empty list
+ *                          if a header is not expected.
+ * @param expectedGroups    The expected groups as a set of list of strings. The list
+ *                          of strings corresponds to the columns.
+ * @throws InterruptedException
+ */
+private static void validateListOutput(

Review Comment:
   The method itself does the asserts, so I think it is ok to be void.
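For context, the polling-and-assert pattern that lets a void helper still signal success or failure works roughly like this (a simplified stand-in for Kafka's `TestUtils.waitForCondition`, not its actual implementation):

```java
import java.util.function.BooleanSupplier;

// Retry a boolean condition until it holds or a deadline passes; on timeout,
// fail with the supplied message. The caller's method can stay void because
// failure surfaces as a thrown AssertionError rather than a return value.
public class Wait {
    public static void waitForCondition(BooleanSupplier cond, long timeoutMs, String msg) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!cond.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(msg);
            }
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting: " + msg);
            }
        }
    }
}
```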






Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]

2024-02-16 Thread via GitHub


rreddy-22 commented on code in PR #15382:
URL: https://github.com/apache/kafka/pull/15382#discussion_r1492799701


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws 
Exception {
 String simpleGroup = "simple-group";
 addSimpleGroupExecutor(simpleGroup);
 addConsumerGroupExecutor(1);
-final AtomicReference out = new AtomicReference<>("");
 
-String[] cgcArgs1 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs1);
-return null;
-}));
-return !out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, 
but found " + out.get());
-
-String[] cgcArgs2 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs2);
-return null;
-}));
-return out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
-
-String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs3);
-return null;
-}));
-return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
-}, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
-
-String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"};
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list"),
+Collections.emptyList(),
+mkSet(
+Collections.singletonList(GROUP),
+Collections.singletonList(simpleGroup)
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable"),
+Arrays.asList(simpleGroup, "Empty")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+}
+
+/**
+ * Validates that the output of the list command corresponds to the expected values.
+ *
+ * @param args           The arguments for the command line tool.
+ * @param expectedHeader The expected header as a list of strings; or an empty list
+ *                       if a header is not expected.
+ * @param expectedGroups The expected groups as a set of list of strings. The list
+ *                       of strings corresponds to the columns.
+ * @throws InterruptedException
+ */
+private static void validateListOutput(

Review Comment:
   The return type should be boolean, no?
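For context on the question above: if the helper retries via `TestUtils.waitForCondition` (as the assertions it replaces do), a `void` return type is natural, because `waitForCondition` signals failure by throwing rather than by returning `false`. A simplified, self-contained stand-in (not the actual Kafka test-utils code; the retry interval and names are illustrative):

```java
import java.util.function.BooleanSupplier;

public class WaitForConditionSketch {
    // Simplified stand-in for org.apache.kafka.test.TestUtils.waitForCondition:
    // polls the supplier until it returns true or the timeout elapses.
    // Failure surfaces as an exception, so callers built on it need no boolean result.
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, String message)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return;
            }
            Thread.sleep(10L);
        }
        throw new AssertionError(message);
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Becomes true after ~50ms; the call blocks until then and returns normally.
        waitForCondition(() -> System.currentTimeMillis() - start > 50, 1_000L,
                "condition never met");
        System.out.println("condition met");
    }
}
```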



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]

2024-02-16 Thread via GitHub


jolshan commented on code in PR #15382:
URL: https://github.com/apache/kafka/pull/15382#discussion_r1492798225


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws 
Exception {
 String simpleGroup = "simple-group";
 addSimpleGroupExecutor(simpleGroup);
 addConsumerGroupExecutor(1);
-final AtomicReference<String> out = new AtomicReference<>("");
 
-String[] cgcArgs1 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs1);
-return null;
-}));
-return !out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, 
but found " + out.get());
-
-String[] cgcArgs2 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs2);
-return null;
-}));
-return out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
-
-String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs3);
-return null;
-}));
-return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
-}, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
-
-String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"};
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list"),
+Collections.emptyList(),
+mkSet(
+Collections.singletonList(GROUP),
+Collections.singletonList(simpleGroup)
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable"),
+Arrays.asList(simpleGroup, "Empty")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+}
+
+/**
+ * Validates that the output of the list command corresponds to the expected values.
+ *
+ * @param args           The arguments for the command line tool.
+ * @param expectedHeader The expected header as a list of strings; or an empty list
+ *                       if a header is not expected.
+ * @param expectedGroups The expected groups as a set of list of strings. The list
+ *                       of strings corresponds to the columns.

Review Comment:
   as an aside, I like that we are being more thorough with checking that the components are in the right rows and columns
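The removed assertions in the diff above capture the command's console output with `kafka.utils.TestUtils.grabConsoleOutput` (a Scala helper). A minimal Java analogue, included only for illustration, redirects `System.out` to a buffer for the duration of the action:

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class ConsoleCapture {
    // Illustrative Java analogue of kafka.utils.TestUtils.grabConsoleOutput:
    // swap System.out for a buffer while the action runs, then restore it.
    static String grabConsoleOutput(Runnable action) {
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try {
            System.setOut(new PrintStream(buffer));
            action.run();
        } finally {
            System.setOut(original); // always restore, even if the action throws
        }
        return buffer.toString();
    }
}
```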






Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]

2024-02-16 Thread via GitHub


rreddy-22 commented on code in PR #15382:
URL: https://github.com/apache/kafka/pull/15382#discussion_r1492788275


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -123,42 +128,89 @@ public void testListGroupCommand(String quorum) throws 
Exception {
 String simpleGroup = "simple-group";
 addSimpleGroupExecutor(simpleGroup);
 addConsumerGroupExecutor(1);
-final AtomicReference<String> out = new AtomicReference<>("");
 
-String[] cgcArgs1 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs1);
-return null;
-}));
-return !out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and no header, 
but found " + out.get());
-
-String[] cgcArgs2 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs2);
-return null;
-}));
-return out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
-}, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
-
-String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
-TestUtils.waitForCondition(() -> {
-out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs3);
-return null;
-}));
-return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
-}, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
-
-String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"};
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list"),
+Collections.emptyList(),
+mkSet(
+Collections.singletonList(GROUP),
+Collections.singletonList(simpleGroup)
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable"),
+Arrays.asList(simpleGroup, "Empty")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+
+validateListOutput(
+Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"),
+Arrays.asList("GROUP", "STATE"),
+mkSet(
+Arrays.asList(GROUP, "Stable")
+)
+);
+}
+
+/**
+ * Validates that the output of the list command corresponds to the expected values.
+ *
+ * @param args           The arguments for the command line tool.
+ * @param expectedHeader The expected header as a list of strings; or an empty list
+ *                       if a header is not expected.
+ * @param expectedGroups The expected groups as a set of list of strings. The list
+ *                       of strings corresponds to the columns.

Review Comment:
   can we say expected* columns?
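The `validateListOutput` helper under discussion compares parsed output rows against the expected header and group columns. A self-contained sketch of the core comparison (hypothetical names; the real helper in the PR also retries via `waitForCondition`):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ListOutputValidator {
    // Hypothetical core of a validateListOutput-style check: split the captured
    // console output into rows of whitespace-separated columns, verify the header
    // row when one is expected, and require every expected group row to be present.
    static boolean outputMatches(String output, List<String> expectedHeader,
                                 Set<List<String>> expectedRows) {
        List<List<String>> rows = output.lines()
                .filter(line -> !line.trim().isEmpty())
                .map(line -> Arrays.asList(line.trim().split("\\s+")))
                .collect(Collectors.toList());
        if (!expectedHeader.isEmpty()) {
            if (rows.isEmpty() || !rows.get(0).containsAll(expectedHeader)) {
                return false;
            }
            rows = rows.subList(1, rows.size());
        }
        return new HashSet<>(rows).containsAll(expectedRows);
    }
}
```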






Re: [PR] KAFKA-14588 ZK configuration moved to ZkConfig [kafka]

2024-02-16 Thread via GitHub


nizhikov commented on PR #15075:
URL: https://github.com/apache/kafka/pull/15075#issuecomment-1948922545

   Hello @mimaison @ijuma 
   
   This PR is ready for review.
   Please, take a look.





[jira] [Created] (KAFKA-16265) Add durationFilter to ListTransactionsRequest

2024-02-16 Thread Yang Yu (Jira)
Yang Yu created KAFKA-16265:
---

 Summary: Add durationFilter to ListTransactionsRequest
 Key: KAFKA-16265
 URL: https://issues.apache.org/jira/browse/KAFKA-16265
 Project: Kafka
  Issue Type: Sub-task
Reporter: Yang Yu


* Add durationFilter field to ListTransactionsRequest and make corresponding 
server-side changes
 * Make appropriate version bumps for ListTransactionsRequest and 
ListTransactionsResponse
 * Add durationFilter option to kafka-transactions.sh --list



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16266) Introduce TransactionLastUpdateTimeMs tagged field to DescribeTransactionsResponse

2024-02-16 Thread Yang Yu (Jira)
Yang Yu created KAFKA-16266:
---

 Summary:  Introduce TransactionLastUpdateTimeMs tagged field to 
DescribeTransactionsResponse
 Key: KAFKA-16266
 URL: https://issues.apache.org/jira/browse/KAFKA-16266
 Project: Kafka
  Issue Type: Sub-task
Reporter: Yang Yu


Introduce a TransactionLastUpdateTimeMs tagged field to 
DescribeTransactionsResponse. Make broker-side changes to send this piece of 
information, and update the `kafka-transactions.sh --describe` tooling to 
display the new field in its output.





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-16 Thread via GitHub


mjsax commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1492754199


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join; 
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(0, "A0+null", 0L),
-new KeyValueTimestamp<>(1, "A1+a1", 110L)
+new KeyValueTimestamp<>(1, "A1+a1", 110L),
+new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
   Seems this PR is basically ready for merging, so it might be faster to go 
with option (2): revert changing the order in this PR and we can merge it. 
Of course, I would want to make a final pass after the change, to check the 
testing code again and verify correctness.






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-16 Thread via GitHub


mjsax commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1492754199


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join; 
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(0, "A0+null", 0L),
-new KeyValueTimestamp<>(1, "A1+a1", 110L)
+new KeyValueTimestamp<>(1, "A1+a1", 110L),
+new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
   Seems this PR is basically ready for merging, so it might be faster to go 
with option (2), and revert changing the order in this PR and we can merge it.






Re: [PR] MINOR: add note about Kafka Streams feature for 3.7 release [kafka]

2024-02-16 Thread via GitHub


mjsax merged PR #15380:
URL: https://github.com/apache/kafka/pull/15380





Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-16 Thread via GitHub


lucasbru commented on PR #15372:
URL: https://github.com/apache/kafka/pull/15372#issuecomment-1948876670

   Hey @mjsax . This PR is right now waiting for @cadonna's review. If you have 
time, you could take a look to get into the 848 work.





Re: [PR] KAFKA-16009: Fix PlaintextConsumerTest. testMaxPollIntervalMsDelayInRevocation [kafka]

2024-02-16 Thread via GitHub


lucasbru commented on PR #15383:
URL: https://github.com/apache/kafka/pull/15383#issuecomment-1948876369

   Hey @mjsax . This PR is right now waiting for @cadonna's review. If you have 
time, you could take a look to get into the 848 work.





Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-16 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1492719703


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -143,22 +312,134 @@ public void testListGroupCommand(String quorum) throws 
Exception {
 return out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
 }, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
 
-String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
+String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type"};
 TestUtils.waitForCondition(() -> {
 out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
 ConsumerGroupCommand.main(cgcArgs3);
 return null;
 }));
+return out.get().contains("TYPE") && !out.get().contains("STATE") 
&& out.get().contains(simpleGroup) && out.get().contains(GROUP);
+}, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
+
+String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "--type"};
+TestUtils.waitForCondition(() -> {
+out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+ConsumerGroupCommand.main(cgcArgs4);
+return null;
+}));
+return out.get().contains("TYPE") && out.get().contains("STATE") 
&& out.get().contains(simpleGroup) && out.get().contains(GROUP);
+}, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
+
+String[] cgcArgs5 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
+TestUtils.waitForCondition(() -> {
+out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+ConsumerGroupCommand.main(cgcArgs5);
+return null;
+}));
 return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
 }, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
 
-String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"};
+String[] cgcArgs6 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"};
 TestUtils.waitForCondition(() -> {
 out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-ConsumerGroupCommand.main(cgcArgs4);
+ConsumerGroupCommand.main(cgcArgs6);
 return null;
 }));
 return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
 }, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
+
+String[] cgcArgs7 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "Classic"};
+TestUtils.waitForCondition(() -> {
+out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+ConsumerGroupCommand.main(cgcArgs7);
+return null;
+}));
+return out.get().contains("TYPE") && out.get().contains("Classic") 
&& !out.get().contains("STATE") &&
+out.get().contains(simpleGroup) && out.get().contains(GROUP);
+}, "Expected to find " + GROUP + " and the header, but found " + 
out.get());
+
+String[] cgcArgs8 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "classic"};
+TestUtils.waitForCondition(() -> {
+out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+ConsumerGroupCommand.main(cgcArgs8);
+return null;
+}));
+return out.get().contains("TYPE") && out.get().contains("Classic") 
&& !out.get().contains("STATE") &&
+out.get().contains(simpleGroup) && out.get().contains(GROUP);
+}, "Expected to find " + GROUP + " and the header, but found " + 
out.get());

Review Comment:
   Thanks a lot! I'll take a look!






Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-16 Thread via GitHub


dajac commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1492698285


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -20,78 +20,225 @@
 import kafka.admin.ConsumerGroupCommand;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testListConsumerGroups(String quorum) throws Exception {
+@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+@MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+public void testListConsumerGroupsWithoutFilters(String quorum, String 
groupProtocol) throws Exception {
 String simpleGroup = "simple-group";
+
+createOffsetsTopic(listenerName(), new Properties());
+
 addSimpleGroupExecutor(simpleGroup);
 addConsumerGroupExecutor(1);
+addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
 
 String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
 ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-scala.collection.Set expectedGroups = set(Arrays.asList(GROUP, 
simpleGroup));
+
+scala.collection.Set expectedGroups = set(Arrays.asList(GROUP, 
simpleGroup, PROTOCOL_GROUP));
 final AtomicReference foundGroups = new 
AtomicReference<>();
+
 TestUtils.waitForCondition(() -> {
 foundGroups.set(service.listConsumerGroups().toSet());
 return Objects.equals(expectedGroups, foundGroups.get());
 }, "Expected --list to show groups " + expectedGroups + ", but found " 
+ foundGroups.get() + ".");
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testListWithUnrecognizedNewConsumerOption() {
+@Test
+public void testListWithUnrecognizedNewConsumerOption() throws Exception {
 String[] cgcArgs = new String[]{"--new-consumer", 
"--bootstrap-server", bootstrapServers(listenerName()), "--list"};
 assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testListConsumerGroupsWithStates() throws Exception {
+@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+@MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+public void testListConsumerGroupsWithStates(String quorum, String 
groupProtocol) throws Exception {

Review Comment:
   Sorry, I very likely missed your reply. It would be better to change it in 
my opinion. We could pass the group protocol to addConsumerGroupExecutor and 
adapt the assertions.






Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-16 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1492695529


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -20,78 +20,225 @@
 import kafka.admin.ConsumerGroupCommand;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testListConsumerGroups(String quorum) throws Exception {
+@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+@MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+public void testListConsumerGroupsWithoutFilters(String quorum, String 
groupProtocol) throws Exception {
 String simpleGroup = "simple-group";
+
+createOffsetsTopic(listenerName(), new Properties());
+
 addSimpleGroupExecutor(simpleGroup);
 addConsumerGroupExecutor(1);
+addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
 
 String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
 ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-scala.collection.Set expectedGroups = set(Arrays.asList(GROUP, 
simpleGroup));
+
+scala.collection.Set expectedGroups = set(Arrays.asList(GROUP, 
simpleGroup, PROTOCOL_GROUP));
 final AtomicReference foundGroups = new 
AtomicReference<>();
+
 TestUtils.waitForCondition(() -> {
 foundGroups.set(service.listConsumerGroups().toSet());
 return Objects.equals(expectedGroups, foundGroups.get());
 }, "Expected --list to show groups " + expectedGroups + ", but found " 
+ foundGroups.get() + ".");
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testListWithUnrecognizedNewConsumerOption() {
+@Test
+public void testListWithUnrecognizedNewConsumerOption() throws Exception {
 String[] cgcArgs = new String[]{"--new-consumer", 
"--bootstrap-server", bootstrapServers(listenerName()), "--list"};
 assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testListConsumerGroupsWithStates() throws Exception {
+@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+@MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+public void testListConsumerGroupsWithStates(String quorum, String 
groupProtocol) throws Exception {

Review Comment:
   I think we talked about this before: I thought we didn't want to make 
changes to the states in this PR, so I left it to be improved in another 
small PR. I can change it now, though; I might've missed communicating that.






Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-16 Thread via GitHub


lianetm commented on PR #15372:
URL: https://github.com/apache/kafka/pull/15372#issuecomment-1948611367

   I like the last commit msg :)





Re: [PR] KAFKA-16009: Fix PlaintextConsumerTest. testMaxPollIntervalMsDelayInRevocation [kafka]

2024-02-16 Thread via GitHub


lucasbru commented on code in PR #15383:
URL: https://github.com/apache/kafka/pull/15383#discussion_r1492621621


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -419,6 +422,35 @@ public void testWakeupAfterNonEmptyFetch() {
 assertThrows(WakeupException.class, () -> 
consumer.poll(Duration.ZERO));
 }
 
+@Test
+public void testCommitInCommitCallback() {

Review Comment:
   oops






Re: [PR] KAFKA-16009: Fix PlaintextConsumerTest. testMaxPollIntervalMsDelayInRevocation [kafka]

2024-02-16 Thread via GitHub


lianetm commented on code in PR #15383:
URL: https://github.com/apache/kafka/pull/15383#discussion_r1492613876


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -419,6 +422,35 @@ public void testWakeupAfterNonEmptyFetch() {
 assertThrows(WakeupException.class, () -> 
consumer.poll(Duration.ZERO));
 }
 
+@Test
+public void testCommitInCommitCallback() {

Review Comment:
   We're testing here the commit in the rebalance callback, so maybe name it 
`testCommitInRebalanceCallback`?






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-02-16 Thread via GitHub


dajac commented on PR #15364:
URL: https://github.com/apache/kafka/pull/15364#issuecomment-1948535272

   @jeffkbkim Thanks for your comments. I have addressed them.





Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-02-16 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1492587334


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -251,195 +255,51 @@ private ConsumerGroupMember 
transitionToNewTargetAssignmentState() {
 }
 
 if (!newPartitionsPendingRevocation.isEmpty()) {
-// If the partition pending revocation set is not empty, we 
transition the
-// member to revoking and keep the current epoch. The transition 
to the new
-// state is done when the member is updated.
+// If there are partitions to be revoked, the member remains in 
its current
+// epoch and requests the revocation of those partitions. It 
transitions to
+// the UNREVOKED_PARTITIONS state to wait until the client 
acknowledges the
+// revocation of the partitions.
 return new ConsumerGroupMember.Builder(member)
+.setState(MemberState.UNREVOKED_PARTITIONS)
+.updateMemberEpoch(memberEpoch)
 .setAssignedPartitions(newAssignedPartitions)
-.setPartitionsPendingRevocation(newPartitionsPendingRevocation)
-.setPartitionsPendingAssignment(newPartitionsPendingAssignment)
-.setTargetMemberEpoch(targetAssignmentEpoch)
+.setRevokedPartitions(newPartitionsPendingRevocation)

Review Comment:
   Alright. Let me change it back.






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-02-16 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1492585893


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -251,195 +255,51 @@ private ConsumerGroupMember 
transitionToNewTargetAssignmentState() {
 }
 
 if (!newPartitionsPendingRevocation.isEmpty()) {
-// If the partition pending revocation set is not empty, we 
transition the
-// member to revoking and keep the current epoch. The transition 
to the new
-// state is done when the member is updated.
+// If there are partitions to be revoked, the member remains in 
its current
+// epoch and requests the revocation of those partitions. It 
transitions to
+// the UNREVOKED_PARTITIONS state to wait until the client 
acknowledges the

Review Comment:
   Correct.






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-02-16 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1492583068


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -170,72 +127,119 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions(
  * @return A new ConsumerGroupMember or the current one.
  */
 public ConsumerGroupMember build() {
-// A new target assignment has been installed, we need to restart
-// the reconciliation loop from the beginning.
-if (targetAssignmentEpoch != member.targetMemberEpoch()) {
-return transitionToNewTargetAssignmentState();
-}
-
 switch (member.state()) {
-// Check if the partitions have been revoked by the member.
-case REVOKING:
-return maybeTransitionFromRevokingToAssigningOrStable();
+case STABLE:
+// When the member is in the STABLE state, we verify if a newer
+// epoch (or target assignment) is available. If it is, we can
+// reconcile the member towards it. Otherwise, we return.
+if (member.memberEpoch() != targetAssignmentEpoch) {
+return computeNextAssignment(
+member.memberEpoch(),
+member.assignedPartitions()
+);
+} else {
+return member;
+}
 
-// Check if pending partitions have been freed up.
-case ASSIGNING:
-return maybeTransitionFromAssigningToAssigningOrStable();
+case UNREVOKED_PARTITIONS:
+// When the member is in the UNREVOKED_PARTITIONS state, we 
wait
+// until the member has revoked the necessary partitions. They 
are
+// considered revoked when they are not anymore reported in the
+// owned partitions set in the ConsumerGroupHeartbeat API.
 
-// Nothing to do.
-case STABLE:
-return member;
+// If the member does not provide its owned partitions, we cannot
+// progress.
+if (ownedTopicPartitions == null) {
+return member;
+}
+
+// If the member provides its owned partitions, we verify if it still
+// owns any of the revoked partitions. If it does, we cannot progress.
+for (ConsumerGroupHeartbeatRequestData.TopicPartitions 
topicPartitions : ownedTopicPartitions) {
+for (Integer partitionId : topicPartitions.partitions()) {
+boolean stillHasRevokedPartition = member
+.revokedPartitions()
+.getOrDefault(topicPartitions.topicId(), 
Collections.emptySet())
+.contains(partitionId);
+if (stillHasRevokedPartition) {
+return member;
+}
+}
+}
+
+// When the member has revoked all the pending partitions, it 
can
+// transition to the next epoch (current + 1) and we can 
reconcile
+// its state towards the latest target assignment.
+return computeNextAssignment(
+member.memberEpoch() + 1,

Review Comment:
   Yes, it does in the code you extracted. Let's take an example:
   1) Member A has partitions 1 and 2 in epoch 10.
   2) Target assignment changes the assignment of A to 2 in epoch 11.
   3) Member A enters the UNREVOKED_PARTITIONS state to revoke 1 and stays in 
10.
   4) Target assignment changes the assignment of A to 3 in epoch 12 (based on 
an HB from another member).
   5) A comes back to acknowledge that 1 is gone. Now A can transition to 
epoch 11 because it has revoked the partitions that were preventing it from 
doing so earlier. However, it cannot transition to epoch 12 yet because it has 
to revoke 2 first.
   
   If we don't do this, A would remain in epoch 10 while revoking 2, and the 
rebalance timeout would not be reset.
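The stepwise epoch advance described above can be sketched as follows. This is a hypothetical simplification, not Kafka's actual `CurrentAssignmentBuilder`: the class name, fields, and the `pendingRevocations` map (epoch to partitions that must be revoked before reaching that epoch) are all illustrative, and assignment additions are omitted.

```java
import java.util.*;

class MemberReconciliationSketch {
    int memberEpoch = 10;
    final Set<Integer> assigned = new HashSet<>(Set.of(1, 2));
    // Illustrative: epoch -> partitions that must be revoked to reach it.
    final NavigableMap<Integer, Set<Integer>> pendingRevocations =
        new TreeMap<>(Map.of(11, Set.of(1), 12, Set.of(2)));

    // Called when the member reports its owned partitions in a heartbeat.
    void acknowledge(Set<Integer> owned) {
        Map.Entry<Integer, Set<Integer>> next = pendingRevocations.firstEntry();
        if (next == null) return;
        // Advance one epoch only when none of its revoked partitions
        // are still reported as owned by the member.
        if (Collections.disjoint(owned, next.getValue())) {
            assigned.removeAll(next.getValue());
            memberEpoch = next.getKey();
            pendingRevocations.pollFirstEntry();
        }
    }
}
```

Replaying the example: acknowledging that partition 1 is gone moves the member from epoch 10 to 11, but it stays at 11 until partition 2 is also revoked, at which point it reaches epoch 12.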






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-02-16 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1492574501


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -779,7 +779,7 @@ private void maybeUpdateGroupState() {
 newState = ASSIGNING;
 } else {
 for (ConsumerGroupMember member : members.values()) {
-if (member.targetMemberEpoch() != targetAssignmentEpoch.get() 
|| member.state() != ConsumerGroupMember.MemberState.STABLE) {
+if (!member.isReconciledTo(targetAssignmentEpoch.get())) {

Review Comment:
   The member epoch has not changed. A member is fully reconciled when it is 
Stable in the latest epoch (targetAssignmentEpoch). The previous condition was 
a bit weird, I agree.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -369,20 +329,12 @@ public String toString() {
 /**
  * The partitions being revoked by this member.
  */
-private final Map> partitionsPendingRevocation;
-
-/**
- * The partitions waiting to be assigned to this
- * member. They will be assigned when they are
- * released by their previous owners.
- */
-private final Map> partitionsPendingAssignment;
+private final Map> revokedPartitions;

Review Comment:
   Nope.






[jira] [Assigned] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation

2024-02-16 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-16009:
--

Assignee: Lucas Brutschy  (was: Kirk True)

> Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
> 
>
> Key: KAFKA-16009
> URL: https://issues.apache.org/jira/browse/KAFKA-16009
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: consumer-threading-refactor, integration-tests, timeout
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing 
> when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
>   at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-02-16 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1492567552


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -346,10 +346,9 @@ public static Record newCurrentAssignmentRecord(
 new ConsumerGroupCurrentMemberAssignmentValue()
 .setMemberEpoch(member.memberEpoch())
 .setPreviousMemberEpoch(member.previousMemberEpoch())
-.setTargetMemberEpoch(member.targetMemberEpoch())
+.setState(member.state().value())

Review Comment:
   It was mainly used to determine whether a new assignment was required if its 
target epoch was stale. The state with the previous schema was basically 
determined by the target member epoch, the partitions pending revocation and 
the partitions pending assignment. It was a bit weakly defined. Having a proper 
state seems much better.






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-02-16 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1492563558


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1211,13 +1192,99 @@ private 
CoordinatorResult consumerGr
 // 1. The member reported its owned partitions;
 // 2. The member just joined or rejoined to group (epoch equals to 
zero);
 // 3. The member's assignment has been updated.
-if (ownedTopicPartitions != null || memberEpoch == 0 || 
assignmentUpdated) {
+if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {
 response.setAssignment(createResponseAssignment(updatedMember));
 }
 
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Reconciles the current assignment of the member if needed.
+ *
+ * @param groupId   The group id.
+ * @param memberThe member to reconcile.
+ * @param currentPartitionEpoch The function returning the current epoch of
+ *  a given partition.
+ * @param targetAssignmentEpoch The target assignment epoch.
+ * @param targetAssignment  The target assignment.
+ * @param ownedTopicPartitions  The list of partitions owned by the 
member. This
+ *  is reported in the ConsumerGroupHeartbeat 
API and
+ *  it could be null if not provided.
+ * @param records   The list to accumulate any new records.
+ * @return The received member if no changes have been made; or a new
+ * member containing the new assignment.
+ */
+private ConsumerGroupMember maybeReconcile(
+String groupId,
+ConsumerGroupMember member,
+BiFunction currentPartitionEpoch,
+int targetAssignmentEpoch,
+Assignment targetAssignment,
+List 
ownedTopicPartitions,
+List records
+) {
+if (member.isReconciledTo(targetAssignmentEpoch)) {
+return member;
+}
+
+ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+.withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+.withCurrentPartitionEpoch(currentPartitionEpoch)
+.withOwnedTopicPartitions(ownedTopicPartitions)
+.build();
+
+if (!updatedMember.equals(member)) {
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+log.info("[GroupId {}] Member {} new assignment state: epoch={}, 
previousEpoch={}, state={}, "
+ + "assignedPartitions={} and revokedPartitions={}.",
+groupId, updatedMember.memberId(), 
updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), 
updatedMember.state(),
+formatAssignment(updatedMember.assignedPartitions()), 
formatAssignment(updatedMember.revokedPartitions()));
+
+if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+scheduleConsumerGroupRebalanceTimeout(
+groupId,
+updatedMember.memberId(),
+updatedMember.memberEpoch(),
+updatedMember.rebalanceTimeoutMs()
+);
+} else {

Review Comment:
   The rebalance is not complete here because the member has to reach Stable 
for that. However, we only have a timeout on the revocation part: we basically 
want the member to not block revoked partitions forever.






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-02-16 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1492559984


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1211,13 +1192,99 @@ private 
CoordinatorResult consumerGr
 // 1. The member reported its owned partitions;
 // 2. The member just joined or rejoined to group (epoch equals to 
zero);
 // 3. The member's assignment has been updated.
-if (ownedTopicPartitions != null || memberEpoch == 0 || 
assignmentUpdated) {
+if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {
 response.setAssignment(createResponseAssignment(updatedMember));
 }
 
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Reconciles the current assignment of the member if needed.
+ *
+ * @param groupId   The group id.
+ * @param memberThe member to reconcile.
+ * @param currentPartitionEpoch The function returning the current epoch of
+ *  a given partition.
+ * @param targetAssignmentEpoch The target assignment epoch.
+ * @param targetAssignment  The target assignment.
+ * @param ownedTopicPartitions  The list of partitions owned by the 
member. This
+ *  is reported in the ConsumerGroupHeartbeat 
API and
+ *  it could be null if not provided.
+ * @param records   The list to accumulate any new records.
+ * @return The received member if no changes have been made; or a new
+ * member containing the new assignment.
+ */
+private ConsumerGroupMember maybeReconcile(
+String groupId,
+ConsumerGroupMember member,
+BiFunction currentPartitionEpoch,
+int targetAssignmentEpoch,
+Assignment targetAssignment,
+List 
ownedTopicPartitions,
+List records
+) {
+if (member.isReconciledTo(targetAssignmentEpoch)) {
+return member;
+}
+
+ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+.withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+.withCurrentPartitionEpoch(currentPartitionEpoch)
+.withOwnedTopicPartitions(ownedTopicPartitions)
+.build();
+
+if (!updatedMember.equals(member)) {
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+log.info("[GroupId {}] Member {} new assignment state: epoch={}, 
previousEpoch={}, state={}, "
+ + "assignedPartitions={} and revokedPartitions={}.",
+groupId, updatedMember.memberId(), 
updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), 
updatedMember.state(),
+formatAssignment(updatedMember.assignedPartitions()), 
formatAssignment(updatedMember.revokedPartitions()));
+
+if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+scheduleConsumerGroupRebalanceTimeout(
+groupId,
+updatedMember.memberId(),
+updatedMember.memberEpoch(),
+updatedMember.rebalanceTimeoutMs()
+);
+} else {
+cancelConsumerGroupRebalanceTimeout(groupId, 
updatedMember.memberId());
+}
+}
+
+return updatedMember;
+}
+
+private String formatAssignment(

Review Comment:
   Would replacing `-` by `:` help? Or do you have another option in mind?






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-02-16 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1492557819


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1211,13 +1192,99 @@ private 
CoordinatorResult consumerGr
 // 1. The member reported its owned partitions;
 // 2. The member just joined or rejoined to group (epoch equals to 
zero);
 // 3. The member's assignment has been updated.
-if (ownedTopicPartitions != null || memberEpoch == 0 || 
assignmentUpdated) {
+if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {
 response.setAssignment(createResponseAssignment(updatedMember));
 }
 
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Reconciles the current assignment of the member if needed.
+ *
+ * @param groupId   The group id.
+ * @param memberThe member to reconcile.
+ * @param currentPartitionEpoch The function returning the current epoch of
+ *  a given partition.
+ * @param targetAssignmentEpoch The target assignment epoch.
+ * @param targetAssignment  The target assignment.
+ * @param ownedTopicPartitions  The list of partitions owned by the 
member. This
+ *  is reported in the ConsumerGroupHeartbeat 
API and
+ *  it could be null if not provided.
+ * @param records   The list to accumulate any new records.
+ * @return The received member if no changes have been made; or a new
+ * member containing the new assignment.
+ */
+private ConsumerGroupMember maybeReconcile(
+String groupId,
+ConsumerGroupMember member,
+BiFunction currentPartitionEpoch,
+int targetAssignmentEpoch,
+Assignment targetAssignment,
+List 
ownedTopicPartitions,
+List records
+) {
+if (member.isReconciledTo(targetAssignmentEpoch)) {
+return member;
+}
+
+ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+.withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+.withCurrentPartitionEpoch(currentPartitionEpoch)
+.withOwnedTopicPartitions(ownedTopicPartitions)
+.build();
+
+if (!updatedMember.equals(member)) {

Review Comment:
   The reference check only works if the `CurrentAssignmentBuilder` guarantees 
that a new instance is returned when there is a change. At the moment, the 
contract is weak here so it may be better to use `equals` here. An alternative 
would be to delegate this decision to `CurrentAssignmentBuilder`. For instance, 
the builder could return an `Optional`.
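The Optional-returning alternative mentioned above could look like the sketch below. This is an illustrative minimal version, not Kafka's actual builder: `Member`, the single `epoch` field, and the class name are assumptions, and the point is only the contract that an empty result means "no change" while a present value is always a new instance.

```java
import java.util.Optional;

// Illustrative immutable member; the real class carries much more state.
record Member(int epoch) {}

class AssignmentBuilderSketch {
    private final Member member;
    private final int targetEpoch;

    AssignmentBuilderSketch(Member member, int targetEpoch) {
        this.member = member;
        this.targetEpoch = targetEpoch;
    }

    // Optional.empty() means "nothing changed, keep the current member";
    // a present value is always a freshly built instance, so callers
    // need neither a reference check nor equals().
    Optional<Member> build() {
        if (member.epoch() == targetEpoch) return Optional.empty();
        return Optional.of(new Member(targetEpoch));
    }
}
```

With this contract, the caller appends a record exactly when `build()` returns a present value, making the "did anything change?" decision explicit in the builder instead of relying on `equals`.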






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-02-16 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1492554402


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1211,13 +1192,99 @@ private 
CoordinatorResult consumerGr
 // 1. The member reported its owned partitions;
 // 2. The member just joined or rejoined to group (epoch equals to 
zero);
 // 3. The member's assignment has been updated.
-if (ownedTopicPartitions != null || memberEpoch == 0 || 
assignmentUpdated) {
+if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {

Review Comment:
   My understanding is that `equals` starts by verifying the reference. Hence, 
we really compare the two maps if the assignment of the new member has been 
updated. This should not be an issue.






Re: [PR] KAFKA-16009: Fix PlaintextConsumerTest. testMaxPollIntervalMsDelayInRevocation [kafka]

2024-02-16 Thread via GitHub


lucasbru commented on code in PR #15383:
URL: https://github.com/apache/kafka/pull/15383#discussion_r1492547506


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -384,7 +387,7 @@ public void testWakeupAfterEmptyFetch() {
 doAnswer(invocation -> {
 consumer.wakeup();
 return Fetch.empty();
-}).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+}).doAnswer(invocation -> 
Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));

Review Comment:
   It seems that this was just a bit of a lazy test setup. We only want to wake 
up the consumer once for this test.
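The chained `doAnswer(...).doAnswer(...)` in the diff above uses Mockito's consecutive stubbing: the first answer is consumed once (triggering the wakeup), and the last answer repeats for every later call. A hand-rolled equivalent, with illustrative names and without any Mockito dependency, can make that behavior concrete:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Minimal stand-in for Mockito's consecutive stubbing: answers are
// consumed in order, and the final answer repeats indefinitely.
class ConsecutiveStub<T> {
    private final Deque<Supplier<T>> answers = new ArrayDeque<>();

    ConsecutiveStub<T> doAnswer(Supplier<T> answer) {
        answers.addLast(answer);
        return this;
    }

    T call() {
        // Pop earlier answers; keep (don't pop) the last one so it repeats.
        Supplier<T> next = answers.size() > 1
            ? answers.pollFirst()
            : answers.peekFirst();
        return next.get();
    }
}
```

In the test's terms: the first `collectFetch` invocation wakes the consumer up and returns an empty fetch; all subsequent invocations just return an empty fetch without waking it again.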






Re: [PR] KAFKA-16009: Fix PlaintextConsumerTest. testMaxPollIntervalMsDelayInRevocation [kafka]

2024-02-16 Thread via GitHub


lucasbru commented on PR #15383:
URL: https://github.com/apache/kafka/pull/15383#issuecomment-1948483411

   @cadonna Could you please have a look?





[PR] KAFKA-16009: Fix PlaintextConsumerTest. testMaxPollIntervalMsDelayInRevocation [kafka]

2024-02-16 Thread via GitHub


lucasbru opened a new pull request, #15383:
URL: https://github.com/apache/kafka/pull/15383

   The wake-up mechanism in the new consumer prevents committing within a 
rebalance listener callback. The reason is that we are trying to register two 
wake-uppable actions at the same time. 
   
   The fix is to register the wake-uppable action closer to where we actually 
block on it, so that the action is not registered while we execute rebalance 
listeners and commit callbacks.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-02-16 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1492453933


##
clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json:
##
@@ -35,6 +35,8 @@
   "about": "-1 if it didn't change since the last heartbeat; the maximum 
time in milliseconds that the coordinator will wait on the member to revoke its 
partitions otherwise." },
 { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", 
"nullableVersions": "0+", "default": "null", "entityType": "topicName",
   "about": "null if it didn't change since the last heartbeat; the 
subscribed topic names otherwise." },
+{ "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+  "about": "null if it didn't change since the last heartbeat; the 
subscribed topic regex otherwise" },

Review Comment:
   I understand






Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-02-16 Thread via GitHub


dajac commented on PR #15188:
URL: https://github.com/apache/kafka/pull/15188#issuecomment-1948347606

   I was wondering whether we should introduce a new error code to signal to 
the user that the regular expression is invalid. Otherwise, we would have to 
use the invalid request exception and it does not seem user friendly. @cadonna 
@lianetm What do you think about this?





Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-02-16 Thread via GitHub


dajac commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1492422212


##
clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json:
##
@@ -35,6 +35,8 @@
   "about": "-1 if it didn't change since the last heartbeat; the maximum 
time in milliseconds that the coordinator will wait on the member to revoke its 
partitions otherwise." },
 { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", 
"nullableVersions": "0+", "default": "null", "entityType": "topicName",
   "about": "null if it didn't change since the last heartbeat; the 
subscribed topic names otherwise." },
+{ "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+  "about": "null if it didn't change since the last heartbeat; the 
subscribed topic regex otherwise" },

Review Comment:
   This is incorrect. We cannot add a field to a released version. I would 
suggest to bump the version of the RPC and use the correct version here. We 
should also mark the last version as unstable with `latestVersionUnstable` set 
to `true`.






Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-02-16 Thread via GitHub


dajac commented on PR #15188:
URL: https://github.com/apache/kafka/pull/15188#issuecomment-1948333168

   > > @cadonna @lianetm, since we're supporting both subscribe methods, using 
java.util.regex.Pattern and SubscriptionPattern, I think we should throw an 
illegal heartbeat exception when the user tries to use both methods at the same 
time and inform the user to use one at a time, since the field SubscribedRegex 
is used for java.util.regex.Pattern as well as SubscriptionPattern. What do you 
guys think?
   > 
   > IMO, we must support the deprecated pattern subscriptions with 
`java.util.regex.Pattern` to ensure backwards compatibility, but we do not need 
to support mixed usage of `java.util.regex.Pattern` and Google regex patterns. 
I think this is a blind spot in the KIP. I propose to throw an 
`IllegalStateException` if `subscribe(java.util.regex.Pattern)` is called after 
`subscribe(SubscriptionPattern)` (and vice versa) without calling 
`unsubscribe()` in between. That is similar to the restrictions between 
pattern, topic, and partition subscriptions @lianetm linked above. I do not 
think it is worth considering the edge case of mixed usage of the two pattern 
types. Does this make sense to you? /cc @dajac What do you, as the original 
author of the KIP, think? Should we update the KIP to make this clear?
   
   @cadonna I would rather follow what we already do with `subscribe` today. 
The last one called takes precedence.
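   A hypothetical model of the "last one called takes precedence" semantics 
described above (this is an illustration only, not Kafka's actual 
`SubscriptionState`; a plain `String` stands in for the `SubscriptionPattern` 
type):

   ```java
   import java.util.regex.Pattern;

   public class SubscriptionModel {
       // Holds whichever subscription was set last; either a
       // java.util.regex.Pattern or a String standing in for SubscriptionPattern.
       private Object current;

       public void subscribe(Pattern pattern) {
           current = pattern; // replaces any earlier subscription, no exception
       }

       public void subscribe(String subscriptionPattern) {
           current = subscriptionPattern; // likewise: last call wins
       }

       public Object current() {
           return current;
       }

       public static void main(String[] args) {
           SubscriptionModel s = new SubscriptionModel();
           s.subscribe(Pattern.compile("foo.*"));
           s.subscribe("bar.*"); // earlier Pattern subscription is replaced
           System.out.println(s.current()); // prints bar.*
       }
   }
   ```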





[jira] [Assigned] (KAFKA-6579) Consolidate window store and session store unit tests into a single class

2024-02-16 Thread Ahmed Sobeh (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Sobeh reassigned KAFKA-6579:
--

Assignee: (was: Ahmed Sobeh)

> Consolidate window store and session store unit tests into a single class
> -
>
> Key: KAFKA-6579
> URL: https://issues.apache.org/jira/browse/KAFKA-6579
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie, unit-test
>
> For key value store, we have a {{AbstractKeyValueStoreTest}} that is shared 
> among all its implementations; however for window and session stores, each 
> class has its own independent unit test classes that do not share the test 
> coverage. In fact, many of these test classes share the same unit test 
> functions (e.g. {{RocksDBWindowStoreTest}}, 
> {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}).
> It is better to use the same pattern as for key value stores to consolidate 
> these test functions into a shared base class.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15770: IQv2 must return immutable position [kafka]

2024-02-16 Thread via GitHub


lucasbru commented on code in PR #15219:
URL: https://github.com/apache/kafka/pull/15219#discussion_r1492366369


##
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java:
##
@@ -193,17 +194,22 @@ public void put(final Bytes rawBaseKey,
 expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
 LOG.warn("Skipping record for expired segment.");
 } else {
-StoreQueryUtils.updatePosition(position, stateStoreContext);
-
-// Put to index first so that if put to base failed, when we 
iterate index, we will
-// find no base value. If put to base first but putting to index 
fails, when we iterate
-// index, we can't find the key but if we iterate over base store, 
we can find the key
-// which lead to inconsistency.
-if (hasIndex()) {
-final KeyValue<Bytes, byte[]> indexKeyValue = getIndexKeyValue(rawBaseKey, value);
-segment.put(indexKeyValue.key, indexKeyValue.value);
+position.lock();

Review Comment:
   Have you considered doing the same without `SynchronizedPosition` and 
instead just using the object monitor of `position`?
   
   ```java
   synchronized (position) {
       // ...
   }
   ```
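   A minimal, self-contained sketch of the two locking styles under discussion 
(the names here are illustrative, not the PR's actual classes): an explicit 
`ReentrantLock` as in the proposed `SynchronizedPosition`, versus guarding the 
same state with an object monitor.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.locks.ReentrantLock;

   public class PositionSync {
       // topic -> (partition -> offset), the shape of a Streams Position
       private final Map<String, Map<Integer, Long>> position = new HashMap<>();
       private final ReentrantLock lock = new ReentrantLock();

       // Variant 1: explicit ReentrantLock, as in the SynchronizedPosition proposal
       void withComponentLocked(String topic, int partition, long offset) {
           lock.lock();
           try {
               position.computeIfAbsent(topic, t -> new HashMap<>()).put(partition, offset);
           } finally {
               lock.unlock();
           }
       }

       // Variant 2: the object's own monitor, as the reviewer suggests
       void withComponentMonitor(String topic, int partition, long offset) {
           synchronized (this) {
               position.computeIfAbsent(topic, t -> new HashMap<>()).put(partition, offset);
           }
       }

       Long offset(String topic, int partition) {
           synchronized (this) {
               Map<Integer, Long> partitions = position.get(topic);
               return partitions == null ? null : partitions.get(partition);
           }
       }

       public static void main(String[] args) {
           PositionSync p = new PositionSync();
           p.withComponentLocked("topic", 0, 1L);
           p.withComponentMonitor("topic", 1, 2L);
           System.out.println(p.offset("topic", 0) + "," + p.offset("topic", 1)); // prints 1,2
       }
   }
   ```

   Both variants provide the same mutual exclusion here; the monitor version 
avoids introducing a new subclass and an extra lock object.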



##
streams/src/test/java/org/apache/kafka/streams/state/internals/StoreQueryUtilsTest.java:
##
@@ -70,7 +71,7 @@ public void shouldReturnErrorOnBoundViolation() {
 PositionBound.at(Position.emptyPosition().withComponent("topic", 
0, 1)),
 new QueryConfig(false),
 store,
-Position.emptyPosition().withComponent("topic", 0, 0),
+(SynchronizedPosition) 
Position.emptyPosition().withComponent("topic", 0, 0),

Review Comment:
   That cast won't work



##
streams/src/main/java/org/apache/kafka/streams/query/internals/SynchronizedPosition.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query.internals;
+
+import org.apache.kafka.streams.query.Position;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class SynchronizedPosition extends Position {
+private final ReentrantLock lock = new ReentrantLock();
+
+public SynchronizedPosition(final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position) {
+super(position);
+}
+
+public static SynchronizedPosition emptyPosition() {
+return new SynchronizedPosition(new ConcurrentHashMap<>());
+}
+
+public static SynchronizedPosition fromMap(final Map<String, Map<Integer, Long>> map) {
+return new SynchronizedPosition(deepCopy(map));
+}
+
+public static ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> deepCopy(

Review Comment:
   Why did you move this static method? Since it's also accessed by `Position`, 
I'd rather leave it there.






Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]

2024-02-16 Thread via GitHub


nizhikov commented on PR #15382:
URL: https://github.com/apache/kafka/pull/15382#issuecomment-1948181817

   Hello, @dajac I will take a look, shortly.





Re: [PR] MINOR: Added ACLs authorizer change during migration [kafka]

2024-02-16 Thread via GitHub


showuon merged PR #15333:
URL: https://github.com/apache/kafka/pull/15333





Re: [PR] KAFKA-15670: add "inter.broker.listener.name" config in KRaft controller config [kafka]

2024-02-16 Thread via GitHub


showuon merged PR #14631:
URL: https://github.com/apache/kafka/pull/14631





Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-16 Thread via GitHub


dajac commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1492178089


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -20,78 +20,225 @@
 import kafka.admin.ConsumerGroupCommand;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testListConsumerGroups(String quorum) throws Exception {
+@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+@MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+public void testListConsumerGroupsWithoutFilters(String quorum, String 
groupProtocol) throws Exception {
 String simpleGroup = "simple-group";
+
+createOffsetsTopic(listenerName(), new Properties());
+
 addSimpleGroupExecutor(simpleGroup);
 addConsumerGroupExecutor(1);
+addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
 
 String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
 ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP, simpleGroup));
+
+scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP, simpleGroup, PROTOCOL_GROUP));
 final AtomicReference<scala.collection.Set<String>> foundGroups = new AtomicReference<>();
+
 TestUtils.waitForCondition(() -> {
 foundGroups.set(service.listConsumerGroups().toSet());
 return Objects.equals(expectedGroups, foundGroups.get());
 }, "Expected --list to show groups " + expectedGroups + ", but found " 
+ foundGroups.get() + ".");
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testListWithUnrecognizedNewConsumerOption() {
+@Test
+public void testListWithUnrecognizedNewConsumerOption() throws Exception {
 String[] cgcArgs = new String[]{"--new-consumer", 
"--bootstrap-server", bootstrapServers(listenerName()), "--list"};
 assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testListConsumerGroupsWithStates() throws Exception {
+@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+@MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+public void testListConsumerGroupsWithStates(String quorum, String 
groupProtocol) throws Exception {

Review Comment:
   `groupProtocol` is never used in this test and we run it with all consumer 
types. I suppose that we either want to use `groupProtocol` or only run it with 
the classic type. If we do the latter, do we have a test listing new consumer 
groups too?



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -143,22 +312,134 @@ public void testListGroupCommand(String quorum) throws 
Exception {
 return out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
 }, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
 
-String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
+String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type"};
 TestUtils.waitForCondition(() -> {
 out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
 ConsumerGroupCommand.main(cgcArgs3);
 return null;
 }));
+return out.get().contains("TYPE") && !out.get().contains("STATE") 
&& out.get().contains(simpleGroup) && out.get().contains(GROUP);
+}, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
+
+String[] 

Re: [PR] KAFKA-16230: Update verifiable_consumer.py to support KIP-848’s group protocol config [kafka]

2024-02-16 Thread via GitHub


lucasbru merged PR #15328:
URL: https://github.com/apache/kafka/pull/15328





[PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]

2024-02-16 Thread via GitHub


dajac opened a new pull request, #15382:
URL: https://github.com/apache/kafka/pull/15382

   While reviewing https://github.com/apache/kafka/pull/15150, I found that our 
tests verifying the console output are really hard to read. Here is my proposal 
to make them more readable.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] MINOR: Improve ListConsumerGroupTest.testListGroupCommand [kafka]

2024-02-16 Thread via GitHub


dajac commented on PR #15382:
URL: https://github.com/apache/kafka/pull/15382#issuecomment-1948168236

   @mimaison @nizhikov @rreddy-22 @jolshan Could you take a look when you get a 
chance?





Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-16 Thread via GitHub


msn-tldr commented on PR #15323:
URL: https://github.com/apache/kafka/pull/15323#issuecomment-1948142511

   @hachikuji thanks for merging.





Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]

2024-02-16 Thread via GitHub


lucasbru merged PR #15375:
URL: https://github.com/apache/kafka/pull/15375





Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-16 Thread via GitHub


lucasbru commented on code in PR #15372:
URL: https://github.com/apache/kafka/pull/15372#discussion_r1492266247


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -265,6 +265,32 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 ensureNoRebalance(consumer, listener)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, 
groupProtocol: String): Unit = {
+
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
1000.toString)
+
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
500.toString)
+
+val consumer = createConsumer()
+val listener = new TestConsumerReassignmentListener
+consumer.subscribe(List(topic).asJava, listener)
+
+// rebalance to get the initial assignment
+awaitRebalance(consumer, listener)
+
+val initialAssignedCalls = listener.callsToAssigned
+
+consumer.poll(Duration.ofMillis(2000));

Review Comment:
   Done






[jira] [Commented] (KAFKA-16264) Expose `producer.id.expiration.check.interval.ms` as dynamic broker configuration

2024-02-16 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817867#comment-17817867
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-16264:
--

cc [~jolshan] – as related to https://issues.apache.org/jira/browse/KAFKA-16229

> Expose `producer.id.expiration.check.interval.ms` as dynamic broker 
> configuration
> -
>
> Key: KAFKA-16264
> URL: https://issues.apache.org/jira/browse/KAFKA-16264
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> Scenarios where too many producer ids lead to issues (e.g. high CPU 
> utilization, see KAFKA-16229) put operators in need of flushing producer ids 
> more promptly than usual.
> Currently, only the expiration timeout `producer.id.expiration.ms` is exposed 
> as a dynamic config. This is helpful (e.g. by reducing the timeout, fewer 
> producers are eventually kept in memory), but it is not enough if the 
> evaluation frequency is not short enough to flush producer ids before they 
> become an issue. Only by tuning both can the issue be worked around.
>  





[jira] [Created] (KAFKA-16264) Expose `producer.id.expiration.check.interval.ms` as dynamic broker configuration

2024-02-16 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-16264:


 Summary: Expose `producer.id.expiration.check.interval.ms` as 
dynamic broker configuration
 Key: KAFKA-16264
 URL: https://issues.apache.org/jira/browse/KAFKA-16264
 Project: Kafka
  Issue Type: Improvement
Reporter: Jorge Esteban Quilcate Otoya
Assignee: Jorge Esteban Quilcate Otoya


Scenarios where too many producer ids lead to issues (e.g. high CPU 
utilization, see KAFKA-16229) put operators in need of flushing producer ids 
more promptly than usual.

Currently, only the expiration timeout `producer.id.expiration.ms` is exposed 
as a dynamic config. This is helpful (e.g. by reducing the timeout, fewer 
producers are eventually kept in memory), but it is not enough if the 
evaluation frequency is not short enough to flush producer ids before they 
become an issue. Only by tuning both can the issue be worked around.
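For context, the timeout that is already dynamic can be changed at runtime with 
the stock `kafka-configs.sh` tool; the check interval this ticket targets has 
no such path today and requires a restart. A hedged sketch (broker address and 
timeout value are placeholders):

```shell
# Lower the producer id expiration timeout cluster-wide without a restart.
# producer.id.expiration.check.interval.ms has no dynamic equivalent today,
# which is what this ticket proposes to add.
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type brokers --entity-default \
  --alter --add-config producer.id.expiration.ms=3600000
```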

 





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-16 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1492090658


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join; 
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(0, "A0+null", 0L),
-new KeyValueTimestamp<>(1, "A1+a1", 110L)
+new KeyValueTimestamp<>(1, "A1+a1", 110L),
+new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
   When we change the order of the checks back to what it was, only the 
ordering tests fail, so there seems to be no bug (anymore).
   I think that in a previous version of this PR (before checking both the 
left and right side in the outer join) we saw that it did matter and a 
left-join test failed. But this has since been solved with 
`getOuterJoinLookBackTimeMs()`.
   I think option 2 and option 3 will do fine.






Re: [PR] KAFKA-16167: re-enable PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup [kafka]

2024-02-16 Thread via GitHub


lucasbru merged PR #15358:
URL: https://github.com/apache/kafka/pull/15358

