[jira] [Updated] (KAFKA-15282) Implement client support for KIP-848 client-side assignors

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15282:
--
Labels: kip-848-client-support  (was: kip-848 kip-848-client-support)

> Implement client support for KIP-848 client-side assignors
> --
>
> Key: KAFKA-15282
> URL: https://issues.apache.org/jira/browse/KAFKA-15282
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> The client-side assignor provides the logic for partition assignments on the 
> client instead of on the server. Client-side assignment is the main approach 
> used by the “old protocol” for divvying up partitions. While the “new 
> protocol” favors server-side assignors, the client-side assignor will 
> continue to be supported for backward compatibility with use cases such as 
> KSQL and Connect.
> Note: I _*think*_ that the client-side assignor logic and the reconciliation 
> logic can remain separate from each other. We should strive to keep the two 
> pieces unencumbered, unless it’s unavoidable.
> This task includes:
>  * Validate the client’s configuration for assignor selection
>  * Integrate with the new {{PartitionAssignor}} interface to invoke the logic 
> from the user-provided assignor implementation
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupPrepareAssignment}} RPC call using the information from the 
> {{PartitionAssignor}} above
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupInstallAssignment}} RPC call, again using the information 
> calculated by the {{PartitionAssignor}}
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.
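> As a rough sketch of where the user-provided logic plugs in between the 
> ConsumerGroupPrepareAssignment and ConsumerGroupInstallAssignment calls (the 
> class shape below is illustrative, not the final {{PartitionAssignor}} API):
> {code:java}
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Set;
> 
> // Illustrative only: a trivial round-robin client-side assignor. The real
> // interface also carries group/topic metadata from the prepare RPC.
> class RoundRobinClientAssignor {
>     Map<String, List<Integer>> assign(Set<String> memberIds, List<Integer> partitions) {
>         Map<String, List<Integer>> result = new HashMap<>();
>         memberIds.forEach(m -> result.put(m, new ArrayList<>()));
>         List<String> members = new ArrayList<>(memberIds);
>         if (members.isEmpty())
>             return result;
>         for (int i = 0; i < partitions.size(); i++)
>             result.get(members.get(i % members.size())).add(partitions.get(i));
>         return result;
>     }
> }
> {code}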



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16004) Review new consumer inflight offset commit logic

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16004:
--
Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor kip-848 kip-848-client-support)

> Review new consumer inflight offset commit logic
> 
>
> Key: KAFKA-16004
> URL: https://issues.apache.org/jira/browse/KAFKA-16004
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The new consumer logic for committing offsets handles inflight requests to 
> validate that no commit request is sent if a previous one hasn't received a 
> response. Review how that logic is currently applied to both sync and async 
> commits, and validate it against the legacy coordinator, which seems to 
> apply it only for async commits. Also review the expected behaviour for 
> auto-commit (auto-commit on the interval, auto-commit on reconciliation, and 
> auto-commit on close).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15284) Implement ConsumerGroupProtocolVersionResolver to determine consumer group protocol

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15284:
--
Labels: kip-848-client-support  (was: consumer-threading-refactor 
kip-848-client-support)

> Implement ConsumerGroupProtocolVersionResolver to determine consumer group 
> protocol
> ---
>
> Key: KAFKA-15284
> URL: https://issues.apache.org/jira/browse/KAFKA-15284
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> At client initialization, we need to determine which of the 
> {{ConsumerDelegate}} implementations to use:
>  # {{LegacyKafkaConsumerDelegate}}
>  # {{AsyncKafkaConsumerDelegate}}
> There are conditions defined by KIP-848 that determine client eligibility to 
> use the new protocol. This will be modeled by the—deep 
> breath—{{ConsumerGroupProtocolVersionResolver}}.
> Known tasks:
>  * Determine at what point in the {{Consumer}} initialization the network 
> communication should happen
>  * Determine what RPCs to invoke in order to determine eligibility (API 
> versions, IBP version, etc.)
>  * Implement the network client lifecycle (startup, communication, shutdown, 
> etc.)
>  * Determine the fallback path in case the client is not eligible to use the 
> protocol
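> A rough sketch of how the resolver's output could drive delegate selection 
> (the enum and eligibility flag are illustrative placeholders for the checks 
> listed above):
> {code:java}
> import java.util.Locale;
> 
> // Illustrative only: pick the delegate based on config plus broker eligibility.
> class ConsumerGroupProtocolVersionResolver {
>     enum GroupProtocol { CLASSIC, CONSUMER }
> 
>     // brokerSupportsKip848 would come from the API versions / IBP checks above.
>     GroupProtocol resolve(String groupProtocolConfig, boolean brokerSupportsKip848) {
>         boolean wantsNew = "consumer".equals(groupProtocolConfig.toLowerCase(Locale.ROOT));
>         // Fallback path: anything not eligible uses the legacy delegate.
>         return (wantsNew && brokerSupportsKip848) ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC;
>     }
> }
> {code}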



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15848:
--
Labels: consumer-threading-refactor timeout  (was: 
consumer-threading-refactor kip-848-client-support timeout)

> Consumer API timeout inconsistent between ConsumerDelegate implementations
> --
>
> Key: KAFKA-15848
> URL: https://issues.apache.org/jira/browse/KAFKA-15848
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> The two {{ConsumerDelegate}} implementations ({{LegacyKafkaConsumer}} and 
> {{AsyncKafkaConsumer}}) have a fundamental difference related to their use 
> and interpretation of the {{Timer}} that is supplied.
> h3. tl;dr
> {{AsyncKafkaConsumer}} is very literal about the timeout, whereas 
> {{LegacyKafkaConsumer}} seems to give a little wiggle room.
> {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for 
> success of its operations _before_ checking the timer:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{NetworkClient.poll()}}
>  # Check for result
>  ## If successful, return success
>  ## If fatal failure, return failure
>  # Check timer
>  ## If timer expired, return failure
> {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{Future.get()}}
>  ## If operation timed out, {{Future.get()}} will throw a timeout error
>  # Check for result
>  ## If successful, return success
>  ## Otherwise, return failure
> h3. How to reproduce
> This causes subtle timing issues, but they can be easily reproduced via any 
> of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} 
> API. Here's a bit of code that illustrates the difference between the two 
> approaches.
> {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a 
> manner similar to this:
> {code:java}
> public int getCount(Timer timer) {
>     do {
>         final RequestFuture<Integer> future = sendSomeRequest(partitions);
>         client.poll(future, timer);
> 
>         if (future.isDone())
>             return future.value();
>     } while (timer.notExpired());
> 
>     return -1;
> }
> {code}
> {{AsyncKafkaConsumer}} has similar logic, but it is structured like this:
> {code:java}
> private int getCount(Timer timer) {
>     try {
>         CompletableFuture<Integer> future = new CompletableFuture<>();
>         applicationEventQueue.add(future);
>         return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
>     } catch (TimeoutException | InterruptedException | ExecutionException e) {
>         return -1;
>     }
> }
> {code}
> The call to {{add}} enqueues the network operation, but it then _immediately_ 
> invokes {{Future.get()}} with the timeout to implement a time-bounded 
> blocking call. Since this method is being called with a timeout of 0, it 
> _immediately_ throws a {{TimeoutException}}.
> h3. Suggested fix
> TBD :(



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15588) Purge the unsent offset commits/fetches when the member is fenced/failed

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15588:
--
Labels: kip-848-client-support  (was: kip-848 kip-848-client-support)

> Purge the unsent offset commits/fetches when the member is fenced/failed
> 
>
> Key: KAFKA-15588
> URL: https://issues.apache.org/jira/browse/KAFKA-15588
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> When the member is fenced or failed, we should purge the inflight offset 
> commits and fetches. The HeartbeatRequestManager should be able to handle 
> this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16011) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16011:
--
Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor kip-848 kip-848-client-support)

> Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose
> 
>
> Key: KAFKA-16011
> URL: https://issues.apache.org/jira/browse/KAFKA-16011
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose}} is failing 
> when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Did not get valid assignment for 
> partitions HashSet(topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, 
> topic1-0, topic1-3). Instead, got ArrayBuffer(Set(topic1-0, topic-0, 
> topic-1), Set(), Set(topic1-1, topic1-5))
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286)
>   at 
> kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1865)
>   at 
> kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose(PlaintextConsumerTest.scala:1277)
> {code}
> The logs include these lines:
>  
> {code}
> [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:376)
> [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Member JQ_e0S5FTzKnYyStB3aBrQ with epoch 0 transitioned to 
> FATAL state 
> (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl:456)
> [2023-12-13 15:33:33,212] ERROR [daemon-consumer-assignment]: Error due to 
> (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:139)
> org.apache.kafka.common.errors.InvalidRequestException: RebalanceTimeoutMs 
> must be provided in first request.
> [2023-12-13 15:33:39,196] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2023-12-13 15:33:39,200] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15283) Client support for OffsetFetch and OffsetCommit with topic ID

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15283:
--
Labels: kip-848-client-support newbie  (was: kip-848 kip-848-client-support 
newbie)

> Client support for OffsetFetch and OffsetCommit with topic ID
> -
>
> Key: KAFKA-15283
> URL: https://issues.apache.org/jira/browse/KAFKA-15283
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Minor
>  Labels: kip-848-client-support, newbie
> Fix For: 3.8.0
>
>
> Currently, {{KafkaConsumer}} keeps track of topic IDs in the in-memory 
> {{ConsumerMetadata}} object, and they are provided to the {{FETCH}} and 
> {{METADATA}} RPC calls.
> With KIP-848, the OffsetFetch and OffsetCommit APIs will start using topic 
> IDs in the same way, so the new client implementation will provide them when 
> issuing those requests. Topic names should continue to be supported as 
> needed by the {{AdminClient}}.
> We should also review/clean up the support for topic names in requests such 
> as the {{METADATA}} request (currently supporting topic names as well as 
> topic IDs on the client side).
> Tasks include:
>  * Introduce topic IDs in the existing OffsetFetch and OffsetCommit APIs, 
> which will be upgraded on the server to support topic IDs (see the sketch 
> below)
>  * Check topic ID propagation internally in the client, based on the RPCs 
> that include it
>  * Review the existing support for topic names for potential clean-up if no 
> longer needed
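> A minimal sketch of the first task above (the holder class is hypothetical; 
> the generated OffsetFetch/OffsetCommit request data classes are the real 
> target):
> {code:java}
> import java.util.Optional;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.Uuid;
> 
> // Carry both identifiers: the ID when the metadata cache knows it, and the
> // name so existing name-based callers (e.g. AdminClient) keep working.
> class RequestTopic {
>     final String name;
>     final Uuid id; // Uuid.ZERO_UUID when the topic ID is not yet known
> 
>     RequestTopic(TopicPartition tp, Optional<Uuid> topicIdFromMetadata) {
>         this.name = tp.topic();
>         this.id = topicIdFromMetadata.orElse(Uuid.ZERO_UUID);
>     }
> }
> {code}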



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15846) Review consumer leave group request best effort and response handling

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15846:
--
Labels: kip-848-client-support  (was: kip-848 kip-848-client-support)

> Review consumer leave group request best effort and response handling
> -
>
> Key: KAFKA-15846
> URL: https://issues.apache.org/jira/browse/KAFKA-15846
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> The new consumer sends out a leave group request with a best-effort 
> approach. It transitions to LEAVING to indicate to the HB manager that the 
> request must be sent, but it does not do any response handling or retrying 
> (note that the response is still handled as any other HB response). After 
> the first HB manager poll iteration while on LEAVING, the consumer 
> transitions into UNSUBSCRIBED (no matter whether the request was actually 
> sent out or not, e.g., due to the coordinator not being known). Review 
> whether this is good enough as an effort to send the request, and consider 
> the effect of responses that may be received and processed when they are no 
> longer relevant.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15538) Client support for java regex based subscription

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15538:
--
Labels: kip-848-client-support newbie  (was: kip-848 kip-848-client-support 
newbie)

> Client support for java regex based subscription
> 
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848-client-support, newbie
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscriptionPattern`. 
> Subscribing with a java `Pattern` will still be supported for a while but 
> will eventually be removed.
>  * When subscribing with a SubscriptionPattern, the client should just send 
> the regex to the broker, and it will be resolved on the server side.
>  * When subscribing with a Pattern, the regex should be resolved on the 
> client side (see the sketch below).
> As part of this task, we should re-enable all integration tests defined in 
> PlaintextAsyncConsumer that relate to subscription with a pattern and that 
> are currently disabled for the new consumer + new protocol.
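> A minimal sketch of the client-side resolution step for the java Pattern 
> case, assuming the topic names come from the consumer's metadata:
> {code:java}
> import java.util.Set;
> import java.util.TreeSet;
> import java.util.regex.Pattern;
> 
> class PatternResolution {
>     // Match the java regex against topic names known from metadata and send
>     // the broker the resulting explicit topic list.
>     static Set<String> resolve(Pattern pattern, Set<String> topicsFromMetadata) {
>         Set<String> matched = new TreeSet<>();
>         for (String topic : topicsFromMetadata) {
>             if (pattern.matcher(topic).matches())
>                 matched.add(topic);
>         }
>         return matched;
>     }
> }
> {code}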



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15321) Document consumer group member state machine

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15321:
--
Labels: kip-848-client-support  (was: kip-848 kip-848-client-support)

> Document consumer group member state machine
> 
>
> Key: KAFKA-15321
> URL: https://issues.apache.org/jira/browse/KAFKA-15321
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, documentation
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> We need to first document the new consumer group member state machine. What 
> are the different states and what are the transitions?
> See [~pnee]'s notes: 
> [https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design]
> *_Don’t forget to include diagrams for clarity!_*
> This should be documented on the AK wiki.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15839) Topic ID integration in consumer subscription state

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15839:
--
Labels: kip-848-client-support  (was: kip-848 kip-848-client-support)

> Topic ID integration in consumer subscription state
> ---
>
> Key: KAFKA-15839
> URL: https://issues.apache.org/jira/browse/KAFKA-15839
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> With the new consumer group protocol, assignments received by the consumer 
> contain topic IDs instead of topic names. Topic IDs are used in the 
> reconciliation path, integrated using TopicIdPartition. When reconciling, 
> topic names are resolved via a metadata update, but they are also kept in a 
> local #MembershipManager cache. This local cache serves the purpose of 
> keeping assigned topic ID-name pairs (whose names might have been removed 
> from metadata, e.g., if the topic was deleted).
> That's just an initial step towards spreading topic IDs internally in the 
> consumer code. The next step, addressed by this task, is to include topic 
> IDs in the subscription state, so that assigned topic ID-name pairs can be 
> accessed from other components without needing to resolve names multiple 
> times.
> Note that this task aims only at spreading topic IDs internally in the 
> consumer; there are no changes to expose them at the API level.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15279) Implement client support for KIP-848 client-side assigner RPCs

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15279:
--
Labels: kip-848-client-support  (was: kip-848 kip-848-client-support)

> Implement client support for KIP-848 client-side assigner RPCs
> --
>
> Key: KAFKA-15279
> URL: https://issues.apache.org/jira/browse/KAFKA-15279
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> The protocol introduces three new RPCs that the client uses to communicate 
> with the broker:
>  # 
> [ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI]
>  # 
> [ConsumerGroupPrepareAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupPrepareAssignmentAPI]
>  # 
> [ConsumerGroupInstallAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupInstallAssignmentAPI]
> Support for ConsumerGroupHeartbeat is handled by KAFKA-15278. This task is to 
> implement the ConsumerGroupAssignmentRequestManager to handle the second and 
> third RPCs on the above list.
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15325) Integrate topicId in OffsetFetch and OffsetCommit async consumer calls

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15325:
--
Labels: kip-848-client-support  (was: kip-848 kip-848-client-support)

> Integrate topicId in OffsetFetch and OffsetCommit async consumer calls
> --
>
> Key: KAFKA-15325
> URL: https://issues.apache.org/jira/browse/KAFKA-15325
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> KIP-848 introduces support for topicIds in the OffsetFetch and OffsetCommit 
> APIs. The consumer calls to those APIs should be updated to include topicIds 
> when available.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15561:
--
Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor kip-848 kip-848-client-support)

> Client support for new SubscriptionPattern based subscription
> -
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The new consumer should support subscribing with the new SubscriptionPattern 
> introduced in the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the HB request in the 
> SubscribedTopicRegex field, delegating the resolution to the server.
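> A minimal sketch, assuming the generated request data class exposes the 
> usual setter for the KIP-848 SubscribedTopicRegex field:
> {code:java}
> import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
> 
> class RegexSubscription {
>     // Send the RE2/J regex itself in the heartbeat; the server resolves it.
>     static ConsumerGroupHeartbeatRequestData withRegex(ConsumerGroupHeartbeatRequestData data,
>                                                        String subscriptionPattern) {
>         return data.setSubscribedTopicRegex(subscriptionPattern);
>     }
> }
> {code}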



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16011) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16011:
--
Labels: consumer-threading-refactor kip-848 kip-848-client-support  (was: 
consumer-threading-refactor kip-848)

> Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose
> 
>
> Key: KAFKA-16011
> URL: https://issues.apache.org/jira/browse/KAFKA-16011
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose}} is failing 
> when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Did not get valid assignment for 
> partitions HashSet(topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, 
> topic1-0, topic1-3). Instead, got ArrayBuffer(Set(topic1-0, topic-0, 
> topic-1), Set(), Set(topic1-1, topic1-5))
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286)
>   at 
> kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1865)
>   at 
> kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose(PlaintextConsumerTest.scala:1277)
> {code}
> The logs include these lines:
>  
> {code}
> [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:376)
> [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Member JQ_e0S5FTzKnYyStB3aBrQ with epoch 0 transitioned to 
> FATAL state 
> (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl:456)
> [2023-12-13 15:33:33,212] ERROR [daemon-consumer-assignment]: Error due to 
> (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:139)
> org.apache.kafka.common.errors.InvalidRequestException: RebalanceTimeoutMs 
> must be provided in first request.
> [2023-12-13 15:33:39,196] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2023-12-13 15:33:39,200] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16198) Reconciliation may lose partitions when topic metadata is delayed

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16198:
--
Labels: client-transitions-issues kip-848-client-support  (was: 
client-transitions-issues clients consumer kip-848 kip-848-client-support)

> Reconciliation may lose partitions when topic metadata is delayed
> -
>
> Key: KAFKA-16198
> URL: https://issues.apache.org/jira/browse/KAFKA-16198
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> The current reconciliation code in `AsyncKafkaConsumer`'s `MembershipManager` 
> may lose part of the server-provided assignment when metadata is delayed. The 
> reason is incorrect handling of partially resolved topic names, as in this 
> example:
>  * We get assigned {{T1-1}} and {{T2-1}}
>  * We reconcile {{T1-1}}; {{T2-1}} remains in {{assignmentUnresolved}} since 
> the topic id {{T2}} is not known yet
>  * We get new cluster metadata, which includes {{T2}}, so {{T2-1}} is moved 
> to {{assignmentReadyToReconcile}}
>  * We call {{reconcile}} -- {{T2-1}} is now treated as the full assignment, 
> so {{T1-1}} is revoked
>  * We end up with assignment {{T2-1}}, which is inconsistent with the 
> broker-side target assignment
>  
> Generally, this seems to be a problem with the semantics of the internal 
> collections `assignmentUnresolved` and `assignmentReadyToReconcile`. Absence 
> of a topic in `assignmentReadyToReconcile` may mean either revocation of the 
> topic partition(s), or unavailability of a topic name for the topic.
> Internal state with simpler and correct invariants could be achieved by using 
> a single collection `currentTargetAssignment`, which is based on topic IDs 
> and always corresponds to the latest assignment received from the broker. 
> During every attempted reconciliation, all topic IDs would be resolved from 
> the local cache, which should not introduce a lot of overhead (see the 
> sketch below). `assignmentUnresolved` and `assignmentReadyToReconcile` are 
> removed.
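> A sketch of the proposed invariant (collection and method names as described 
> above; everything else illustrative):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import java.util.SortedSet;
> import org.apache.kafka.common.Uuid;
> 
> class TargetAssignment {
>     // Always the latest broker-provided target, keyed by topic ID.
>     private final Map<Uuid, SortedSet<Integer>> currentTargetAssignment = new HashMap<>();
> 
>     void onNewTarget(Map<Uuid, SortedSet<Integer>> fromBroker) {
>         currentTargetAssignment.clear();
>         currentTargetAssignment.putAll(fromBroker);
>     }
> 
>     // Resolve what we can on every reconcile attempt; unresolved topics stay
>     // in the target, so a delayed T2 name can no longer cause T1-1 revocation.
>     Map<String, SortedSet<Integer>> resolvable(Map<Uuid, String> topicNamesById) {
>         Map<String, SortedSet<Integer>> ready = new HashMap<>();
>         currentTargetAssignment.forEach((id, partitions) -> {
>             String name = topicNamesById.get(id);
>             if (name != null)
>                 ready.put(name, partitions);
>         });
>         return ready;
>     }
> }
> {code}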



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16165) Consumer invalid transition on expired poll interval

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16165:
--
Labels: client-transitions-issues kip-848-client-support  (was: 
client-transitions-issues)

> Consumer invalid transition on expired poll interval
> 
>
> Key: KAFKA-16165
> URL: https://issues.apache.org/jira/browse/KAFKA-16165
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> Running system tests with the new async consumer revealed an invalid state 
> transition related to the consumer not being polled within the interval in 
> some scenario (maybe related to consumer close, as the transition is 
> LEAVING->STALE).
> Log trace:
> [2024-01-17 19:45:07,379] WARN [Consumer 
> clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
> groupId=consumer-groups-test-2] consumer poll timeout has expired. This means 
> the time between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2024-01-17 19:45:07,379] ERROR [Consumer 
> clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
> groupId=consumer-groups-test-2] Unexpected error caught in consumer network 
> thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
> java.lang.IllegalStateException: Invalid state transition from LEAVING to 
> STALE
>   at 
> org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
>   at 
> org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137)
>   at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>   at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>   at 
> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>   at 
> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
>   at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>   at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>   at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>   at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15846) Review consumer leave group request best effort and response handling

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15846:
--
Fix Version/s: 3.8.0

> Review consumer leave group request best effort and response handling
> -
>
> Key: KAFKA-15846
> URL: https://issues.apache.org/jira/browse/KAFKA-15846
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> The new consumer sends out a leave group request with a best-effort 
> approach. It transitions to LEAVING to indicate to the HB manager that the 
> request must be sent, but it does not do any response handling or retrying 
> (note that the response is still handled as any other HB response). After 
> the first HB manager poll iteration while on LEAVING, the consumer 
> transitions into UNSUBSCRIBED (no matter whether the request was actually 
> sent out or not, e.g., due to the coordinator not being known). Review 
> whether this is good enough as an effort to send the request, and consider 
> the effect of responses that may be received and processed when they are no 
> longer relevant.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15540) Handle heartbeat and revocation when consumer leaves group

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15540:
--
Fix Version/s: 3.7.0

> Handle heartbeat and revocation when consumer leaves group
> --
>
> Key: KAFKA-15540
> URL: https://issues.apache.org/jira/browse/KAFKA-15540
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.7.0
>
>
> When a consumer intentionally leaves a group, we should:
>  * release the assignment (revoke partitions)
>  * send a last Heartbeat request with epoch -1 (or -2 if a static member)
> Note that the revocation involves stopping fetching, committing offsets if 
> auto-commit is enabled, and invoking the onPartitionsRevoked callback.
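> A one-line sketch of the epoch rule above (constants per KIP-848):
> {code:java}
> class LeaveGroupEpochs {
>     static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
>     static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;
> 
>     // Epoch to send in the last heartbeat when intentionally leaving.
>     static int epochForLeave(boolean isStaticMember) {
>         return isStaticMember ? LEAVE_GROUP_STATIC_MEMBER_EPOCH : LEAVE_GROUP_MEMBER_EPOCH;
>     }
> }
> {code}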
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15631) Do not send new heartbeat request while another one in-flight

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15631:
--
Fix Version/s: 3.7.0

> Do not send new heartbeat request while another one in-flight
> -
>
> Key: KAFKA-15631
> URL: https://issues.apache.org/jira/browse/KAFKA-15631
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.7.0
>
>
> The client consumer should not send a new heartbeat request while a previous 
> one is in flight. If an HB is in flight, we should wait for a response or 
> timeout before sending the next one.
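> A minimal sketch of the gating (names illustrative, not the actual 
> HeartbeatRequestManager fields):
> {code:java}
> import java.util.concurrent.CompletableFuture;
> 
> class HeartbeatGate {
>     private CompletableFuture<Void> inflight;
> 
>     // Only send when nothing is in flight and the interval says it is time.
>     boolean canSendHeartbeat(long nowMs, long nextHeartbeatMs) {
>         boolean requestInFlight = inflight != null && !inflight.isDone();
>         return !requestInFlight && nowMs >= nextHeartbeatMs;
>     }
> 
>     void onSend(CompletableFuture<Void> responseFuture) {
>         this.inflight = responseFuture;
>     }
> }
> {code}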



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15694) New integration tests to have full coverage for preview

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15694:
--
Fix Version/s: 3.7.0

> New integration tests to have full coverage for preview
> ---
>
> Key: KAFKA-15694
> URL: https://issues.apache.org/jira/browse/KAFKA-15694
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848, kip-848-client-support, kip-848-preview
> Fix For: 3.7.0
>
>
> These are to fix bugs that were discovered during PR reviews but were not 
> caught by tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15543) Send HB request right after reconciliation completes

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15543:
--
Fix Version/s: 3.7.0

> Send HB request right after reconciliation completes
> 
>
> Key: KAFKA-15543
> URL: https://issues.apache.org/jira/browse/KAFKA-15543
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.7.0
>
>
> The HeartbeatRequestManager should send an HB request outside of the regular 
> interval, right after the reconciliation process completes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15592) Member does not need to always try to join a group when a groupId is configured

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15592:
--
Fix Version/s: 3.7.0

> Member does not need to always try to join a group when a groupId is 
> configured
> ---
>
> Key: KAFKA-15592
> URL: https://issues.apache.org/jira/browse/KAFKA-15592
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.7.0
>
>
> Currently, instantiating a MembershipManager means the member will always 
> seek to join a group unless it has failed fatally. However, this is not 
> always the desired behavior, because the member should be able to join and 
> leave a group at any time during its life cycle. Maybe we should include an 
> "inactive" state in the state machine indicating that the member does not 
> want to be in a rebalance group.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15840) Correct initialization of ConsumerGroupHeartbeat by client

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15840:
--
Fix Version/s: 3.7.0

> Correct initialization of ConsumerGroupHeartbeat by client
> --
>
> Key: KAFKA-15840
> URL: https://issues.apache.org/jira/browse/KAFKA-15840
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.7.0
>
>
> The new consumer using the KIP-848 protocol currently leaves the 
> TopicPartitions field set to null in the ConsumerGroupHeartbeat request, 
> even when the MemberEpoch is zero. This violates the KIP, which expects the 
> list to be empty (but not null).
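> A sketch of the fix, assuming the generated accessors on the heartbeat 
> request data:
> {code:java}
> import java.util.Collections;
> import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
> 
> class JoiningHeartbeat {
>     // On join (member epoch 0) the KIP expects an empty list, not null.
>     static void normalize(ConsumerGroupHeartbeatRequestData data) {
>         if (data.memberEpoch() == 0 && data.topicPartitions() == null)
>             data.setTopicPartitions(Collections.emptyList());
>     }
> }
> {code}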



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15573) Implement auto-commit on partition assignment revocation

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15573:
--
Fix Version/s: 3.7.0

> Implement auto-commit on partition assignment revocation
> 
>
> Key: KAFKA-15573
> URL: https://issues.apache.org/jira/browse/KAFKA-15573
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.7.0
>
>
> When the group member's assignment changes and partitions are revoked and 
> auto-commit is enabled, we need to ensure that the commit request manager is 
> invoked to queue up the commits.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15539) Client should stop fetching while partitions being revoked

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15539:
--
Fix Version/s: 3.7.0

> Client should stop fetching while partitions being revoked
> --
>
> Key: KAFKA-15539
> URL: https://issues.apache.org/jira/browse/KAFKA-15539
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-preview
> Fix For: 3.7.0
>
>
> When partitions are being revoked (the client received a revocation on 
> heartbeat and is in the process of invoking the callback), we need to make 
> sure we do not fetch from those partitions anymore:
>  * no new fetches should be sent out for the partitions being revoked
>  * no fetch responses should be handled for those partitions (the case where 
> a fetch was already in-flight when the partition revocation started)
> This does not seem to be handled in the current KafkaConsumer with the old 
> consumer protocol (only for the EAGER protocol).
> Consider re-using the existing pendingRevocation logic that already exists 
> in the SubscriptionState and is used by the fetcher to determine whether a 
> partition is fetchable (see the sketch below).
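> A sketch of the fetchable check (the pendingRevocation flag mirrors the 
> existing SubscriptionState logic; field names illustrative):
> {code:java}
> class PartitionState {
>     boolean pendingRevocation; // set when revocation starts, before callbacks run
>     boolean hasValidPosition;
> 
>     // Consulted both before sending fetches and when handling responses, so
>     // in-flight fetches for partitions under revocation are dropped as well.
>     boolean isFetchable() {
>         return !pendingRevocation && hasValidPosition;
>     }
> }
> {code}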



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15651) Investigate auto commit guarantees during Consumer.assign()

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15651:
--
Fix Version/s: 3.7.0

> Investigate auto commit guarantees during Consumer.assign()
> ---
>
> Key: KAFKA-15651
> URL: https://issues.apache.org/jira/browse/KAFKA-15651
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.7.0
>
>
> In the {{assign()}} method implementation, both {{KafkaConsumer}} and 
> {{PrototypeAsyncConsumer}} commit offsets asynchronously. Is this 
> intentional? [~junrao] asks in a [recent PR 
> review|https://github.com/apache/kafka/pull/14406/files/193af8230d0c61853d764cbbe29bca2fc6361af9#r1349023459]:
> {quote}Do we guarantee that the new owner of the unsubscribed partitions 
> could pick up the latest committed offset?
> {quote}
> Let's confirm whether the asynchronous approach is acceptable and correct. If 
> it is, great, let's enhance the documentation to briefly explain why. If it 
> is not, let's correct the behavior if it's within the API semantic 
> expectations.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15270) Integration tests for AsyncConsumer simple consume case

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15270:
--
Fix Version/s: 3.8.0

> Integration tests for AsyncConsumer simple consume case
> ---
>
> Key: KAFKA-15270
> URL: https://issues.apache.org/jira/browse/KAFKA-15270
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.8.0
>
>
> This task involves writing integration tests covering the simple consume 
> functionality of the AsyncConsumer. This should include validation of the 
> assign, fetch, and positions logic.
> Not covering any committed offset functionality as part of this task. 
> Integration tests should have a similar form as the existing 
> PlaintextConsumerTest, but scoped to the simple consume flow. 
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15872) Investigate autocommit retry logic

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15872:
--
Fix Version/s: 3.8.0

> Investigate autocommit retry logic
> --
>
> Key: KAFKA-15872
> URL: https://issues.apache.org/jira/browse/KAFKA-15872
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> This is purely an investigation ticket.
> Currently, we send an autocommit only if there isn't an inflight one; 
> however, this logic might not be correct because I think we should:
>  # expire the request if it is not completed in time
>  # always send an autocommit on the clock
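> A sketch combining both behaviours under investigation (all names and the 
> expiry rule are illustrative):
> {code:java}
> class AutoCommitState {
>     private long inflightSentMs = -1; // -1 means no in-flight autocommit
>     private long nextAutoCommitMs;
> 
>     boolean shouldSendAutoCommit(long nowMs, long requestTimeoutMs, long intervalMs) {
>         // 1. Expire an in-flight request that was not completed in time.
>         if (inflightSentMs >= 0 && nowMs - inflightSentMs > requestTimeoutMs)
>             inflightSentMs = -1;
>         // 2. Send on the clock once nothing (valid) is in flight.
>         if (nowMs >= nextAutoCommitMs && inflightSentMs < 0) {
>             inflightSentMs = nowMs;
>             nextAutoCommitMs = nowMs + intervalMs;
>             return true;
>         }
>         return false;
>     }
> }
> {code}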



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15515) Remove duplicated integration tests for new consumer

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15515:
--
Fix Version/s: 3.7.0

> Remove duplicated integration tests for new consumer
> 
>
> Key: KAFKA-15515
> URL: https://issues.apache.org/jira/browse/KAFKA-15515
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> This task involves removing the temporary `PlaintextAsyncConsumer` file 
> containing duplicated integration tests for the new consumer. The copy was 
> generated to catch regressions and validate functionality in the new 
> consumer while in development. It should be deleted when the new consumer is 
> fully implemented and the existing integration tests 
> (`PlaintextConsumerTest`) can be executed for both implementations.
>  
> Context:
>  
> For the current KafkaConsumer, a set of integration tests exists in the file 
> PlaintextConsumerTest. Those tests cannot be executed as such for the new 
> consumer implementation for two main reasons:
> - the new consumer is being developed as a new PrototypeAsyncConsumer class, 
> in parallel to the existing KafkaConsumer
> - the new consumer is under development, so it does not support all the 
> consumer functionality yet
>  
> In order to be able to run the subset of tests that the new consumer 
> supports while the implementation completes, it was decided to:
> - make a copy of the `PlaintextConsumerTest` class, named 
> PlaintextAsyncConsumer
> - leave all the existing integration tests that cover the simple consumer 
> case unchanged, and disable the tests that are not yet supported by the new 
> consumer. Disabled tests will be enabled as the async consumer evolves.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16155) Investigate testAutoCommitIntercept

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16155:
--
Fix Version/s: 3.8.0

> Investigate testAutoCommitIntercept
> ---
>
> Key: KAFKA-16155
> URL: https://issues.apache.org/jira/browse/KAFKA-16155
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Even with KAFKA-15942, the test PlaintextConsumerTest.testAutoCommitIntercept 
> flakes on the initial setup (before using interceptors, so interceptors are 
> unrelated here, except for being used later in the test).
> The problem is that we are seeking two topic partitions to offset 10 and 20, 
> respectively, but when we commit, we seem to have lost one of the offsets, 
> likely due to a race condition. 
> When I output `subscriptionState.allConsumed` repeatedly, I get this output:
> {noformat}
> allConsumed: {topic-0=OffsetAndMetadata{offset=100, leaderEpoch=0, 
> metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, 
> metadata=''}} 
> seeking topic-0 to FetchPosition{offset=10, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:58298 (id: 0 rack: 
> null)], epoch=0}} 
> seeking topic-1 to FetchPosition{offset=20, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:58301 (id: 1 rack: 
> null)], epoch=0}} 
> allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
> metadata=''}, topic-1=OffsetAndMetadata{offset=20, leaderEpoch=null, 
> metadata=''}} 
> allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
> metadata=''}} 
> allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
> metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, 
> metadata=''}}
> autocommit start {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
> metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, 
> metadata=''}}
> {noformat}
> So after we seek to 10 / 20, we lose one of the offsets, maybe because we 
> haven't reconciled the assignment yet. Later, we get the second topic 
> partition assigned, but its offset is initialized to 0.
> We should investigate whether this can be made more like the behavior of the 
> original consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15697) Add local assignor and ensure it cannot be used with server side assignor

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15697:
--
Labels: kip-848-client-support  (was: kip-848 kip-848-client-support 
kip-848-e2e kip-848-preview)

> Add local assignor and ensure it cannot be used with server side assignor
> -
>
> Key: KAFKA-15697
> URL: https://issues.apache.org/jira/browse/KAFKA-15697
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> When we start supporting a local/client-side assignor, we should:
>  # Add the config to ConsumerConfig
>  # Examine where we should implement the logic to ensure it is not used 
> alongside the server-side assignor, i.e., you can specify only a local or a 
> remote assignor, or neither (see the sketch below)
>  ## If both assignors are specified: throw an IllegalArgumentException
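> A sketch of the mutual-exclusion check (config names as above; where this 
> lives in ConsumerConfig is part of the task):
> {code:java}
> import java.util.Optional;
> 
> class AssignorConfigValidator {
>     static void validate(Optional<String> localAssignor, Optional<String> remoteAssignor) {
>         if (localAssignor.isPresent() && remoteAssignor.isPresent())
>             throw new IllegalArgumentException(
>                 "group.local.assignor and group.remote.assignor are mutually exclusive");
>     }
> }
> {code}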



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15832) Trigger client reconciliation based on manager poll

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15832:
--
Labels: kip-848-client-support  (was: kip-848 kip-848-client-support 
kip-848-e2e kip-848-preview)

> Trigger client reconciliation based on manager poll
> ---
>
> Key: KAFKA-15832
> URL: https://issues.apache.org/jira/browse/KAFKA-15832
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Currently, the reconciliation logic on the client is triggered when a new 
> target assignment is received and resolved, or when new unresolved target 
> assignments are discovered in metadata.
> This could be improved by triggering the reconciliation logic on each poll 
> iteration, to reconcile whatever is ready to be reconciled. This would 
> require changes to support poll on the MembershipManager, and to integrate 
> it with the current polling logic in the background thread. Receiving a new 
> target assignment from the broker, or resolving new topic names via a 
> metadata update, would then only ensure that #assignmentReadyToReconcile is 
> properly updated (currently done), but wouldn't trigger the #reconcile() 
> logic, leaving that to the #poll() operation (see the sketch below).
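> A structural sketch of the poll-driven approach (method and field names 
> illustrative): events and metadata updates only update state, and the 
> background thread's poll drives reconciliation:
> {code:java}
> class MembershipManagerPollSketch {
>     private boolean hasUnreconciledAssignment;
> 
>     // Called on new target assignments and on metadata updates: record only.
>     void onAssignmentOrMetadataUpdate() {
>         hasUnreconciledAssignment = true;
>     }
> 
>     // Called from the background thread on every poll iteration.
>     void poll(long currentTimeMs) {
>         if (hasUnreconciledAssignment)
>             hasUnreconciledAssignment = !reconcile(); // reconcile what is resolvable now
>     }
> 
>     private boolean reconcile() {
>         // Resolve names, commit, run callbacks; return true when fully done.
>         return true;
>     }
> }
> {code}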



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16037) Upgrade existing system tests to use new consumer

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16037:
--
Labels: kip-848-client-support  (was: kip-848 kip-848-client-support 
kip-848-e2e kip-848-preview)

> Upgrade existing system tests to use new consumer
> -
>
> Key: KAFKA-16037
> URL: https://issues.apache.org/jira/browse/KAFKA-16037
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> This task is to parameterize the tests to run twice: once for the old and 
> once for the new Consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15835) Group commit/callbacks triggering logic

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15835:
--
Labels: kip-848-client-support  (was: kip-848 kip-848-client-support 
kip-848-e2e kip-848-preview)

> Group commit/callbacks triggering logic
> ---
>
> Key: KAFKA-15835
> URL: https://issues.apache.org/jira/browse/KAFKA-15835
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> The new consumer reconciliation logic triggers a commit request, revocation 
> callback and assignment callbacks sequentially to ensure that they are 
> executed in that order. This means that we could require multiple iterations 
> of the poll loop to complete reconciling an assignment. 
> We could consider triggering them all together, to be executed in the same 
> poll iteration, while still making sure that they are executed in the right 
> order. Note that the sequence should sometimes not block on failures (e.g. if 
> the commit fails, revocation proceeds anyway), and at other times it should 
> block (if the revocation callbacks fail, onPartitionsAssigned is not called).
> As part of this task, review the time boundaries for the commit request 
> issued when the assignment changes. It will effectively be time-bounded by 
> the rebalance timeout enforced by the broker, so the initial approach is to 
> use the same rebalance timeout as the boundary on the client. 
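> A rough sketch of that single-iteration chaining (the stage methods below are 
> illustrative stubs, not the real consumer internals), showing the 
> block/no-block semantics:
> {code:java}
> import java.util.concurrent.CompletableFuture;
>
> class ReconciliationSketch {
>     CompletableFuture<Void> reconcile() {
>         return commitOffsets()
>             // Revocation proceeds even if the commit fails.
>             .handle((ok, commitError) -> (Void) null)
>             .thenCompose(v -> invokeOnPartitionsRevoked())
>             // Skipped if the revocation callback completed exceptionally.
>             .thenCompose(v -> invokeOnPartitionsAssigned());
>     }
>
>     CompletableFuture<Void> commitOffsets() { return CompletableFuture.completedFuture(null); }
>     CompletableFuture<Void> invokeOnPartitionsRevoked() { return CompletableFuture.completedFuture(null); }
>     CompletableFuture<Void> invokeOnPartitionsAssigned() { return CompletableFuture.completedFuture(null); }
> }
> {code}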



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15691) Add new system tests to use new consumer

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15691:
--
Labels: kip-848-client-support  (was: kip-848 kip-848-client-support 
kip-848-e2e kip-848-preview)

> Add new system tests to use new consumer
> 
>
> Key: KAFKA-15691
> URL: https://issues.apache.org/jira/browse/KAFKA-15691
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15843:
--
Labels: kip-848-client-support  (was: kip-848 kip-848-client-support)

> Review consumer onPartitionsAssigned called with empty partitions
> -
>
> Key: KAFKA-15843
> URL: https://issues.apache.org/jira/browse/KAFKA-15843
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> The legacy coordinator triggers onPartitionsAssigned with an empty assignment 
> (which is not the case when triggering onPartitionsRevoked or 
> onPartitionsLost). This is the behaviour of the legacy coordinator, and the 
> new consumer implementation maintains the same principle. We should review 
> this to fully understand whether it is really necessary to call 
> onPartitionsAssigned with an empty assignment (or whether it should behave 
> consistently with onPartitionsRevoked/Lost).
> Note that the consumer integration tests rely on this call to 
> onPartitionsAssigned in #awaitRebalance (AbstractConsumerTest.scala)
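> For reference, the behaviour under review is observable through the public 
> ConsumerRebalanceListener API; a minimal listener that surfaces the empty 
> first assignment might look like this (sketch):
> {code:java}
> import java.util.Collection;
> import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> import org.apache.kafka.common.TopicPartition;
>
> class LoggingRebalanceListener implements ConsumerRebalanceListener {
>     @Override
>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         // Per this ticket, not invoked with an empty collection by the legacy coordinator.
>         System.out.println("Revoked: " + partitions);
>     }
>
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>         if (partitions.isEmpty())
>             System.out.println("Assigned called with empty assignment");
>         else
>             System.out.println("Assigned: " + partitions);
>     }
> }
> {code}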



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15773) Group protocol configuration should be validated

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15773:
--
Labels: kip-848-client-support  (was: kip-848 kip-848-client-support 
kip-848-e2e kip-848-preview)

> Group protocol configuration should be validated
> 
>
> Key: KAFKA-15773
> URL: https://issues.apache.org/jira/browse/KAFKA-15773
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If the user specifies the generic group protocol, or does not specify the 
> group.protocol config at all, we should reject any group.remote.assignor 
> setting.
>  
> If group.local.assignor and group.remote.assignor are both configured, we 
> should also reject the configuration.
>  
> This is an optimization/user experience improvement.
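> A sketch of the validation shape described above (the class and method are 
> illustrative, as is the "consumer" protocol-name check; the config names come 
> from KIP-848):
> {code:java}
> import org.apache.kafka.common.config.ConfigException;
>
> // Sketch only, not the actual ConsumerConfig validation code.
> final class AssignorConfigValidator {
>     static void validate(String groupProtocol,    // group.protocol
>                          String localAssignor,    // group.local.assignor
>                          String remoteAssignor) { // group.remote.assignor
>         boolean newProtocol = "consumer".equalsIgnoreCase(groupProtocol);
>         if (!newProtocol && remoteAssignor != null)
>             throw new ConfigException("group.remote.assignor requires group.protocol=consumer");
>         if (localAssignor != null && remoteAssignor != null)
>             throw new ConfigException("group.local.assignor and group.remote.assignor cannot both be set");
>     }
> }
> {code}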



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15843:
--
Labels: kip-848 kip-848-client-support  (was: kip-848 
kip-848-client-support kip-848-e2e kip-848-preview)

> Review consumer onPartitionsAssigned called with empty partitions
> -
>
> Key: KAFKA-15843
> URL: https://issues.apache.org/jira/browse/KAFKA-15843
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> The legacy coordinator triggers onPartitionsAssigned with an empty assignment 
> (which is not the case when triggering onPartitionsRevoked or 
> onPartitionsLost). This is the behaviour of the legacy coordinator, and the 
> new consumer implementation maintains the same principle. We should review 
> this to fully understand whether it is really necessary to call 
> onPartitionsAssigned with an empty assignment (or whether it should behave 
> consistently with onPartitionsRevoked/Lost).
> Note that the consumer integration tests rely on this call to 
> onPartitionsAssigned in #awaitRebalance (AbstractConsumerTest.scala)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14094) KIP-853: KRaft controller membership changes

2024-01-30 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-14094:
---
Description: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes

> KIP-853: KRaft controller membership changes
> 
>
> Key: KAFKA-14094
> URL: https://issues.apache.org/jira/browse/KAFKA-14094
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: 4.0-blocker
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16194) KafkaConsumer.groupMetadata() should be correct when first records are returned

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16194:
--
Fix Version/s: 3.8.0

> KafkaConsumer.groupMetadata() should be correct when first records are 
> returned
> ---
>
> Key: KAFKA-16194
> URL: https://issues.apache.org/jira/browse/KAFKA-16194
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: David Jacot
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The following code returns records before the group metadata is updated. This 
> fails the first transactions ever run by the Producer/Consumer.
>  
> {code:java}
> Producer<String, String> txnProducer = new KafkaProducer<>(txnProducerProps);
> Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
> txnProducer.initTransactions();
> System.out.println("Init transactions called");
> try {
>     txnProducer.beginTransaction();
>     System.out.println("Begin transactions called");
>     consumer.subscribe(Collections.singletonList("input"));
>     System.out.println("Consumer subscribed to topic -> KIP848-topic-2 ");
>     ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
>     System.out.println("Returned " + records.count() + " records.");
>     // Process and send txn messages.
>     for (ConsumerRecord<String, String> processedRecord : records) {
>         txnProducer.send(new ProducerRecord<>("output",
>             processedRecord.key(), "Processed: " + processedRecord.value()));
>     }
>     ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
>     System.out.println("Group metadata inside test" + groupMetadata);
>     Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
>     for (ConsumerRecord<String, String> record : records) {
>         offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
>             new OffsetAndMetadata(record.offset() + 1));
>     }
>     System.out.println("Offsets to commit" + offsetsToCommit);
>     // Send offsets to transaction with ConsumerGroupMetadata.
>     txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);
>     System.out.println("Send offsets to transaction done");
>     // Commit the transaction.
>     txnProducer.commitTransaction();
>     System.out.println("Commit transaction done");
> } catch (ProducerFencedException | OutOfOrderSequenceException | 
> AuthorizationException e) {
>     e.printStackTrace();
>     txnProducer.close();
> } catch (KafkaException e) {
>     e.printStackTrace();
>     txnProducer.abortTransaction();
> } finally {
>     txnProducer.close();
>     consumer.close();
> } {code}
> The issue seems to be that while it waits in `poll`, the event to update the 
> group metadata is not processed.
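> Until that is addressed, a possible workaround (continuing the snippet above) 
> is to keep polling until the returned metadata reflects a joined member; a 
> sketch, where the generation check is an assumption rather than an official 
> recommendation:
> {code:java}
> // Workaround sketch: generationId() is -1 until the consumer has joined the group.
> ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
> while (groupMetadata.generationId() < 0) {
>     consumer.poll(Duration.ofMillis(100));
>     groupMetadata = consumer.groupMetadata();
> }
> txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);
> {code}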



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16194) KafkaConsumer.groupMetadata() should be correct when first records are returned

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16194:
--
Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor)

> KafkaConsumer.groupMetadata() should be correct when first records are 
> returned
> ---
>
> Key: KAFKA-16194
> URL: https://issues.apache.org/jira/browse/KAFKA-16194
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: David Jacot
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> The following code returns records before the group metadata is updated. This 
> fails the first transactions ever run by the Producer/Consumer.
>  
> {code:java}
> Producer<String, String> txnProducer = new KafkaProducer<>(txnProducerProps);
> Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
> txnProducer.initTransactions();
> System.out.println("Init transactions called");
> try {
>     txnProducer.beginTransaction();
>     System.out.println("Begin transactions called");
>     consumer.subscribe(Collections.singletonList("input"));
>     System.out.println("Consumer subscribed to topic -> KIP848-topic-2 ");
>     ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
>     System.out.println("Returned " + records.count() + " records.");
>     // Process and send txn messages.
>     for (ConsumerRecord<String, String> processedRecord : records) {
>         txnProducer.send(new ProducerRecord<>("output",
>             processedRecord.key(), "Processed: " + processedRecord.value()));
>     }
>     ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
>     System.out.println("Group metadata inside test" + groupMetadata);
>     Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
>     for (ConsumerRecord<String, String> record : records) {
>         offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
>             new OffsetAndMetadata(record.offset() + 1));
>     }
>     System.out.println("Offsets to commit" + offsetsToCommit);
>     // Send offsets to transaction with ConsumerGroupMetadata.
>     txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);
>     System.out.println("Send offsets to transaction done");
>     // Commit the transaction.
>     txnProducer.commitTransaction();
>     System.out.println("Commit transaction done");
> } catch (ProducerFencedException | OutOfOrderSequenceException | 
> AuthorizationException e) {
>     e.printStackTrace();
>     txnProducer.close();
> } catch (KafkaException e) {
>     e.printStackTrace();
>     txnProducer.abortTransaction();
> } finally {
>     txnProducer.close();
>     consumer.close();
> } {code}
> The issue seems to be that while it waits in `poll`, the event to update the 
> group metadata is not processed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16156:
--
Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor)

> System test failing for new consumer on endOffsets with negative timestamps
> ---
>
> Key: KAFKA-16156
> URL: https://issues.apache.org/jira/browse/KAFKA-16156
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid 
> negative timestamp".
> Trace:
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Received ListOffsetResponse 
> ListOffsetsResponseData(throttleTimeMs=0, 
> topics=[ListOffsetsTopicResponse(name='input-topic', 
> partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, 
> oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from 
> broker worker2:9092 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,932] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Handling ListOffsetResponse 
> response for input-topic-0. Fetched offset 42804, timestamp -1 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Updating last stable offset for 
> partition input-topic-0 to 42804 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,933] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Fetch offsets completed 
> successfully for partitions and timestamps {input-topic-0=-1}. Result 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862
>  (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,933] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] No events to process 
> (org.apache.kafka.clients.consumer.internals.events.EventProcessor)
> [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event 
> loop (org.apache.kafka.tools.TransactionalMessageCopier)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
> Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212)
>   at 
> org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44)
>   at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292)
> Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:39)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>   at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at 
> java.
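> The failure comes from constructing OffsetAndTimestamp with the sentinel 
> timestamp -1 returned by ListOffsets; a sketch of the kind of guard the trace 
> points at (the class and helper below are illustrative, not the actual patch):
> {code:java}
> import java.util.Optional;
> import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
>
> // Sketch only: skip building OffsetAndTimestamp for sentinel timestamps (-1 = no
> // timestamp), which otherwise trips the constructor's negative-timestamp check.
> final class ListOffsetsGuard {
>     static OffsetAndTimestamp maybeOffsetAndTimestamp(long offset, long timestamp,
>                                                       Optional<Integer> leaderEpoch) {
>         return timestamp < 0 ? null : new OffsetAndTimestamp(offset, timestamp, leaderEpoch);
>     }
> }
> {code}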

[jira] [Created] (KAFKA-16211) Inconsistent static config values in CreateTopicsResult and DescribeConfigsResult

2024-01-30 Thread Gantigmaa Selenge (Jira)
Gantigmaa Selenge created KAFKA-16211:
-

 Summary: Inconsistent static config values in CreateTopicsResult 
and DescribeConfigsResult
 Key: KAFKA-16211
 URL: https://issues.apache.org/jira/browse/KAFKA-16211
 Project: Kafka
  Issue Type: Bug
  Components: controller
Reporter: Gantigmaa Selenge


When creating a topic in a KRaft cluster, the config value returned in 
CreateTopicsResult can differ from what you get from describing the topic 
configs, if the config was set in broker.properties or controller.properties, 
or in both but with different values. 

 

For example, start a broker with `segment.bytes` set to 573741824 in the 
properties file and then create a topic; the CreateTopicsResult contains:

ConfigEntry(name=segment.bytes, value=1073741824, source=DEFAULT_CONFIG, 
isSensitive=false, isReadOnly=false, synonyms=[], type=INT, documentation=null)

because the controller was started without setting this config. 

However, when you describe the configurations for the same topic, the config 
value set by the broker is returned:

ConfigEntry(name=segment.bytes, value=573741824, 
source=STATIC_BROKER_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[], 
type=null, documentation=null)

 

Vice versa, if the controller is started with this config set to a different 
value, the create topic request returns the value set by the controller, and 
when you then describe the config for the same topic, you get the value set by 
the broker. This makes it confusing to understand which value is being used.
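One way to observe the mismatch is to compare the ConfigEntry returned by 
CreateTopicsResult with the one returned by Admin#describeConfigs. A minimal 
sketch with the standard Admin API (the wrapper class, topic name, and config 
key are placeholders):

{code:java}
import java.util.Collections;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;

final class ConfigMismatchSketch {
    static void compareSegmentBytes(Admin admin) throws Exception {
        NewTopic topic = new NewTopic("test-topic", 1, (short) 1);
        CreateTopicsResult created = admin.createTopics(Collections.singleton(topic));
        ConfigEntry fromCreate = created.config("test-topic").get().get("segment.bytes");

        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "test-topic");
        Config described = admin.describeConfigs(Collections.singleton(resource))
            .all().get().get(resource);
        ConfigEntry fromDescribe = described.get("segment.bytes");

        // Per this ticket, value/source can disagree between the two entries.
        System.out.println("create:   " + fromCreate.value() + " (" + fromCreate.source() + ")");
        System.out.println("describe: " + fromDescribe.value() + " (" + fromDescribe.source() + ")");
    }
}
{code}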



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16211) Inconsistent config values in CreateTopicsResult and DescribeConfigsResult

2024-01-30 Thread Gantigmaa Selenge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gantigmaa Selenge updated KAFKA-16211:
--
Summary: Inconsistent config values in CreateTopicsResult and 
DescribeConfigsResult  (was: Inconsistent static config values in 
CreateTopicsResult and DescribeConfigsResult)

> Inconsistent config values in CreateTopicsResult and DescribeConfigsResult
> --
>
> Key: KAFKA-16211
> URL: https://issues.apache.org/jira/browse/KAFKA-16211
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Reporter: Gantigmaa Selenge
>Priority: Minor
>
> When creating a topic in a KRaft cluster, the config value returned in 
> CreateTopicsResult can differ from what you get from describing the topic 
> configs, if the config was set in broker.properties or controller.properties, 
> or in both but with different values. 
>  
> For example, start a broker with `segment.bytes` set to 573741824 in the 
> properties file and then create a topic; the CreateTopicsResult contains:
> ConfigEntry(name=segment.bytes, value=1073741824, source=DEFAULT_CONFIG, 
> isSensitive=false, isReadOnly=false, synonyms=[], type=INT, 
> documentation=null)
> because the controller was started without setting this config. 
> However, when you describe the configurations for the same topic, the config 
> value set by the broker is returned:
> ConfigEntry(name=segment.bytes, value=573741824, 
> source=STATIC_BROKER_CONFIG, isSensitive=false, isReadOnly=false, 
> synonyms=[], type=null, documentation=null)
>  
> Vice versa, if the controller is started with this config set to a different 
> value, the create topic request returns the value set by the controller, and 
> when you then describe the config for the same topic, you get the value set 
> by the broker. This makes it confusing to understand which value is being 
> used.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15853: Move AuthorizerUtils and its dependencies to server module [kafka]

2024-01-30 Thread via GitHub


OmniaGM commented on PR #15167:
URL: https://github.com/apache/kafka/pull/15167#issuecomment-1917632608

   Just rebased, it was an easy one 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-8606) Provide a method to fetch committed offsets for a collection of TopicPartition

2024-01-30 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield resolved KAFKA-8606.
-
Fix Version/s: 2.4.0
   Resolution: Fixed

Method added in AK 2.4.

> Provide a method to fetch committed offsets for a collection of TopicPartition
> --
>
> Key: KAFKA-8606
> URL: https://issues.apache.org/jira/browse/KAFKA-8606
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 2.3.0, 2.2.1
>Reporter: ov7a
>Priority: Major
> Fix For: 2.4.0
>
>
> Currently KafkaConsumer has methods for fetching beginning offsets, end offsets 
> and offsets for times, all of them accepting a collection of TopicPartition.
> There is a method to fetch the committed offset for a single TopicPartition, 
> but there is no public API to fetch committed offsets for a collection of 
> TopicPartition. So, if one wants to fetch all committed offsets for a topic, a 
> request per partition is created.
> Note that ConsumerCoordinator.fetchCommittedOffsets, which is called internally 
> by the "committed" method, does accept a collection of TopicPartition. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-30 Thread via GitHub


OmniaGM commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1471733042


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: 
LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, 
topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
 targetLogDirectoryId match {
   case Some(directoryId) =>
-createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, 
isFutureReplica = false, highWatermarkCheckpoints, topicId, 
targetLogDirectoryId)
+if (logManager.onlineLogDirId(directoryId) || 
!logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+} else {
+  warn(s"Skipping creation of log because there are potentially 
offline log " +

Review Comment:
   I don't believe it will cause an issue. We just need to ensure we have good 
test coverage for this at least in unit tests as JBOD doesn't have strong 
integration tests at the moment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]

2024-01-30 Thread via GitHub


fvaleri commented on code in PR #15273:
URL: https://github.com/apache/kafka/pull/15273#discussion_r1471706520


##
raft/src/main/java/org/apache/kafka/raft/RaftConfig.java:
##
@@ -206,6 +245,27 @@ private static Integer parseVoterId(String idString) {
 }
 }
 
+private static Set parseProcessRoles(List processRoles, 
Map voterConnections, int nodeId) {

Review Comment:
   KafkaConfig:1537, I kept the original code here. I don't remember the exact 
reason, as two months have passed. 
   
   I have to move ProcessRole from server to server-common to avoid circular 
dependency. Let me know if this is fine.



##
raft/src/main/java/org/apache/kafka/raft/RaftConfig.java:
##
@@ -206,6 +245,27 @@ private static Integer parseVoterId(String idString) {
 }
 }
 
+private static Set parseProcessRoles(List processRoles, 
Map voterConnections, int nodeId) {

Review Comment:
   KafkaConfig:1537, I kept the original code here. I don't remember the exact 
reason, as two months have passed. I have to move ProcessRole from server to 
server-common to avoid circular dependency. Let me know if this is fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]

2024-01-30 Thread via GitHub


fvaleri commented on code in PR #15273:
URL: https://github.com/apache/kafka/pull/15273#discussion_r1471706520


##
raft/src/main/java/org/apache/kafka/raft/RaftConfig.java:
##
@@ -206,6 +245,27 @@ private static Integer parseVoterId(String idString) {
 }
 }
 
+private static Set parseProcessRoles(List processRoles, 
Map voterConnections, int nodeId) {

Review Comment:
   KafkaConfig:1537, I kept the original code here. I don't remember the exact 
reason, as I did it two months ago. I have to move ProcessRole from server to 
server-common to avoid circular dependency. Let me know if this is fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move AuthorizerUtils and its dependencies to server module [kafka]

2024-01-30 Thread via GitHub


mimaison commented on PR #15167:
URL: https://github.com/apache/kafka/pull/15167#issuecomment-1917615982

   FYI I merged https://github.com/apache/kafka/pull/15246 and this indeed 
needs rebasing


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16165) Consumer invalid transition on expired poll interval

2024-01-30 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16165:
---
Fix Version/s: 3.8.0

> Consumer invalid transition on expired poll interval
> 
>
> Key: KAFKA-16165
> URL: https://issues.apache.org/jira/browse/KAFKA-16165
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: client-transitions-issues
> Fix For: 3.8.0
>
>
> Running system tests with the new async consumer revealed an invalid 
> transition related to the consumer not being polled on the interval in some 
> kind of scenario (maybe relates to consumer close, as the transition is 
> leaving->stale)
> Log trace:
> [2024-01-17 19:45:07,379] WARN [Consumer 
> clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
> groupId=consumer-groups-test-2] consumer poll timeout has expired. This means 
> the time between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2024-01-17 19:45:07,379] ERROR [Consumer 
> clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
> groupId=consumer-groups-test-2] Unexpected error caught in consumer network 
> thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
> java.lang.IllegalStateException: Invalid state transition from LEAVING to 
> STALE
>   at 
> org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
>   at 
> org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137)
>   at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>   at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>   at 
> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>   at 
> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
>   at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>   at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>   at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>   at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)
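> The underlying pattern is a state machine that rejects undeclared transitions; 
> a generic sketch of that kind of guard (the states and allowed edges below are 
> illustrative, not Kafka's actual MemberState table):
> {code:java}
> import java.util.EnumSet;
> import java.util.Set;
>
> class MemberStateSketch {
>     enum MemberState { STABLE, LEAVING, STALE, UNSUBSCRIBED }
>
>     static Set<MemberState> allowedNext(MemberState s) {
>         switch (s) {
>             case LEAVING: return EnumSet.of(MemberState.UNSUBSCRIBED); // LEAVING -> STALE rejected
>             case STALE:   return EnumSet.of(MemberState.UNSUBSCRIBED, MemberState.STABLE);
>             default:      return EnumSet.allOf(MemberState.class);
>         }
>     }
>
>     static MemberState transitionTo(MemberState current, MemberState next) {
>         if (!allowedNext(current).contains(next))
>             throw new IllegalStateException("Invalid state transition from " + current + " to " + next);
>         return next;
>     }
> }
> {code}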



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]

2024-01-30 Thread via GitHub


fvaleri commented on code in PR #15273:
URL: https://github.com/apache/kafka/pull/15273#discussion_r1471706370


##
core/src/test/scala/unit/kafka/log/LogConfigTest.scala:
##
@@ -94,6 +94,18 @@ class LogConfigTest {
   case TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG => 
assertPropertyInvalid(name, "not_a_boolean")
   case TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG => 
assertPropertyInvalid(name, "not_a_number", "-3")
   case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG => 
assertPropertyInvalid(name, "not_a_number", "-3")
+  case LogConfig.LOG_DIR_PROP => assert(true)
+  case LogConfig.LOG_DIRS_PROP => assert(true)

Review Comment:
   These are both optional configs (even if you have to specify at least one), 
that's why I skip validation here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16165) Consumer invalid transition on expired poll interval

2024-01-30 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16165:
---
Affects Version/s: 3.7.0

> Consumer invalid transition on expired poll interval
> 
>
> Key: KAFKA-16165
> URL: https://issues.apache.org/jira/browse/KAFKA-16165
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: client-transitions-issues
>
> Running system tests with the new async consumer revealed an invalid 
> transition related to the consumer not being polled on the interval in some 
> kind of scenario (maybe relates to consumer close, as the transition is 
> leaving->stale)
> Log trace:
> [2024-01-17 19:45:07,379] WARN [Consumer 
> clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
> groupId=consumer-groups-test-2] consumer poll timeout has expired. This means 
> the time between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2024-01-17 19:45:07,379] ERROR [Consumer 
> clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
> groupId=consumer-groups-test-2] Unexpected error caught in consumer network 
> thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
> java.lang.IllegalStateException: Invalid state transition from LEAVING to 
> STALE
>   at 
> org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
>   at 
> org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137)
>   at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>   at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>   at 
> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>   at 
> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
>   at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>   at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>   at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>   at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15853: Move PasswordEncoder to server-common [kafka]

2024-01-30 Thread via GitHub


mimaison merged PR #15246:
URL: https://github.com/apache/kafka/pull/15246


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16162: resend broker registration on metadata update to IBP 3.7-IV2 [kafka]

2024-01-30 Thread via GitHub


cmccabe commented on PR #15270:
URL: https://github.com/apache/kafka/pull/15270#issuecomment-1917609564

   Thanks, all. Committed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16162: resend broker registration on metadata update to IBP 3.7-IV2 [kafka]

2024-01-30 Thread via GitHub


cmccabe closed pull request #15270: KAFKA-16162: resend broker registration on 
metadata update to IBP 3.7-IV2
URL: https://github.com/apache/kafka/pull/15270


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]

2024-01-30 Thread via GitHub


fvaleri commented on code in PR #15273:
URL: https://github.com/apache/kafka/pull/15273#discussion_r1471706370


##
core/src/test/scala/unit/kafka/log/LogConfigTest.scala:
##
@@ -94,6 +94,18 @@ class LogConfigTest {
   case TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG => 
assertPropertyInvalid(name, "not_a_boolean")
   case TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG => 
assertPropertyInvalid(name, "not_a_number", "-3")
   case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG => 
assertPropertyInvalid(name, "not_a_number", "-3")
+  case LogConfig.LOG_DIR_PROP => assert(true)
+  case LogConfig.LOG_DIRS_PROP => assert(true)

Review Comment:
   These are both optional strings (even if you have to specify at least one), 
that's why I skip validation here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]

2024-01-30 Thread via GitHub


fvaleri commented on code in PR #15273:
URL: https://github.com/apache/kafka/pull/15273#discussion_r1471706683


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -500,6 +553,34 @@ public static Map configKeys() {
 return Collections.unmodifiableMap(CONFIG.configKeys());
 }
 
+public List<String> logDirs() {
+    String csvList = logDirs != null ? logDirs : logDir;
+    if (csvList == null || csvList.isEmpty()) {
+        return Collections.emptyList();
+    } else {
+        return Arrays.stream(csvList.split("\\s*,\\s*"))
+            .filter(v -> !v.equals(""))
+            .collect(Collectors.toList());
+    }
+}
+
+public String getMetadataLogDir() {
+    if (metadataLogDir != null) {
+        return metadataLogDir;
+    } else {
+        return logDirs().get(0);
+    }
+}
+
+public Optional<String> interBrokerProtocolVersion() {
+    String originalIBP = (String) originals().get(LogConfig.INTER_BROKER_PROTOCOL_VERSION_PROP);
+    return originalIBP != null ? Optional.of(originalIBP) : Optional.empty();
+}
+
+public Boolean unstableMetadataVersionsEnabled() {
+    return unstableMetadataVersionsEnabled;
+}

Review Comment:
   They are used by the tool, removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]

2024-01-30 Thread via GitHub


fvaleri commented on code in PR #15273:
URL: https://github.com/apache/kafka/pull/15273#discussion_r1471706253


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -290,7 +320,15 @@ public Optional serverConfigName(String 
configName) {
 .define(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, LONG, 
DEFAULT_LOCAL_RETENTION_MS, atLeast(-2), MEDIUM,
 TopicConfig.LOCAL_LOG_RETENTION_MS_DOC)
 .define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, 
DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM,
-TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC);
+TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
+.define(LOG_DIR_PROP, STRING, DEFAULT_LOG_DIR, HIGH, LOG_DIR_DOC)
+.define(LOG_DIRS_PROP, STRING, null, HIGH, LOG_DIRS_DOC)
+.define(METADATA_LOG_DIR_PROP, STRING, null, HIGH, 
METADATA_LOG_DIR_DOC)
+.define(INTER_BROKER_PROTOCOL_VERSION_PROP, STRING, 
DEFAULT_INTER_BROKER_PROTOCOL_VERSION, new MetadataVersionValidator(), MEDIUM, 
INTER_BROKER_PROTOCOL_VERSION_DOC)
+// This indicates whether unreleased APIs should be advertised by 
this node.
+.defineInternal(UNSTABLE_API_VERSIONS_ENABLE_PROP, BOOLEAN, 
DEFAULT_UNSTABLE_API_VERSIONS_ENABLE, HIGH)
+// This indicates whether unreleased MetadataVersions should be 
enabled on this node.
+.defineInternal(UNSTABLE_METADATA_VERSIONS_ENABLE_PROP, BOOLEAN, 
DEFAULT_UNSTABLE_METADATA_VERSIONS_ENABLE, HIGH);

Review Comment:
   KafkaConfig:446-447



##
core/src/test/scala/unit/kafka/log/LogConfigTest.scala:
##
@@ -94,6 +94,18 @@ class LogConfigTest {
   case TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG => 
assertPropertyInvalid(name, "not_a_boolean")
   case TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG => 
assertPropertyInvalid(name, "not_a_number", "-3")
   case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG => 
assertPropertyInvalid(name, "not_a_number", "-3")
+  case LogConfig.LOG_DIR_PROP => assert(true)
+  case LogConfig.LOG_DIRS_PROP => assert(true)

Review Comment:
   These are both optional strings, that's why I skip validation here.



##
raft/src/main/java/org/apache/kafka/raft/RaftConfig.java:
##
@@ -206,6 +245,27 @@ private static Integer parseVoterId(String idString) {
 }
 }
 
+private static Set parseProcessRoles(List processRoles, 
Map voterConnections, int nodeId) {

Review Comment:
   KafkaConfig:1537, I kept the original code here. I don't remember the exact 
reason, as I did this two months ago. I have to move ProcessRole from server to 
server-common to avoid circular dependency. Let me know if this is fine.



##
raft/src/main/java/org/apache/kafka/raft/RaftConfig.java:
##
@@ -206,6 +245,27 @@ private static Integer parseVoterId(String idString) {
 }
 }
 
+private static Set parseProcessRoles(List processRoles, 
Map voterConnections, int nodeId) {
+    Set<String> distinctRoles = new HashSet<>();
+    for (String role : processRoles) {
+        switch (role) {
+            case "broker":
+                distinctRoles.add("BrokerRole");

Review Comment:
   Fixed. See my previous comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Cleanup log.dirs in ReplicaManagerTest on JVM exit [kafka]

2024-01-30 Thread via GitHub


gaurav-narula commented on PR #15289:
URL: https://github.com/apache/kafka/pull/15289#issuecomment-1917579474

   Addressed in 
[4406670](https://github.com/apache/kafka/pull/15289/commits/4406670459210eb7fac009c836288955c7bcb23c)
   
   > Shouldn't we instead update the tests to use temp directories and cleanup 
any files they create?
   
   I think the tests deliberately use relative directories because of 
KAFKA-5759.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16165) Consumer invalid transition on expired poll interval

2024-01-30 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16165:
---
Labels: client-transitions-issues  (was: )

> Consumer invalid transition on expired poll interval
> 
>
> Key: KAFKA-16165
> URL: https://issues.apache.org/jira/browse/KAFKA-16165
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: client-transitions-issues
>
> Running system tests with the new async consumer revealed an invalid 
> transition related to the consumer not being polled on the interval in some 
> kind of scenario (maybe relates to consumer close, as the transition is 
> leaving->stale)
> Log trace:
> [2024-01-17 19:45:07,379] WARN [Consumer 
> clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
> groupId=consumer-groups-test-2] consumer poll timeout has expired. This means 
> the time between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2024-01-17 19:45:07,379] ERROR [Consumer 
> clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
> groupId=consumer-groups-test-2] Unexpected error caught in consumer network 
> thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
> java.lang.IllegalStateException: Invalid state transition from LEAVING to 
> STALE
>   at 
> org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
>   at 
> org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137)
>   at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>   at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>   at 
> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>   at 
> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
>   at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>   at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>   at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>   at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2024-01-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10184.
-
  Assignee: (was: John Roesler)
Resolution: Cannot Reproduce

This ticket is pretty old, and looking into Gradle Enterprise the test seems to 
be stable. Closing for now.

> Flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-10184
> URL: https://issues.apache.org/jira/browse/KAFKA-10184
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Critical
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 12. Input 
> records haven't all been written to the changelog: 442
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.ja

Re: [PR] KAFKA-16162: resend broker registration on metadata update to IBP 3.7-IV2 [kafka]

2024-01-30 Thread via GitHub


cmccabe commented on code in PR #15270:
URL: https://github.com/apache/kafka/pull/15270#discussion_r1471687028


##
server-common/src/test/java/org/apache/kafka/common/DirectoryIdTest.java:
##
@@ -67,18 +68,23 @@ void testCreateAssignmentMap() {
 
 @Test
 void testIsOnline() {
+// Given
 List sortedDirs = Arrays.asList(
 Uuid.fromString("imQKg2cXTVe8OUFNa3R9bg"),
 Uuid.fromString("Mwy5wxTDQxmsZwGzjsaX7w"),
 Uuid.fromString("s8rHMluuSDCnxt3FmKwiyw")
 );
 sortedDirs.sort(Uuid::compareTo);
+List emptySortedDirs = Collections.emptyList();

Review Comment:
   Since this is a critical fix, I'm not going to hold it up for this comment. 
But in the future, it would be better to put separate test cases in separate 
test functions, rather than creating long tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move AuthorizerUtils and its dependencies to server module [kafka]

2024-01-30 Thread via GitHub


mimaison commented on PR #15167:
URL: https://github.com/apache/kafka/pull/15167#issuecomment-1917549316

   It's next on my list once https://github.com/apache/kafka/pull/15246 is 
merged. It will probably require you to rebase (sorry).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16204: Create partition dir for mockLog [kafka]

2024-01-30 Thread via GitHub


mimaison commented on PR #15288:
URL: https://github.com/apache/kafka/pull/15288#issuecomment-1917532745

   It's just a cleanup, not in any way related to a blocker item, hence why I'm 
asking.
   
   It's fine if you prefer to only accept commits fixing blockers in the 3.7 
branch. Ultimately we don't want to backport cosmetic changes and risk 
introducing issues while the release is ongoing. We can also backport to 3.7 
once 3.7.0 is out.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move AuthorizerUtils and its dependencies to server module [kafka]

2024-01-30 Thread via GitHub


OmniaGM commented on PR #15167:
URL: https://github.com/apache/kafka/pull/15167#issuecomment-1917531480

   Hi @mimaison can you review this pr please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16204: Create partition dir for mockLog [kafka]

2024-01-30 Thread via GitHub


stanislavkozlovski commented on PR #15288:
URL: https://github.com/apache/kafka/pull/15288#issuecomment-1917509162

   oh lol it's 3 lines, :shipit: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16200) Ensure RequestManager handling of expired timeouts are consistent

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16200:
--
Labels: consumer-threading-refactor timeout  (was: 
consumer-threading-refactor consumer-timeout timeout)

> Ensure RequestManager handling of expired timeouts are consistent
> -
>
> Key: KAFKA-16200
> URL: https://issues.apache.org/jira/browse/KAFKA-16200
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16208) Design new Consumer timeout policy

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16208:
--
Labels: consumer-threading-refactor timeout  (was: 
consumer-threading-refactor consumer-timeout timeout)

> Design new Consumer timeout policy
> --
>
> Key: KAFKA-16208
> URL: https://issues.apache.org/jira/browse/KAFKA-16208
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, documentation
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> This task is to design and document the timeout policy for the new Consumer 
> implementation.
> The documentation lives here: 
> https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15848:
--
Labels: consumer-threading-refactor kip-848-client-support timeout  (was: 
consumer-threading-refactor consumer-timeout kip-848-client-support timeout)

> Consumer API timeout inconsistent between ConsumerDelegate implementations
> --
>
> Key: KAFKA-15848
> URL: https://issues.apache.org/jira/browse/KAFKA-15848
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> timeout
> Fix For: 3.8.0
>
>
> The two {{ConsumerDelegate}} implementations ({{{}LegacyKafkaConsumer{}}} and 
> {{{}AsyncKafkaConsumer{}}}) have a fundamental difference related to their 
> use and interpretation of the {{Timer}} that is supplied.
> h3. tl;dr
> {{AsyncKafkaConsumer}} is very literal about the timeout, whereas 
> {{LegacyKafkaConsumer}} seems to give a little wiggle room.
> {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for 
> success of its operations _before_ checking the timer:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{NetworkClient.poll()}}
>  # Check for result
>  ## If successful, return success
>  ## If fatal failure, return failure
>  # Check timer
>  ## If timer expired, return failure
> {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{Future.get()}}
>  ## If operation timed out, {{Future.get()}} will throw a timeout error
>  # Check for result
>  ## If successful, return success
>  ## Otherwise, return failure
> h3. How to reproduce
> This causes subtle timing issues, but they can be easily reproduced via any 
> of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} 
> API. Here's a bit of code that illustrates the difference between the two 
> approaches.
> {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a 
> manner similar to this:
> {code:java}
> public int getCount(Timer timer) {
>     do {
>         final RequestFuture<Integer> future = sendSomeRequest(partitions);
>         client.poll(future, timer);
>         if (future.isDone())
>             return future.get();
>     } while (timer.notExpired());
>     return -1;
> }
> {code}
> {{AsyncKafkaConsumer}} has similar logic, but it is structured like this:
> {code:java}
> private int getCount(Timer timer) {
>     try {
>         CompletableFuture<Integer> future = new CompletableFuture<>();
>         applicationEventQueue.add(future);
>         return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
>     } catch (TimeoutException e) {
>         return -1;
>     }
> }
> {code}
> The call to {{add}} enqueues the network operation, but it then _immediately_ 
> invokes {{Future.get()}} with the timeout to implement a time-bounded 
> blocking call. Since this method is being called with a timeout of 0, it 
> _immediately_ throws a {{{}TimeoutException{}}}. 
> h3. Suggested fix
> TBD :(
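
One possible direction, as a minimal sketch rather than a committed fix (it
reuses the illustrative getCount/applicationEventQueue names from above):
after Future.get() times out, make one final check for a completed result,
mirroring the legacy ordering of checking results before checking the timer.
{code:java}
private int getCount(Timer timer) {
    CompletableFuture<Integer> future = new CompletableFuture<>();
    applicationEventQueue.add(future);
    try {
        return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        // The operation may have completed between enqueueing the event
        // and the timeout check; honor that result before failing.
        if (future.isDone() && !future.isCompletedExceptionally())
            return future.join();
        return -1;
    } catch (InterruptedException | ExecutionException e) {
        return -1;
    }
}
{code}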



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15475) Request might retry forever even if the user API timeout expires

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15475:
--
Labels: consumer-threading-refactor timeout  (was: 
consumer-threading-refactor consumer-timeout timeout)

> Request might retry forever even if the user API timeout expires
> 
>
> Key: KAFKA-15475
> URL: https://issues.apache.org/jira/browse/KAFKA-15475
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> If the request times out in the background thread, it will be completed with 
> a TimeoutException, which is retriable. In the TopicMetadataRequestManager and 
> possibly other managers, the request might continue to be retried forever.
>  
> There are two ways to fix this:
>  # Pass a timer to the manager to remove the inflight requests once the timer 
> expires (see the sketch below).
>  # Pass the future to the application layer and continue to retry there.
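
A minimal sketch of option 1, assuming a hypothetical InflightRequest holder
and expireStaleRequests() hook (not the actual TopicMetadataRequestManager
API):
{code:java}
// Each inflight request carries the user-supplied deadline; once the
// deadline passes, fail it fatally instead of retrying again.
class InflightRequest<T> {
    final CompletableFuture<T> future;
    final long deadlineMs;

    InflightRequest(CompletableFuture<T> future, long deadlineMs) {
        this.future = future;
        this.deadlineMs = deadlineMs;
    }
}

void expireStaleRequests(List<InflightRequest<?>> inflight, long nowMs) {
    inflight.removeIf(req -> {
        if (nowMs < req.deadlineMs)
            return false;
        req.future.completeExceptionally(
                new org.apache.kafka.common.errors.TimeoutException(
                        "Request expired before completion"));
        return true;
    });
}
{code}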



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15558) Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions()

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15558:
--
Labels: consumer-threading-refactor fetcher timeout  (was: 
consumer-threading-refactor consumer-timeout fetcher timeout)

> Determine if Timer should be used elsewhere in 
> PrototypeAsyncConsumer.updateFetchPositions()
> 
>
> Key: KAFKA-15558
> URL: https://issues.apache.org/jira/browse/KAFKA-15558
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, fetcher, timeout
> Fix For: 3.8.0
>
>
> This is a followup ticket based on a question from [~junrao] when reviewing 
> the [fetch request manager pull 
> request|https://github.com/apache/kafka/pull/14406]:
> {quote}It still seems weird that we only use the timer for 
> {{{}refreshCommittedOffsetsIfNeeded{}}}, but not for other cases where we 
> don't have valid fetch positions. For example, if all partitions are in 
> {{AWAIT_VALIDATION}} state, it seems that {{PrototypeAsyncConsumer.poll()}} 
> will just go in a busy loop, which is not efficient.
> {quote}
> The goal here is to determine if we should also be propagating the Timer to 
> the validate positions and reset positions operations.
> Note: we should also investigate if the existing {{KafkaConsumer}} 
> implementation should be fixed, too.
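
A rough sketch of what propagating the timer could look like (hypothetical
method names, not the actual consumer code):
{code:java}
private boolean updateFetchPositions(final Timer timer) {
    // Bound validation of AWAIT_VALIDATION positions by the caller's timer.
    validatePositionsIfNeeded(timer);
    // Refreshing committed offsets is the only step bounded by the timer today.
    refreshCommittedOffsetsIfNeeded(timer);
    // Bound position resets (auto.offset.reset) by the same timer.
    resetPositionsIfNeeded(timer);
    return timer.notExpired();
}
{code}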



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16009:
--
Labels: consumer-threading-refactor kip-848-client-support timeout  (was: 
consumer-threading-refactor consumer-timeout kip-848-client-support timeout)

> Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
> 
>
> Key: KAFKA-16009
> URL: https://issues.apache.org/jira/browse/KAFKA-16009
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> timeout
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing 
> when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
>   at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16199) Prune the event queue if event timeout expired before starting

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16199:
--
Labels: consumer-threading-refactor timeout  (was: 
consumer-threading-refactor consumer-timeout timeout)

> Prune the event queue if event timeout expired before starting
> --
>
> Key: KAFKA-16199
> URL: https://issues.apache.org/jira/browse/KAFKA-16199
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15974) Enforce that events and requests respect user-provided timeout

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15974:
--
Labels: consumer-threading-refactor kip-848-client-support timeout  (was: 
consumer-threading-refactor consumer-timeout kip-848-client-support timeout)

> Enforce that events and requests respect user-provided timeout
> --
>
> Key: KAFKA-15974
> URL: https://issues.apache.org/jira/browse/KAFKA-15974
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> timeout
> Fix For: 3.8.0
>
>
> The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to 
> block waiting for the event to complete. The application thread will block 
> for the timeout, but there is not yet a consistent manner in which events are 
> timed out.
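
One way to picture the goal, as a sketch only (the real
CompletableApplicationEvent differs): each event carries its own deadline so
that expiry can be applied uniformly.
{code:java}
abstract class TimedEvent<T> {
    private final CompletableFuture<T> future = new CompletableFuture<>();
    private final long deadlineMs;

    protected TimedEvent(long deadlineMs) {
        this.deadlineMs = deadlineMs;
    }

    CompletableFuture<T> future() {
        return future;
    }

    // Called by the background thread; expires the event consistently
    // instead of each call site improvising its own timeout handling.
    boolean maybeExpire(long nowMs) {
        return nowMs >= deadlineMs && future.completeExceptionally(
                new org.apache.kafka.common.errors.TimeoutException(
                        "Event expired before completion"));
    }
}
{code}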



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16008:
--
Labels: consumer-threading-refactor timeout  (was: 
consumer-threading-refactor consumer-timeout timeout)

> Fix PlaintextConsumerTest.testMaxPollIntervalMs
> ---
>
> Key: KAFKA-16008
> URL: https://issues.apache.org/jira/browse/KAFKA-16008
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is 
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>     at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
> at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
> at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16100) Add timeout to all the CompletableApplicationEvents

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16100:
--
Labels: consumer-threading-refactor timeout  (was: 
consumer-threading-refactor consumer-timeout timeout)

> Add timeout to all the CompletableApplicationEvents
> ---
>
> Key: KAFKA-16100
> URL: https://issues.apache.org/jira/browse/KAFKA-16100
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>   Original Estimate: 40h
>  Remaining Estimate: 40h
>
> The handling of timeouts and responses for the various kinds of 
> ApplicationEvents in the new consumer is not consistent. A small amount of 
> refactoring would make the code more maintainable and give consistent 
> behaviour for the different requests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16010) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16010:
--
Labels: consumer-threading-refactor kip-848-client-support timeout  (was: 
consumer-threading-refactor consumer-timeout kip-848-client-support timeout)

> Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling
> --
>
> Key: KAFKA-16010
> URL: https://issues.apache.org/jira/browse/KAFKA-16010
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> timeout
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling}} is 
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Did not get valid assignment for 
> partitions [topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, 
> topic1-0, topic1-3] after one consumer left
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286)
>   at 
> kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1883)
>   at 
> kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling(PlaintextConsumerTest.scala:1281)
> {code}
> The logs include these lines:
>  
> {code}
> [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16009:
--
Labels: consumer-threading-refactor consumer-timeout kip-848-client-support 
timeout  (was: consumer-threading-refactor consumer-timeout 
kip-848-client-support)

> Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
> 
>
> Key: KAFKA-16009
> URL: https://issues.apache.org/jira/browse/KAFKA-16009
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, consumer-timeout, 
> kip-848-client-support, timeout
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing 
> when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
>   at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15475) Request might retry forever even if the user API timeout expires

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15475:
--
Labels: consumer-threading-refactor consumer-timeout timeout  (was: 
consumer-threading-refactor consumer-timeout)

> Request might retry forever even if the user API timeout expires
> 
>
> Key: KAFKA-15475
> URL: https://issues.apache.org/jira/browse/KAFKA-15475
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, consumer-timeout, timeout
> Fix For: 3.8.0
>
>
> If the request times out in the background thread, it will be completed with 
> a TimeoutException, which is retriable. In the TopicMetadataRequestManager and 
> possibly other managers, the request might continue to be retried forever.
>  
> There are two ways to fix this:
>  # Pass a timer to the manager to remove the inflight requests once the timer 
> expires.
>  # Pass the future to the application layer and continue to retry there.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16100) Add timeout to all the CompletableApplicationEvents

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16100:
--
Labels: consumer-threading-refactor consumer-timeout timeout  (was: 
consumer-threading-refactor consumer-timeout)

> Add timeout to all the CompletableApplicationEvents
> ---
>
> Key: KAFKA-16100
> URL: https://issues.apache.org/jira/browse/KAFKA-16100
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, consumer-timeout, timeout
> Fix For: 3.8.0
>
>   Original Estimate: 40h
>  Remaining Estimate: 40h
>
> The handling of timeouts and responses for the various kinds of 
> ApplicationEvents in the new consumer is not consistent. A small amount of 
> refactoring would make the code more maintainable and give consistent 
> behaviour for the different requests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16199) Prune the event queue if event timeout expired before starting

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16199:
--
Labels: consumer-threading-refactor consumer-timeout timeout  (was: 
consumer-threading-refactor consumer-timeout)

> Prune the event queue if event timeout expired before starting
> --
>
> Key: KAFKA-16199
> URL: https://issues.apache.org/jira/browse/KAFKA-16199
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, consumer-timeout, timeout
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16008:
--
Labels: consumer-threading-refactor consumer-timeout timeout  (was: 
consumer-threading-refactor consumer-timeout)

> Fix PlaintextConsumerTest.testMaxPollIntervalMs
> ---
>
> Key: KAFKA-16008
> URL: https://issues.apache.org/jira/browse/KAFKA-16008
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, consumer-timeout, timeout
> Fix For: 3.8.0
>
>
> The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is 
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>     at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
> at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
> at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16200) Ensure RequestManager handling of expired timeouts are consistent

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16200:
--
Labels: consumer-threading-refactor consumer-timeout timeout  (was: 
consumer-threading-refactor consumer-timeout)

> Ensure RequestManager handling of expired timeouts are consistent
> -
>
> Key: KAFKA-16200
> URL: https://issues.apache.org/jira/browse/KAFKA-16200
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, consumer-timeout, timeout
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15974) Enforce that events and requests respect user-provided timeout

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15974:
--
Labels: consumer-threading-refactor consumer-timeout kip-848-client-support 
timeout  (was: consumer-threading-refactor consumer-timeout 
kip-848-client-support)

> Enforce that events and requests respect user-provided timeout
> --
>
> Key: KAFKA-15974
> URL: https://issues.apache.org/jira/browse/KAFKA-15974
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, consumer-timeout, 
> kip-848-client-support, timeout
> Fix For: 3.8.0
>
>
> The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to 
> block waiting for the event to complete. The application thread will block 
> for the timeout, but there is not yet a consistent manner in which events are 
> timed out.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16010) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16010:
--
Labels: consumer-threading-refactor consumer-timeout kip-848-client-support 
timeout  (was: consumer-threading-refactor consumer-timeout 
kip-848-client-support)

> Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling
> --
>
> Key: KAFKA-16010
> URL: https://issues.apache.org/jira/browse/KAFKA-16010
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, consumer-timeout, 
> kip-848-client-support, timeout
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling}} is 
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Did not get valid assignment for 
> partitions [topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, 
> topic1-0, topic1-3] after one consumer left
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286)
>   at 
> kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1883)
>   at 
> kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling(PlaintextConsumerTest.scala:1281)
> {code}
> The logs include these lines:
>  
> {code}
> [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15848:
--
Labels: consumer-threading-refactor consumer-timeout kip-848-client-support 
timeout  (was: consumer-threading-refactor consumer-timeout 
kip-848-client-support)

> Consumer API timeout inconsistent between ConsumerDelegate implementations
> --
>
> Key: KAFKA-15848
> URL: https://issues.apache.org/jira/browse/KAFKA-15848
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, consumer-timeout, 
> kip-848-client-support, timeout
> Fix For: 3.8.0
>
>
> The two {{ConsumerDelegate}} implementations ({{{}LegacyKafkaConsumer{}}} and 
> {{{}AsyncKafkaConsumer{}}}) have a fundamental difference related to their 
> use and interpretation of the {{Timer}} that is supplied.
> h3. tl;dr
> {{AsyncKafkaConsumer}} is very literal about the timeout, whereas 
> {{LegacyKafkaConsumer}} seems to give a little wiggle room.
> {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for 
> success of its operations _before_ checking the timer:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{NetworkClient.poll()}}
>  # Check for result
>  ## If successful, return success
>  ## If fatal failure, return failure
>  # Check timer
>  ## If timer expired, return failure
> {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{Future.get()}}
>  ## If operation timed out, {{Future.get()}} will throw a timeout error
>  # Check for result
>  ## If successful, return success
>  ## Otherwise, return failure
> h3. How to reproduce
> This causes subtle timing issues, but they can be easily reproduced via any 
> of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} 
> API. Here's a bit of code that illustrates the difference between the two 
> approaches.
> {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a 
> manner similar to this:
> {code:java}
> public int getCount(Timer timer) {
>     do {
>         final RequestFuture<Integer> future = sendSomeRequest(partitions);
>         client.poll(future, timer);
>         if (future.isDone())
>             return future.get();
>     } while (timer.notExpired());
>     return -1;
> }
> {code}
> {{AsyncKafkaConsumer}} has similar logic, but it is structured like this:
> {code:java}
> private int getCount(Timer timer) {
>     try {
>         CompletableFuture<Integer> future = new CompletableFuture<>();
>         applicationEventQueue.add(future);
>         return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
>     } catch (TimeoutException e) {
>         return -1;
>     }
> }
> {code}
> The call to {{add}} enqueues the network operation, but it then _immediately_ 
> invokes {{Future.get()}} with the timeout to implement a time-bounded 
> blocking call. Since this method is being called with a timeout of 0, it 
> _immediately_ throws a {{{}TimeoutException{}}}. 
> h3. Suggested fix
> TBD :(



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16208) Design new Consumer timeout policy

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16208:
--
Labels: consumer-threading-refactor consumer-timeout timeout  (was: 
consumer-threading-refactor consumer-timeout)

> Design new Consumer timeout policy
> --
>
> Key: KAFKA-16208
> URL: https://issues.apache.org/jira/browse/KAFKA-16208
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, documentation
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, consumer-timeout, timeout
> Fix For: 3.8.0
>
>
> This task is to design and document the timeout policy for the new Consumer 
> implementation.
> The documentation lives here: 
> https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15558) Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions()

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15558:
--
Labels: consumer-threading-refactor consumer-timeout fetcher timeout  (was: 
consumer-threading-refactor consumer-timeout fetcher)

> Determine if Timer should be used elsewhere in 
> PrototypeAsyncConsumer.updateFetchPositions()
> 
>
> Key: KAFKA-15558
> URL: https://issues.apache.org/jira/browse/KAFKA-15558
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, consumer-timeout, fetcher, 
> timeout
> Fix For: 3.8.0
>
>
> This is a followup ticket based on a question from [~junrao] when reviewing 
> the [fetch request manager pull 
> request|https://github.com/apache/kafka/pull/14406]:
> {quote}It still seems weird that we only use the timer for 
> {{{}refreshCommittedOffsetsIfNeeded{}}}, but not for other cases where we 
> don't have valid fetch positions. For example, if all partitions are in 
> {{AWAIT_VALIDATION}} state, it seems that {{PrototypeAsyncConsumer.poll()}} 
> will just go in a busy loop, which is not efficient.
> {quote}
> The goal here is to determine if we should also be propagating the Timer to 
> the validate positions and reset positions operations.
> Note: we should also investigate if the existing {{KafkaConsumer}} 
> implementation should be fixed, too.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16111) Implement tests for tricky rebalance callback scenarios

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16111:
--
Labels: callback consumer-threading-refactor kip-848-client-support  (was: 
consumer-callbacks consumer-threading-refactor kip-848-client-support)

> Implement tests for tricky rebalance callback scenarios
> ---
>
> Key: KAFKA-16111
> URL: https://issues.apache.org/jira/browse/KAFKA-16111
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: callback, consumer-threading-refactor, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> There is justified concern that the new threading model may not play well 
> with "tricky" {{ConsumerRebalanceListener}} callbacks. We need to provide 
> some assurance that it will support complicated patterns.
>  # Design and implement test scenarios
>  # Update and document any design changes with the callback sub-system where 
> needed
>  # Provide fix(es) to the {{AsyncKafkaConsumer}} implementation to abide by 
> said design
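
As one illustration of a "tricky" pattern worth covering (example only, not
from the ticket): a listener that re-enters the consumer from the callback
thread.
{code:java}
ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync();  // blocking call from inside the callback
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions)
            consumer.seek(tp, 0L);  // repositioning during assignment
    }
};
{code}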



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15615) Improve handling of fetching during metadata updates

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15615:
--
Labels: consumer-threading-refactor fetcher  (was: 
consumer-threading-refactor kip-848-preview)

> Improve handling of fetching during metadata updates
> 
>
> Key: KAFKA-15615
> URL: https://issues.apache.org/jira/browse/KAFKA-15615
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, fetcher
> Fix For: 3.8.0
>
>
> [During a review of the new 
> fetcher|https://github.com/apache/kafka/pull/14406#discussion_r193941], 
> [~junrao] found what appears to be an opportunity for optimization.
> When a fetch response contains an error about partition leadership, fencing, 
> etc., a metadata refresh is triggered. However, it takes time for that refresh 
> to occur, and in the interim, it appears that the consumer will blindly 
> attempt to fetch data for the partition again, in kind of a "definition of 
> insanity" type of way. Ideally, the consumer would have a way to temporarily 
> ignore those partitions, somewhat like the "pausing" approach, so that they 
> are skipped until the metadata refresh response is fully processed.
> This affects both the existing KafkaConsumer and the new 
> PrototypeAsyncConsumer.
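
A minimal sketch of the "temporarily ignore" idea (hypothetical field and
hooks, not the actual fetcher code):
{code:java}
// Partitions that hit a leadership/fencing error are parked here and
// skipped when building fetch requests until the metadata refresh lands.
private final Set<TopicPartition> awaitingMetadataRefresh = new HashSet<>();

boolean isFetchable(TopicPartition tp) {
    return !awaitingMetadataRefresh.contains(tp);
}

void onPartitionFetchError(TopicPartition tp, Errors error) {
    if (error == Errors.NOT_LEADER_OR_FOLLOWER
            || error == Errors.FENCED_LEADER_EPOCH) {
        awaitingMetadataRefresh.add(tp);
        requestMetadataUpdate();  // however the refresh is triggered today
    }
}

void onMetadataUpdated() {
    awaitingMetadataRefresh.clear();  // fetchable again after the refresh
}
{code}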



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16029) Investigate cause of "Unable to find FetchSessionHandler for node X" in logs

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16029:
--
Labels: consumer-threading-refactor fetcher  (was: 
consumer-threading-refactor)

> Investigate cause of "Unable to find FetchSessionHandler for node X" in logs
> 
>
> Key: KAFKA-16029
> URL: https://issues.apache.org/jira/browse/KAFKA-16029
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, fetcher
> Fix For: 3.7.0
>
>
> From [~mjsax]:
> {quote}Looking into AK unit/integration test logs for KS, I regularly see an 
> ERROR log line that is triggered here:
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L150-L151]
> Given that it seems not to impact the test (it's not failing because of 
> this), I am wondering why we log this at ERROR level or if it might be better 
> to reduce to WARN? It seems to happen fairly frequently, but it also seems 
> that it's nothing one would need to be concerned about, and thus using ERROR 
> might be more alerting to end users than it needs to be? Thoughts?
> {quote}
> According to Matthias, running the {{EosIntegrationTest}} locally 
> reproduces the log line.
> This is also reproducible by running the Apache Kafka quickstart.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15558) Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions()

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15558:
--
Labels: consumer-threading-refactor consumer-timeout fetcher  (was: 
consumer-threading-refactor consumer-timeout)

> Determine if Timer should be used elsewhere in 
> PrototypeAsyncConsumer.updateFetchPositions()
> 
>
> Key: KAFKA-15558
> URL: https://issues.apache.org/jira/browse/KAFKA-15558
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, consumer-timeout, fetcher
> Fix For: 3.8.0
>
>
> This is a followup ticket based on a question from [~junrao] when reviewing 
> the [fetch request manager pull 
> request|https://github.com/apache/kafka/pull/14406]:
> {quote}It still seems weird that we only use the timer for 
> {{{}refreshCommittedOffsetsIfNeeded{}}}, but not for other cases where we 
> don't have valid fetch positions. For example, if all partitions are in 
> {{AWAIT_VALIDATION}} state, it seems that {{PrototypeAsyncConsumer.poll()}} 
> will just go in a busy loop, which is not efficient.
> {quote}
> The goal here is to determine if we should also be propagating the Timer to 
> the validate positions and reset positions operations.
> Note: we should also investigate if the existing {{KafkaConsumer}} 
> implementation should be fixed, too.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16111) Implement tests for tricky rebalance callback scenarios

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16111:
--
Labels: consumer-callbacks consumer-threading-refactor 
kip-848-client-support  (was: consumer-threading-refactor 
kip-848-client-support)

> Implement tests for tricky rebalance callback scenarios
> ---
>
> Key: KAFKA-16111
> URL: https://issues.apache.org/jira/browse/KAFKA-16111
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-callbacks, consumer-threading-refactor, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> There is justified concern that the new threading model may not play well 
> with "tricky" {{ConsumerRebalanceListener}} callbacks. We need to provide 
> some assurance that it will support complicated patterns.
>  # Design and implement test scenarios
>  # Update and document any design changes with the callback sub-system where 
> needed
>  # Provide fix(es) to the {{AsyncKafkaConsumer}} implementation to abide by 
> said design



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16204: Create partition dir for mockLog [kafka]

2024-01-30 Thread via GitHub


stanislavkozlovski commented on PR #15288:
URL: https://github.com/apache/kafka/pull/15288#issuecomment-1917467132

   Can we add some detail as to how we benefit from backporting? Does it make 
some of the blocker fixes easier?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16107 [2/N]: Fix inverted args & more tests [kafka]

2024-01-30 Thread via GitHub


lianetm commented on PR #15285:
URL: https://github.com/apache/kafka/pull/15285#issuecomment-1917462793

   Hey @dajac, comment addressed and rebased, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16107 [2/N]: Fix inverted args & more tests [kafka]

2024-01-30 Thread via GitHub


lianetm commented on code in PR #15285:
URL: https://github.com/apache/kafka/pull/15285#discussion_r1471590270


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1160,7 +1160,7 @@ private CompletableFuture assignPartitions(
 
 // Invoke user call back.
 CompletableFuture result = 
invokeOnPartitionsAssignedCallback(addedPartitions);
-result.whenComplete((error, callbackResult) -> {
+result.whenComplete((callbackResult, error) -> {

Review Comment:
   Sure, done
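
For context on the fix: CompletableFuture.whenComplete() passes the result as
the first argument of its BiConsumer and the exception as the second, so
naming the first parameter "error" inverted the two.
{code:java}
CompletableFuture.completedFuture(42)
        .whenComplete((value, error) -> {
            // On success: value == 42, error == null.
            // On failure: value == null, error holds the exception.
        });
{code}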



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15558) Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions()

2024-01-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15558:
--
Labels: consumer-threading-refactor consumer-timeout  (was: 
consumer-threading-refactor kip-848-preview)

> Determine if Timer should be used elsewhere in 
> PrototypeAsyncConsumer.updateFetchPositions()
> 
>
> Key: KAFKA-15558
> URL: https://issues.apache.org/jira/browse/KAFKA-15558
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, consumer-timeout
> Fix For: 3.8.0
>
>
> This is a followup ticket based on a question from [~junrao] when reviewing 
> the [fetch request manager pull 
> request|https://github.com/apache/kafka/pull/14406]:
> {quote}It still seems weird that we only use the timer for 
> {{{}refreshCommittedOffsetsIfNeeded{}}}, but not for other cases where we 
> don't have valid fetch positions. For example, if all partitions are in 
> {{AWAIT_VALIDATION}} state, it seems that {{PrototypeAsyncConsumer.poll()}} 
> will just go in a busy loop, which is not efficient.
> {quote}
> The goal here is to determine if we should also be propagating the Timer to 
> the validate positions and reset positions operations.
> Note: we should also investigate if the existing {{KafkaConsumer}} 
> implementation should be fixed, too.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15675: Improve worker liveness check during Connect integration tests [kafka]

2024-01-30 Thread via GitHub


C0urante commented on PR #15249:
URL: https://github.com/apache/kafka/pull/15249#issuecomment-1917431710

   The [third CI 
run](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15249/8/tests/)
 also contained no failures for the `ConnectorRestartApiIntegrationTest` suite. 
Kicking off a fourth. I think I'll mark this ready for review if/when it 
reaches five consecutive successful CI runs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


