[jira] [Updated] (KAFKA-16142) Update metrics documentation for errors
[ https://issues.apache.org/jira/browse/KAFKA-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16142: -- Fix Version/s: (was: 4.0.0) > Update metrics documentation for errors > --- > > Key: KAFKA-16142 > URL: https://issues.apache.org/jira/browse/KAFKA-16142 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer, documentation, metrics >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor, metrics > > We need to identify the “errors” that exist in the current JMX documentation > and resolve them. Per [~pnee] there are errors on the JMX web page, which he > will identify and resolve. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect
[ https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15556: -- Fix Version/s: (was: 4.0.0) > Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, > and tryConnect > - > > Key: KAFKA-15556 > URL: https://issues.apache.org/jira/browse/KAFKA-15556 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Phuc Hong Tran >Priority: Minor > Labels: consumer-threading-refactor > > The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to > handle networking details in a more centralized way. However, in order to > reuse code between the existing {{KafkaConsumer}} and the new > {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the > {{NetworkClientDelegate}} capitulated and -stole- copied three methods from > {{ConsumerNetworkClient}} related to detecting node status: > # {{isUnavailable}} > # {{maybeThrowAuthFailure}} > # {{tryConnect}} > Unfortunately, these have found their way into the {{FetchRequestManager}} > and {{OffsetsRequestManager}} implementations. We should review if we can > clean up—or even remove—this leaky abstraction. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15867) Should ConsumerNetworkThread wrap the exception and notify the polling thread?
[ https://issues.apache.org/jira/browse/KAFKA-15867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15867: -- Fix Version/s: (was: 4.0.0) > Should ConsumerNetworkThread wrap the exception and notify the polling thread? > -- > > Key: KAFKA-15867 > URL: https://issues.apache.org/jira/browse/KAFKA-15867 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Phuc Hong Tran >Priority: Minor > Labels: consumer-threading-refactor, events > > The ConsumerNetworkThread runs a tight loop infinitely. However, when > encountering an unexpected exception, it logs an error and continues. > > I think this might not be ideal because the user can run blind for a long time > before discovering there's something wrong with the code; so I believe we > should propagate the throwable back to the polling thread. > > cc [~lucasbru] -- This message was sent by Atlassian Jira (v8.20.10#820010)
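The propagation suggested above could be sketched roughly as follows (illustrative names only, not Kafka's actual classes): the network thread records the throwable in a shared queue instead of only logging it, and the polling thread rethrows it on the next poll.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// A minimal sketch of propagating a background-thread failure back to the
// polling thread via a shared queue. Names are hypothetical.
public class ErrorPropagationSketch {
    private final Queue<Throwable> backgroundErrors = new ConcurrentLinkedQueue<>();

    // Called on the network thread: record the failure instead of only logging it.
    public void onNetworkThreadError(Throwable t) {
        backgroundErrors.add(t);
    }

    // Called on the polling thread: rethrow any recorded failure on the next poll().
    public void maybeThrowOnPoll() {
        Throwable t = backgroundErrors.poll();
        if (t != null)
            throw new RuntimeException("Background thread failed", t);
    }
}
```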
[jira] [Updated] (KAFKA-16799) NetworkClientDelegate is not backing off if the node is not found
[ https://issues.apache.org/jira/browse/KAFKA-16799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16799: -- Priority: Major (was: Blocker) > NetworkClientDelegate is not backing off if the node is not found > - > > Key: KAFKA-16799 > URL: https://issues.apache.org/jira/browse/KAFKA-16799 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > > When performing stress testing, I found that AsyncKafkaConsumer's network > client delegate isn't backing off if the node is not ready, causing a large > number of: > {code:java} > 358 [2024-05-20 22:59:02,591] DEBUG [Consumer > clientId=consumer.7136899e-0c20-4ccb-8ba3-497e9e683594-0, > groupId=consumer-groups-test-5] Node is not ready, handle the request in the > next event loop: node=b4-pkc-devcmkz697.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483643 rack: null), > request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='consumer-groups-test-5', > memberId='', memberEpoch=0, instanceId=null, rackId=null, > rebalanceTimeoutMs=10, subscribedTopicNames=[_kengine-565-test-topic8081], serverAssignor=null, > topicPartitions=[]), > handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@139a8761, > node=Optional[b4-pkc-devcmkz697.us-west-2.aws.devel.cpdev.cloud:9092 > (id: 2147483643 rack: null)], > timer=org.apache.kafka.common.utils.Timer@649fffad} > (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate:169) {code} > show up in the log. > What should have happened is: 1. node is not ready 2. exponential back off 3. > retry -- This message was sent by Atlassian Jira (v8.20.10#820010)
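The "node is not ready, exponential back off, retry" flow described above can be sketched as a capped exponential delay calculation (a hypothetical illustration; the names and constants are not Kafka's actual implementation):

```java
// Each failed attempt doubles the delay, up to a cap, so the consumer does
// not hammer an unready node on every event-loop pass. Illustrative only.
public class BackoffSketch {
    public static long backoffMs(int failedAttempts, long baseMs, long maxMs) {
        double delay = baseMs * Math.pow(2.0, failedAttempts);
        return Math.min((long) delay, maxMs);
    }
}
```

With a 100 ms base and a 5 s cap, attempts 0, 3, and 10 would wait 100 ms, 800 ms, and the 5 s cap respectively.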
[jira] [Updated] (KAFKA-16315) Investigate propagating metadata updates via queues
[ https://issues.apache.org/jira/browse/KAFKA-16315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16315: -- Fix Version/s: (was: 4.0.0) > Investigate propagating metadata updates via queues > --- > > Key: KAFKA-16315 > URL: https://issues.apache.org/jira/browse/KAFKA-16315 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor > > Some of the new {{AsyncKafkaConsumer}} logic enqueues events for the network > I/O thread then issues a call to update the {{ConsumerMetadata}} via > {{requestUpdate()}}. If the event ends up stuck in the queue for a long time, > it is possible that the metadata is not updated at the correct time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15321) Document consumer group member state machine
[ https://issues.apache.org/jira/browse/KAFKA-15321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15321: -- Fix Version/s: (was: 4.0.0) > Document consumer group member state machine > > > Key: KAFKA-15321 > URL: https://issues.apache.org/jira/browse/KAFKA-15321 > Project: Kafka > Issue Type: Task > Components: clients, consumer, documentation >Reporter: Kirk True >Assignee: Colin McCabe >Priority: Minor > Labels: kip-848-client-support, reconciliation > > We need to first document the new consumer group member state machine. What > are the different states and what are the transitions? > See [~pnee]'s notes: > [https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design] > *_Don’t forget to include diagrams for clarity!_* > This should be documented on the AK wiki. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15652) Add tests to verify OffsetFetcherUtils.getOffsetResetTimestamp()
[ https://issues.apache.org/jira/browse/KAFKA-15652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15652: -- Fix Version/s: (was: 4.0.0) > Add tests to verify OffsetFetcherUtils.getOffsetResetTimestamp() > > > Key: KAFKA-15652 > URL: https://issues.apache.org/jira/browse/KAFKA-15652 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor, position, unit-tests > > In the {{updateFetchPositions()}} method implementation, both > {{KafkaConsumer}} and {{PrototypeAsyncConsumer}} reset positions > asynchronously. [~junrao] stated the following in a [recent PR > review|https://github.com/apache/kafka/pull/14406#discussion_r1349173413]: > {quote}There is a subtle difference between transitioning to reset from > initializing and transitioning to reset from {{OffsetOutOfRangeException}} > during fetch. In the latter, the application thread will call > {{{}FetchCollector.handleInitializeErrors(){}}}. If there is no default > offset reset policy, an {{OffsetOutOfRangeException}} will be thrown to the > application thread during {{{}poll{}}}, which is what we want. > However, for the former, if there is no default offset reset policy, we > simply ignore that partition through > {{{}OffsetFetcherUtils.getOffsetResetTimestamp{}}}. It seems in that case, > the partition will be forever in the reset state and the application thread > won't get the {{{}OffsetOutOfRangeException{}}}. > {quote} > I intentionally changed the code so that no exceptions were thrown in > {{OffsetFetcherUtils.getOffsetResetTimestamp()}} and it would simply return an > empty map. When I ran the unit tests and integration tests, there were no > failures, strongly suggesting that there is no coverage of this particular > edge case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16966) Allow offset commit fetch to reuse previous request if partitions are a subset
[ https://issues.apache.org/jira/browse/KAFKA-16966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16966: -- Fix Version/s: (was: 4.0.0) > Allow offset commit fetch to reuse previous request if partitions are a subset > -- > > Key: KAFKA-16966 > URL: https://issues.apache.org/jira/browse/KAFKA-16966 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: TaiJuWu >Priority: Minor > Labels: consumer-threading-refactor > > In {{{}initWithCommittedOffsetsIfNeeded{}}}, the behavior of the existing > {{LegacyKafkaConsumer}} is to allow reuse only if the partitions for the > _current_ request equal those of the _previous_ request *exactly* > ([source|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java?rgh-link-date=2024-06-14T16%3A43%3A11Z#L927]). > That logic is the basis for the behavior used in the > {{{}AsyncKafkaConsumer{}}}. > The proposed change is to allow for request reuse if the partitions for the > _current_ request are a subset of those of the _previous_ request. This > introduces a subtle difference in behavior between the two {{Consumer}} > implementations, so we need to decide if we want to change both > implementations or just {{{}AsyncKafkaConsumer{}}}. Also, the specific case > that the request reuse logic solves is when the user has passed in a very low > timeout value in a tight {{poll()}} loop, which suggests the partitions > wouldn't be changing between those loops. -- This message was sent by Atlassian Jira (v8.20.10#820010)
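The difference between the existing reuse condition and the proposed one can be sketched as two predicates (illustrative names, not the actual ConsumerCoordinator code):

```java
import java.util.Set;

// Sketch of the two reuse conditions under discussion. Illustrative only.
public class RequestReuseSketch {
    // Existing LegacyKafkaConsumer behavior: reuse only on an exact match.
    public static boolean reuseExact(Set<String> current, Set<String> previous) {
        return current.equals(previous);
    }

    // Proposed behavior: reuse when the current partitions are a subset of
    // the previous request's partitions.
    public static boolean reuseSubset(Set<String> current, Set<String> previous) {
        return previous.containsAll(current);
    }
}
```

The subset check accepts strictly more cases: any request the exact check allows is also allowed by the subset check, but not vice versa.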
[jira] [Updated] (KAFKA-15173) Consumer event queues should be bounded
[ https://issues.apache.org/jira/browse/KAFKA-15173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15173: -- Fix Version/s: (was: 4.0.0) > Consumer event queues should be bounded > --- > > Key: KAFKA-15173 > URL: https://issues.apache.org/jira/browse/KAFKA-15173 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor, events > > The async consumer uses ApplicationEventQueue and BackgroundEventQueue to > facilitate message passing between the application thread and the background > thread. The current implementation is unbounded, which can potentially cause > OOM and other performance-related issues. > I think the queues need a finite bound, and we need to decide how to handle > the situation when the bound is reached. In particular, I would like to > answer these questions: > > # What should the upper limit be for both queues: Can this be a > configurable, memory-based bound? Or just an arbitrary number of events as > the bound. > # What should happen when the application event queue is filled up? It > seems like we should introduce a new exception type and notify the user that > the consumer is full. > # What should happen when the background event queue is filled up? This > seems less likely to happen, but I imagine it could happen when the user > stops polling the consumer, causing the queue to be filled. > # Is it necessary to introduce a public configuration for the queue? I think > initially we would select an arbitrary constant number and see the community > feedback to make a forward plan accordingly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
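One possible shape for the bounded queue discussed above, assuming the "fail fast with an exception" answer to question 2 (a sketch, not the actual ApplicationEventQueue):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// A bounded event queue that rejects new events once capacity is reached,
// rather than blocking the application thread or growing without limit.
// Illustrative only; the exception type is hypothetical.
public class BoundedEventQueue<E> {
    private final BlockingQueue<E> queue;

    public BoundedEventQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    public void add(E event) {
        if (!queue.offer(event))
            throw new IllegalStateException("Consumer event queue is full");
    }

    public E poll() {
        return queue.poll();
    }
}
```

A memory-based bound (question 1) would need event-size accounting instead of `ArrayBlockingQueue`'s fixed element count, which is part of why the design question is open.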
[jira] [Updated] (KAFKA-16109) Write system tests cover the "simple consumer + commit" use case
[ https://issues.apache.org/jira/browse/KAFKA-16109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16109: -- Fix Version/s: (was: 4.0.0) > Write system tests cover the "simple consumer + commit" use case > > > Key: KAFKA-16109 > URL: https://issues.apache.org/jira/browse/KAFKA-16109 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Reporter: Kirk True >Assignee: PoAn Yang >Priority: Major > Labels: consumer-threading-refactor, system-tests > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16111) Implement tests for tricky rebalance callback scenarios
[ https://issues.apache.org/jira/browse/KAFKA-16111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16111: -- Fix Version/s: (was: 4.0.0) > Implement tests for tricky rebalance callback scenarios > --- > > Key: KAFKA-16111 > URL: https://issues.apache.org/jira/browse/KAFKA-16111 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Priority: Minor > Labels: callback, consumer-threading-refactor, integration-tests > > There is justified concern that the new threading model may not play well > with "tricky" {{ConsumerRebalanceListener}} callbacks. We need to provide > some assurance that it will support complicated patterns. > # Design and implement test scenarios > # Update and document any design changes with the callback sub-system where > needed > # Provide fix(es) to the {{AsyncKafkaConsumer}} implementation to abide by > said design -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls
[ https://issues.apache.org/jira/browse/KAFKA-15551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15551: -- Fix Version/s: (was: 4.0.0) > Evaluate conditions for short circuiting consumer API calls > --- > > Key: KAFKA-15551 > URL: https://issues.apache.org/jira/browse/KAFKA-15551 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, events > > Conditions like: > * Committing empty offsets > * Fetching offsets for empty partitions > * Getting empty topic partition positions > should possibly be short-circuited at the API level. > As a bonus, we should double-check whether the existing {{KafkaConsumer}} > implementation suffers from this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
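The first condition above (committing empty offsets) could be short-circuited at the API level roughly like this (an illustrative sketch, not the actual consumer API):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Short-circuit an empty offset commit before any event is enqueued for the
// background thread. Names and signatures are hypothetical.
public class ShortCircuitSketch {
    public static CompletableFuture<Void> commit(Map<String, Long> offsets) {
        if (offsets.isEmpty()) {
            // Nothing to commit: complete immediately, no event, no network round trip.
            return CompletableFuture.completedFuture(null);
        }
        // Otherwise, enqueue a commit event for the background thread (elided).
        return new CompletableFuture<>();
    }
}
```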
[jira] [Updated] (KAFKA-15617) Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions and testInflightFetchOnPendingPartitions overlap
[ https://issues.apache.org/jira/browse/KAFKA-15617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15617: -- Fix Version/s: (was: 4.0.0) > Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions > and testInflightFetchOnPendingPartitions overlap > -- > > Key: KAFKA-15617 > URL: https://issues.apache.org/jira/browse/KAFKA-15617 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher, unit-tests > > In FetcherTest, the two tests testFetchingPendingPartitions and > testInflightFetchOnPendingPartitions have significant overlap. Perhaps the > former subsumes the latter? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16799) NetworkClientDelegate is not backing off if the node is not found
[ https://issues.apache.org/jira/browse/KAFKA-16799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16799: -- Fix Version/s: (was: 4.0.0) > NetworkClientDelegate is not backing off if the node is not found > - > > Key: KAFKA-16799 > URL: https://issues.apache.org/jira/browse/KAFKA-16799 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Blocker > Labels: consumer-threading-refactor > > When performing stress testing, I found that AsyncKafkaConsumer's network > client delegate isn't backing off if the node is not ready, causing a large > number of: > {code:java} > 358 [2024-05-20 22:59:02,591] DEBUG [Consumer > clientId=consumer.7136899e-0c20-4ccb-8ba3-497e9e683594-0, > groupId=consumer-groups-test-5] Node is not ready, handle the request in the > next event loop: node=b4-pkc-devcmkz697.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483643 rack: null), > request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='consumer-groups-test-5', > memberId='', memberEpoch=0, instanceId=null, rackId=null, > rebalanceTimeoutMs=10, subscribedTopicNames=[_kengine-565-test-topic8081], serverAssignor=null, > topicPartitions=[]), > handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@139a8761, > node=Optional[b4-pkc-devcmkz697.us-west-2.aws.devel.cpdev.cloud:9092 > (id: 2147483643 rack: null)], > timer=org.apache.kafka.common.utils.Timer@649fffad} > (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate:169) {code} > show up in the log. > What should have happened is: 1. node is not ready 2. exponential back off 3. > retry -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16110) Document and publicize performance test results for AsyncKafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16110: -- Fix Version/s: (was: 4.0.0) > Document and publicize performance test results for AsyncKafkaConsumer > -- > > Key: KAFKA-16110 > URL: https://issues.apache.org/jira/browse/KAFKA-16110 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, performance-benchmark > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15637) Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded
[ https://issues.apache.org/jira/browse/KAFKA-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15637: -- Fix Version/s: (was: 4.0.0) > Investigate FetcherTest's/FetchRequestManager's > testFetchCompletedBeforeHandlerAdded > > > Key: KAFKA-15637 > URL: https://issues.apache.org/jira/browse/KAFKA-15637 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher, unit-tests > > Thanks for the reply. I still don't quite understand the test. Why do we > duplicate the following code both inside and outside of {{{}setWakeupHook{}}}? > > {code:java} > networkClientDelegate.disconnectAsync(readReplica); > networkClientDelegate.poll(time.timer(0)); > {code} > > MockClient is only woken up through > {{{}networkClientDelegate.disconnectAsync{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThreadTest's testResetPositionsProcessFailureIsIgnored
[ https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15639: -- Fix Version/s: (was: 4.0.0) > Investigate ConsumerNetworkThreadTest's > testResetPositionsProcessFailureIsIgnored > - > > Key: KAFKA-15639 > URL: https://issues.apache.org/jira/browse/KAFKA-15639 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, unit-tests > > The {{testResetPositionsProcessFailureIsIgnored}} test looks like this: > > {code:java} > @Test > public void testResetPositionsProcessFailureIsIgnored() { > doThrow(new > NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); > ResetPositionsApplicationEvent event = new > ResetPositionsApplicationEvent(); > applicationEventsQueue.add(event); > assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); > > verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); > } > {code} > > [~junrao] asks: > > {quote}Not sure if this is a useful test since > {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly > throw an exception? > {quote} > > I commented out the {{doThrow}} line and it did not impact the test. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15638) Investigate ConsumerNetworkThreadTest's testPollResultTimer
[ https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15638: -- Fix Version/s: (was: 4.0.0) > Investigate ConsumerNetworkThreadTest's testPollResultTimer > --- > > Key: KAFKA-15638 > URL: https://issues.apache.org/jira/browse/KAFKA-15638 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, timeout, unit-tests > > Regarding this comment in {{{}testPollResultTimer{}}}... > {code:java} > // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE > upon success > {code} > [~junrao] asked: > {quote}Which call is returning Long.MAX_VALUE? > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15615) Improve handling of fetching during metadata updates
[ https://issues.apache.org/jira/browse/KAFKA-15615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15615: -- Fix Version/s: (was: 4.0.0) > Improve handling of fetching during metadata updates > > > Key: KAFKA-15615 > URL: https://issues.apache.org/jira/browse/KAFKA-15615 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: appchemist >Priority: Major > Labels: consumer-threading-refactor, fetcher > > [During a review of the new > fetcher|https://github.com/apache/kafka/pull/14406#discussion_r193941], > [~junrao] found what appears to be an opportunity for optimization. > When a fetch response receives an error about partition leadership, fencing, > etc. a metadata refresh is triggered. However, it takes time for that refresh > to occur, and in the interim, it appears that the consumer will blindly > attempt to fetch data for the partition again, in kind of a "definition of > insanity" type of way. Ideally, the consumer would have a way to temporarily > ignore those partitions, in a way somewhat like the "pausing" approach so > that they are skipped until the metadata refresh response is fully processed. > This affects both the existing KafkaConsumer and the new > PrototypeAsyncConsumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
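The "temporarily ignore" behavior suggested above could be sketched as a small piece of fetch-eligibility state (names here are hypothetical, not the actual fetcher code): partitions that hit a leadership or fencing error are excluded from fetch requests until the metadata refresh completes.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of pausing-like behavior while a metadata refresh is in flight.
// Illustrative only; the real fix would live inside the fetcher's logic.
public class FetchSkipSketch {
    private final Set<String> awaitingMetadata = new HashSet<>();

    // A fetch response reported a leadership/fencing error for this partition.
    public void onLeadershipFetchError(String partition) {
        awaitingMetadata.add(partition);
    }

    // The triggered metadata refresh has been fully processed.
    public void onMetadataRefreshed() {
        awaitingMetadata.clear();
    }

    // Only fetch from partitions not waiting on fresh metadata.
    public boolean isFetchable(String partition) {
        return !awaitingMetadata.contains(partition);
    }
}
```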
[jira] [Updated] (KAFKA-15320) Document event queueing patterns
[ https://issues.apache.org/jira/browse/KAFKA-15320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15320: -- Fix Version/s: (was: 4.0.0) > Document event queueing patterns > > > Key: KAFKA-15320 > URL: https://issues.apache.org/jira/browse/KAFKA-15320 > Project: Kafka > Issue Type: Task > Components: clients, consumer, documentation >Reporter: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, events > > We need to first document the event enqueuing patterns in the > PrototypeAsyncConsumer. As part of this task, determine if it’s > necessary/beneficial to _conditionally_ add events and/or coalesce any > duplicate events in the queue. > _Don’t forget to include diagrams for clarity!_ > This should be documented on the AK wiki. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15635) Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric
[ https://issues.apache.org/jira/browse/KAFKA-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15635: -- Fix Version/s: (was: 4.0.0) > Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric > - > > Key: KAFKA-15635 > URL: https://issues.apache.org/jira/browse/KAFKA-15635 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher, unit-tests > > Why is {{recordsFetchLeadMin}} different from {{partitionLead}} given there > is only 1 assigned partition? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15634) Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics
[ https://issues.apache.org/jira/browse/KAFKA-15634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15634: -- Fix Version/s: (was: 4.0.0) > Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics > > > Key: KAFKA-15634 > URL: https://issues.apache.org/jira/browse/KAFKA-15634 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher, unit-tests > > What is the point of the code in the initial {{while}} loop since the receive > is delayed and thus there is no {{throttleDelayMs}} received in the client? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15557) Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in assignFromUserNoId
[ https://issues.apache.org/jira/browse/KAFKA-15557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15557: -- Fix Version/s: (was: 4.0.0) > Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in > assignFromUserNoId > --- > > Key: KAFKA-15557 > URL: https://issues.apache.org/jira/browse/KAFKA-15557 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher, unit-tests > > The unit tests {{FetcherTest}} and {{FetchRequestManagerTest}} have methods > named {{assignFromUser()}} and {{assignFromUserNoId()}} that appear to > perform duplicate metadata updates: > {code:java} > private void assignFromUser(Set partitions) { > subscriptions.assignFromUser(partitions); > client.updateMetadata(initialUpdateResponse); > // A dummy metadata update to ensure valid leader epoch. > metadata.updateWithCurrentRequestVersion( > RequestTestUtils.metadataUpdateWithIds( > "dummy", > 1, > Collections.emptyMap(), > singletonMap(topicName, 4), > tp -> validLeaderEpoch, topicIds > ), > false, > 0L > ); > } > {code} > {{client.updateMetadata()}} eventually calls > {{metadata.updateWithCurrentRequestVersion()}}. Determine why the test is > updating the cluster metadata twice with different values. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16444) Run KIP-848 unit tests under code coverage
[ https://issues.apache.org/jira/browse/KAFKA-16444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16444: -- Fix Version/s: (was: 4.0.0) > Run KIP-848 unit tests under code coverage > -- > > Key: KAFKA-16444 > URL: https://issues.apache.org/jira/browse/KAFKA-16444 > Project: Kafka > Issue Type: Task > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16642) Update KafkaConsumerTest to show parameters in test lists
[ https://issues.apache.org/jira/browse/KAFKA-16642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16642: -- Fix Version/s: (was: 4.0.0) > Update KafkaConsumerTest to show parameters in test lists > - > > Key: KAFKA-16642 > URL: https://issues.apache.org/jira/browse/KAFKA-16642 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Chia-Ping Tsai >Priority: Minor > Labels: consumer-threading-refactor > > {{KafkaConsumerTest}} was recently updated to make many of its tests > parameterized to exercise both the {{CLASSIC}} and {{CONSUMER}} group > protocols. However, in some of the tools in which [lists of tests are > provided|https://ge.apache.org/scans/tests?search.names=Git%20branch&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=America%2FLos_Angeles&search.values=trunk&tests.container=org.apache.kafka.clients.consumer.KafkaConsumerTest&tests.sortField=FLAKY], > say, for analysis, the group protocol information is not exposed. For > example, one test ({{{}testReturnRecordsDuringRebalance{}}}) is flaky, but > it's difficult to know at a glance which group protocol is causing the > problem because the list simply shows: > {quote}{{testReturnRecordsDuringRebalance(GroupProtocol)[1]}} > {quote} > Ideally, it would expose more information, such as: > {quote}{{testReturnRecordsDuringRebalance(GroupProtocol=CONSUMER)}} > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15636) Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics
[ https://issues.apache.org/jira/browse/KAFKA-15636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15636: -- Fix Version/s: (was: 4.0.0) > Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics > > > Key: KAFKA-15636 > URL: https://issues.apache.org/jira/browse/KAFKA-15636 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher, unit-tests > > {{expectedBytes}} is calculated as total, instead of avg. Is this correct? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15606) Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval
[ https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15606: -- Fix Version/s: (was: 4.0.0) > Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval > - > > Key: KAFKA-15606 > URL: https://issues.apache.org/jira/browse/KAFKA-15606 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, fetcher, unit-tests > > As part of the review for [FetchRequestManager pull > request|https://github.com/apache/kafka/pull/14406], [~junrao] had some > questions related to the correctness and clarity of the > {{FetcherTest.testCompletedFetchRemoval()}} test: > Questions: > * https://github.com/apache/kafka/pull/14406#discussion_r1347908197 > * https://github.com/apache/kafka/pull/14406#discussion_r1347910980 > * https://github.com/apache/kafka/pull/14406#discussion_r1347913781 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True resolved KAFKA-16272. --- Resolution: Fixed > Update connect_distributed_test.py to support KIP-848’s group protocol config > - > > Key: KAFKA-16272 > URL: https://issues.apache.org/jira/browse/KAFKA-16272 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Sagar Rao >Priority: Major > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in {{connect_distributed_test.py}} > to support the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15908) Remove deprecated Consumer API poll(long timeout)
[ https://issues.apache.org/jira/browse/KAFKA-15908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15908: - Assignee: (was: Kirk True) > Remove deprecated Consumer API poll(long timeout) > - > > Key: KAFKA-15908 > URL: https://issues.apache.org/jira/browse/KAFKA-15908 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, timeout > Fix For: 4.0.0 > > > Per > [KIP-266|https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior], > the {{Consumer.poll(long timeout)}} method was deprecated back in 2.0.0. > In 3.7, there are two implementations, each with different behavior: > * The {{LegacyKafkaConsumer}} implementation will continue to work but will > log a warning about its removal > * The {{AsyncKafkaConsumer}} implementation will throw an error. > In 4.0, the `poll` method that takes a single `long` timeout will be removed > altogether. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15908) Remove deprecated Consumer API poll(long timeout)
[ https://issues.apache.org/jira/browse/KAFKA-15908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17886780#comment-17886780 ] Kirk True commented on KAFKA-15908: --- I've unassigned myself, so whoever is willing and available can take it over. Thanks :) > Remove deprecated Consumer API poll(long timeout) > - > > Key: KAFKA-15908 > URL: https://issues.apache.org/jira/browse/KAFKA-15908 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, timeout > Fix For: 4.0.0 > > > Per > [KIP-266|https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior], > the {{Consumer.poll(long timeout)}} method was deprecated back in 2.0.0. > In 3.7, there are two implementations, each with different behavior: > * The {{LegacyKafkaConsumer}} implementation will continue to work but will > log a warning about its removal > * The {{AsyncKafkaConsumer}} implementation will throw an error. > In 4.0, the `poll` method that takes a single `long` timeout will be removed > altogether. -- This message was sent by Atlassian Jira (v8.20.10#820010)
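The KIP-266 migration described above is a one-line change at each call site: replace the deprecated long-millisecond timeout with an explicit java.time.Duration. A minimal sketch (the commented-out consumer calls are illustrative, since they require a live broker):

```java
import java.time.Duration;

public class PollMigration {
    public static void main(String[] args) {
        // Before (deprecated since 2.0.0, removed in 4.0):
        //   ConsumerRecords<String, String> records = consumer.poll(100L);
        // After (KIP-266), the timeout is an explicit Duration:
        //   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        // The mechanical conversion for an existing long timeout value:
        long legacyTimeoutMs = 100L;
        Duration timeout = Duration.ofMillis(legacyTimeoutMs);
        System.out.println("timeoutMs=" + timeout.toMillis()); // prints timeoutMs=100
    }
}
```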
[jira] [Updated] (KAFKA-17686) AsyncKafkaConsumer.offsetsForTimes() fails with NullPointerException
[ https://issues.apache.org/jira/browse/KAFKA-17686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17686: -- Summary: AsyncKafkaConsumer.offsetsForTimes() fails with NullPointerException (was: AsyncKafkaConsumer.offsetsForTimes fails with NullPointerException) > AsyncKafkaConsumer.offsetsForTimes() fails with NullPointerException > > > Key: KAFKA-17686 > URL: https://issues.apache.org/jira/browse/KAFKA-17686 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Affects Versions: 3.9.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, integration-tests > Fix For: 4.0.0 > > > Error when running the integration test: > {noformat} > Gradle Test Run :core:integrationTest > Gradle Test Executor 10 > > PlaintextAdminIntegrationTest > testOffsetsForTimesAfterDeleteRecords(String) > > "testOffsetsForTimesAfterDeleteRecords(String).quorum=kraft" FAILED > java.lang.NullPointerException: Cannot invoke > "org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal.buildOffsetAndTimestamp()" > because the return value of "java.util.Map$Entry.getValue()" is null > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.lambda$offsetsForTimes$4(AsyncKafkaConsumer.java:1082) > at > java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180) > at > java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) > at > java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1858) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > 
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.offsetsForTimes(AsyncKafkaConsumer.java:1080) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.offsetsForTimes(AsyncKafkaConsumer.java:1043) > at > org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes(KafkaConsumer.java:1560) > at > kafka.api.PlaintextAdminIntegrationTest.testOffsetsForTimesAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:1535) > > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17686) AsyncKafkaConsumer.offsetsForTimes fails with NullPointerException
[ https://issues.apache.org/jira/browse/KAFKA-17686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17686: -- Summary: AsyncKafkaConsumer.offsetsForTimes fails with NullPointerException (was: PlaintextAdminIntegrationTest’s testOffsetsForTimesAfterDeleteRecords fails with NPE with new consumer) > AsyncKafkaConsumer.offsetsForTimes fails with NullPointerException > -- > > Key: KAFKA-17686 > URL: https://issues.apache.org/jira/browse/KAFKA-17686 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Affects Versions: 3.9.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, integration-tests > Fix For: 4.0.0 > > > Error when running the integration test: > {noformat} > Gradle Test Run :core:integrationTest > Gradle Test Executor 10 > > PlaintextAdminIntegrationTest > testOffsetsForTimesAfterDeleteRecords(String) > > "testOffsetsForTimesAfterDeleteRecords(String).quorum=kraft" FAILED > java.lang.NullPointerException: Cannot invoke > "org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal.buildOffsetAndTimestamp()" > because the return value of "java.util.Map$Entry.getValue()" is null > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.lambda$offsetsForTimes$4(AsyncKafkaConsumer.java:1082) > at > java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180) > at > java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) > at > java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1858) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > 
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.offsetsForTimes(AsyncKafkaConsumer.java:1080) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.offsetsForTimes(AsyncKafkaConsumer.java:1043) > at > org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes(KafkaConsumer.java:1560) > at > kafka.api.PlaintextAdminIntegrationTest.testOffsetsForTimesAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:1535) > > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17686) PlaintextAdminIntegrationTest’s testOffsetsForTimesAfterDeleteRecords fails with NPE with new consumer
Kirk True created KAFKA-17686: - Summary: PlaintextAdminIntegrationTest’s testOffsetsForTimesAfterDeleteRecords fails with NPE with new consumer Key: KAFKA-17686 URL: https://issues.apache.org/jira/browse/KAFKA-17686 Project: Kafka Issue Type: Improvement Components: clients, consumer Affects Versions: 3.9.0 Reporter: Kirk True Assignee: Kirk True Fix For: 4.0.0 Error when running the integration test: {noformat} Gradle Test Run :core:integrationTest > Gradle Test Executor 10 > PlaintextAdminIntegrationTest > testOffsetsForTimesAfterDeleteRecords(String) > "testOffsetsForTimesAfterDeleteRecords(String).quorum=kraft" FAILED java.lang.NullPointerException: Cannot invoke "org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal.buildOffsetAndTimestamp()" because the return value of "java.util.Map$Entry.getValue()" is null at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.lambda$offsetsForTimes$4(AsyncKafkaConsumer.java:1082) at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180) at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1858) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.offsetsForTimes(AsyncKafkaConsumer.java:1080) at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.offsetsForTimes(AsyncKafkaConsumer.java:1043) at org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes(KafkaConsumer.java:1560) at 
kafka.api.PlaintextAdminIntegrationTest.testOffsetsForTimesAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:1535) {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
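The stack trace above points at Collectors.toMap (the uniqKeysMapAccumulator frame) applied to a map whose values can be null: the value-mapping lambda dereferences the null entry and throws. The failure mode can be reproduced with plain JDK code, along with one possible null-safe variant (filtering null-valued entries before collecting). The names below are illustrative and this is not Kafka's actual fix:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class NullValueToMap {
    // Fails with NullPointerException if any value in the input map is null,
    // because the value mapper dereferences the null entry value.
    static Map<String, String> unsafe(Map<String, String> in) {
        return in.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toUpperCase()));
    }

    // Null-safe variant: drop entries with null values before collecting.
    static Map<String, String> safe(Map<String, String> in) {
        return in.entrySet().stream()
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toUpperCase()));
    }

    public static void main(String[] args) {
        Map<String, String> byPartition = new HashMap<>();
        byPartition.put("topic-0", "offset@100");
        byPartition.put("topic-1", null); // no offset/timestamp found for this partition

        try {
            unsafe(byPartition);
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException, as in the report");
        }
        System.out.println(safe(byPartition)); // {topic-0=OFFSET@100}
    }
}
```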
[jira] [Commented] (KAFKA-16109) Write system tests cover the "simple consumer + commit" use case
[ https://issues.apache.org/jira/browse/KAFKA-16109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885936#comment-17885936 ] Kirk True commented on KAFKA-16109: --- I think we can push it to post-4.0 and/or lower the priority. Thanks! > Write system tests cover the "simple consumer + commit" use case > > > Key: KAFKA-16109 > URL: https://issues.apache.org/jira/browse/KAFKA-16109 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Reporter: Kirk True >Assignee: PoAn Yang >Priority: Major > Labels: consumer-threading-refactor, system-tests > Fix For: 4.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17647) Check PlaintextConsumerSubscriptionTest#testSubscribeInvalidTopicCanUnsubscribe really ignore ErrorEvent
[ https://issues.apache.org/jira/browse/KAFKA-17647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17647: -- Component/s: clients consumer > Check > PlaintextConsumerSubscriptionTest#testSubscribeInvalidTopicCanUnsubscribe > really ignore ErrorEvent > > > Key: KAFKA-17647 > URL: https://issues.apache.org/jira/browse/KAFKA-17647 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Minor > > Follow up for [https://github.com/apache/kafka/pull/17244]. > We only check that unsubscribe can succeed, but this call may run too fast, so > there may be no further ErrorEvent. We can check the new metric > background-event-queue-size to make sure there is a new ErrorEvent and that > unsubscribe ignores it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17648) AsyncKafkaConsumer#unsubscribe swallow TopicAuthorizationException
[ https://issues.apache.org/jira/browse/KAFKA-17648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17648: -- Component/s: clients consumer > AsyncKafkaConsumer#unsubscribe swallow TopicAuthorizationException > -- > > Key: KAFKA-17648 > URL: https://issues.apache.org/jira/browse/KAFKA-17648 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Minor > > Followup for [https://github.com/apache/kafka/pull/17244]. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17623) Flaky testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback
[ https://issues.apache.org/jira/browse/KAFKA-17623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17623: -- Component/s: clients > Flaky > testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback > > > Key: KAFKA-17623 > URL: https://issues.apache.org/jira/browse/KAFKA-17623 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: PoAn Yang >Priority: Major > Labels: consumer-threading-refactor > > Flaky for the new consumer, failing with : > org.apache.kafka.common.KafkaException: User rebalance callback throws an > error at > app//org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:259) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks(AsyncKafkaConsumer.java:1867) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:195) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:181) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.processBackgroundEvents(AsyncKafkaConsumer.java:1758) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.updateAssignmentMetadataIfNeeded(AsyncKafkaConsumer.java:1618) > ... 
> Caused by: java.lang.IllegalStateException: No current assignment for > partition topic-0 at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:378) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:395) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:425) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.processApplicationEvents(ConsumerNetworkThread.java:171) > > Flaky behaviour: > > https://ge.apache.org/scans/tests?search.buildOutcome=failure&search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172740959&search.startTimeMin=172248480&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=integration.kafka.api.PlaintextConsumerCallbackTest&tests.test=testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(String%2C%20String)%5B3%5D -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17618) group consumer heartbeat interval should be less than session timeout
[ https://issues.apache.org/jira/browse/KAFKA-17618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17618: -- Component/s: (was: clients) (was: consumer) > group consumer heartbeat interval should be less than session timeout > - > > Key: KAFKA-17618 > URL: https://issues.apache.org/jira/browse/KAFKA-17618 > Project: Kafka > Issue Type: Task >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Minor > Labels: kip-848 > > [KIP-848|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Heartbeat&Session] > mentions: > bq. The member is expected to heartbeat every > group.consumer.heartbeat.interval.ms in order to keep its session opened. If > it does not heartbeat at least once within the > group.consumer.session.timeout.ms, the group coordinator will kick the member > out from the group. > To prevent users from configuring _group.consumer.heartbeat.interval.ms_ to be larger than > _group.consumer.session.timeout.ms_, we can add validation for it. > We can do similar validation for _group.share.heartbeat.interval.ms_ and > _group.share.session.timeout.ms_ as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17618) group consumer heartbeat interval should be less than session timeout
[ https://issues.apache.org/jira/browse/KAFKA-17618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17618: -- Component/s: clients consumer > group consumer heartbeat interval should be less than session timeout > - > > Key: KAFKA-17618 > URL: https://issues.apache.org/jira/browse/KAFKA-17618 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Minor > Labels: kip-848 > > [KIP-848|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Heartbeat&Session] > mentions: > bq. The member is expected to heartbeat every > group.consumer.heartbeat.interval.ms in order to keep its session opened. If > it does not heartbeat at least once within the > group.consumer.session.timeout.ms, the group coordinator will kick the member > out from the group. > To prevent users from configuring _group.consumer.heartbeat.interval.ms_ to be larger than > _group.consumer.session.timeout.ms_, we can add validation for it. > We can do similar validation for _group.share.heartbeat.interval.ms_ and > _group.share.session.timeout.ms_ as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
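The proposed validation above is a simple ordering check on two config values. A minimal standalone sketch, assuming the check rejects any configuration where the heartbeat interval is not strictly less than the session timeout (the method name and exception type here are illustrative, not Kafka's actual implementation):

```java
public class HeartbeatConfigValidator {
    // Reject configurations where the heartbeat interval is not strictly less
    // than the session timeout, per the KIP-848 heartbeat/session contract.
    static void validate(long heartbeatIntervalMs, long sessionTimeoutMs) {
        if (heartbeatIntervalMs >= sessionTimeoutMs) {
            throw new IllegalArgumentException(
                "group.consumer.heartbeat.interval.ms (" + heartbeatIntervalMs +
                ") must be less than group.consumer.session.timeout.ms (" + sessionTimeoutMs + ")");
        }
    }

    public static void main(String[] args) {
        validate(5000, 45000); // valid: heartbeat interval well below session timeout
        boolean rejected = false;
        try {
            validate(60000, 45000); // invalid: member would be fenced before it heartbeats
        } catch (IllegalArgumentException e) {
            rejected = true;
        }
        System.out.println("rejected=" + rejected); // prints rejected=true
    }
}
```

The same check would apply symmetrically to the group.share.* pair mentioned in the description.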
[jira] [Commented] (KAFKA-17518) AsyncKafkaConsumer cannot reliably leave group when closed with small timeout
[ https://issues.apache.org/jira/browse/KAFKA-17518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884834#comment-17884834 ] Kirk True commented on KAFKA-17518: --- Thanks! :) > AsyncKafkaConsumer cannot reliably leave group when closed with small timeout > - > > Key: KAFKA-17518 > URL: https://issues.apache.org/jira/browse/KAFKA-17518 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.9.0 >Reporter: Kirk True >Assignee: TengYao Chi >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > > If {{close()}} is called with a short timeout (e.g. 0 ms), the process cannot > complete, leading to the consumer remaining in the consumer group. > On {{{}close(){}}}, the consumer attempts to unsubscribe and leave the > consumer group. This process requires hops back and forth between the > application and background threads to call the > {{{}ConsumerRebalanceListener{}}}. Those hops add a nonzero amount of time to > the close step. > The events used to communicate between the application and background threads > are based on the timeout provided by the user. If the timeout is not > sufficient, the events will expire, and the process will be left incomplete. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17518) AsyncKafkaConsumer cannot reliably leave group when closed with small timeout
[ https://issues.apache.org/jira/browse/KAFKA-17518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884832#comment-17884832 ] Kirk True commented on KAFKA-17518: --- [~frankvicky]—as [~lianetm] mentioned, this Jira came out of the PR discussion from [the pull request for KAFKA-16985|https://github.com/apache/kafka/pull/16686]. My personal preference would be to encourage you to join the discourse in that PR so that we arrive at consistent approaches for these various edge cases. > AsyncKafkaConsumer cannot reliably leave group when closed with small timeout > - > > Key: KAFKA-17518 > URL: https://issues.apache.org/jira/browse/KAFKA-17518 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.9.0 >Reporter: Kirk True >Assignee: TengYao Chi >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > > If {{close()}} is called with a short timeout (e.g. 0 ms), the process cannot > complete, leading to the consumer remaining in the consumer group. > On {{{}close(){}}}, the consumer attempts to unsubscribe and leave the > consumer group. This process requires hops back and forth between the > application and background threads to call the > {{{}ConsumerRebalanceListener{}}}. Those hops add a nonzero amount of time to > the close step. > The events used to communicate between the application and background threads > are based on the timeout provided by the user. If the timeout is not > sufficient, the events will expire, and the process will be left incomplete. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16778) AsyncKafkaConsumer fetcher might occasionally try to fetch to a revoked partition
[ https://issues.apache.org/jira/browse/KAFKA-16778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16778: -- Labels: kip-848-client-support (was: ) > AsyncKafkaConsumer fetcher might occasionally try to fetch to a revoked > partition > - > > Key: KAFKA-16778 > URL: https://issues.apache.org/jira/browse/KAFKA-16778 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Priority: Major > Labels: kip-848-client-support > > > {code:java} > java.lang.IllegalStateException: No current assignment for partition > output-topic-26 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.position(SubscriptionState.java:542) > at > org.apache.kafka.clients.consumer.internals.AbstractFetch.prepareFetchRequests(AbstractFetch.java:411) > at > org.apache.kafka.clients.consumer.internals.FetchRequestManager.poll(FetchRequestManager.java:74) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$new$2(ConsumerNetworkThread.java:159) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:143) > at > java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) > at > java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) > at > java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) > at > java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > at > 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:145) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:94) > {code} > The setup: running 30 consumers consuming from a 300-partition topic. > We occasionally get an IllegalStateException from the consumer. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16778) AsyncKafkaConsumer fetcher might occasionally try to fetch to a revoked partition
[ https://issues.apache.org/jira/browse/KAFKA-16778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16778: -- Component/s: clients consumer > AsyncKafkaConsumer fetcher might occasionally try to fetch to a revoked > partition > - > > Key: KAFKA-16778 > URL: https://issues.apache.org/jira/browse/KAFKA-16778 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Priority: Major > > > {code:java} > java.lang.IllegalStateException: No current assignment for partition > output-topic-26 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.position(SubscriptionState.java:542) > at > org.apache.kafka.clients.consumer.internals.AbstractFetch.prepareFetchRequests(AbstractFetch.java:411) > at > org.apache.kafka.clients.consumer.internals.FetchRequestManager.poll(FetchRequestManager.java:74) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$new$2(ConsumerNetworkThread.java:159) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:143) > at > java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) > at > java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) > at > java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) > at > java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > 
java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:145) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:94) > {code} > The setup: running 30 consumers consuming from a 300-partition topic. > We occasionally get an IllegalStateException from the consumer. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16966) Allow offset commit fetch to reuse previous request if partitions are a subset
[ https://issues.apache.org/jira/browse/KAFKA-16966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884828#comment-17884828 ] Kirk True commented on KAFKA-16966: --- [~taijuwu]—feel free to take it :) > Allow offset commit fetch to reuse previous request if partitions are a subset > -- > > Key: KAFKA-16966 > URL: https://issues.apache.org/jira/browse/KAFKA-16966 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 4.0.0 > > > In {{{}initWithCommittedOffsetsIfNeeded{}}}, the behavior of the existing > {{LegacyKafkaConsumer}} is to allow reuse only if the partitions for the > _current_ request equal those of the _previous_ request *exactly* > ([source|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java?rgh-link-date=2024-06-14T16%3A43%3A11Z#L927]). > That logic is the basis for the behavior used in the > {{{}AsyncKafkaConsumer{}}}. > The proposed change is to allow for request reuse if the partitions for the > _current_ request are a subset of those of the _previous_ request. This > introduces a subtle difference in behavior between the two {{Consumer}} > implementations, so we need to decide if we want to change both > implementations or just {{{}AsyncKafkaConsumer{}}}. Also, the specific case > that the request reuse logic solves is when the user has passed in a very low > timeout value in a tight {{poll()}} loop, which suggests the partitions > wouldn't be changing between those loops. -- This message was sent by Atlassian Jira (v8.20.10#820010)
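The proposed relaxation above boils down to replacing an exact-equality check on partition sets with a subset check. A minimal sketch of the two predicates, using strings in place of TopicPartition for a self-contained illustration (names are hypothetical, not Kafka's actual code):

```java
import java.util.Set;

public class OffsetFetchReuse {
    // LegacyKafkaConsumer today: reuse only if the requested partitions
    // exactly equal those of the previous (in-flight) request.
    static boolean canReuseExact(Set<String> previous, Set<String> current) {
        return previous.equals(current);
    }

    // Proposed: reuse whenever the current request's partitions are a
    // subset of the previous request's partitions.
    static boolean canReuseSubset(Set<String> previous, Set<String> current) {
        return previous.containsAll(current);
    }

    public static void main(String[] args) {
        Set<String> previous = Set.of("topic-0", "topic-1", "topic-2");
        System.out.println(canReuseExact(previous, Set.of("topic-0", "topic-1")));  // false: not equal
        System.out.println(canReuseSubset(previous, Set.of("topic-0", "topic-1"))); // true: subset
        System.out.println(canReuseSubset(previous, Set.of("topic-0", "topic-3"))); // false: topic-3 is new
    }
}
```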
[jira] [Commented] (KAFKA-15588) Purge the unsent offset commits/fetches when the member is fenced/failed
[ https://issues.apache.org/jira/browse/KAFKA-15588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884827#comment-17884827 ] Kirk True commented on KAFKA-15588: --- [~frankvicky]—I can't say with any certainty, but I would think most pending work pre-fencing should be cleaned up. Feel free to take it. > Purge the unsent offset commits/fetches when the member is fenced/failed > > > Key: KAFKA-15588 > URL: https://issues.apache.org/jira/browse/KAFKA-15588 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Priority: Major > Labels: kip-848-client-support, reconciliation > Fix For: 4.0.0 > > > When the member is fenced/failed, we should purge the inflight offset commits > and fetches. HeartbeatRequestManager should be able to handle this -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17581) AsyncKafkaConsumer can't unsubscribe invalid topics
[ https://issues.apache.org/jira/browse/KAFKA-17581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17581: -- Labels: consumer-threading-refactor kip-848-client-support (was: ) > AsyncKafkaConsumer can't unsubscribe invalid topics > --- > > Key: KAFKA-17581 > URL: https://issues.apache.org/jira/browse/KAFKA-17581 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > > When a consumer subscribes to an invalid topic name like " this is test", the classic > consumer can unsubscribe without error. However, the async consumer can't. We can > use the following integration test to validate: > > {code:java} > @ParameterizedTest(name = > TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) > @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) > def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = { > // Invalid topic name due to space > val invalidTopicName = "topic abc" > val consumer = createConsumer() > consumer.subscribe(List(invalidTopicName).asJava) > var exception : InvalidTopicException = null > TestUtils.waitUntilTrue(() => { > try consumer.poll(Duration.ofMillis(500)) catch { > case e : InvalidTopicException => exception = e > case e : Throwable => fail(s"An InvalidTopicException should be thrown. > But ${e.getClass} is thrown") > } > exception != null > }, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.") > assertEquals(s"Invalid topics: [${invalidTopicName}]", exception.getMessage) > // AsyncKafkaConsumer sends request in background thread. Wait enough time > to send next request. > Thread.sleep(1000) > assertDoesNotThrow(new Executable { > override def execute(): Unit = consumer.unsubscribe() > }) > }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
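Both topic names in the report (" this is test" and "topic abc") are invalid for the same reason: embedded spaces. A simplified standalone version of the legal-character rule, for illustration only (Kafka's actual validation also limits name length and rejects "." and ".."):

```java
import java.util.regex.Pattern;

public class TopicNameCheck {
    // Simplified rule: only ASCII alphanumerics, '.', '_' and '-' are legal
    // in a topic name. This is a sketch, not Kafka's actual validation code.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    static boolean isLegal(String topic) {
        return LEGAL.matcher(topic).matches();
    }

    public static void main(String[] args) {
        System.out.println(isLegal("topic-abc"));    // true
        System.out.println(isLegal("topic abc"));    // false: embedded space, as in the test
        System.out.println(isLegal(" this is test"));// false: spaces, as in the description
    }
}
```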
[jira] [Updated] (KAFKA-17582) Unpredictable consumer position after transaction abort
[ https://issues.apache.org/jira/browse/KAFKA-17582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17582: -- Component/s: consumer > Unpredictable consumer position after transaction abort > --- > > Key: KAFKA-17582 > URL: https://issues.apache.org/jira/browse/KAFKA-17582 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, documentation >Affects Versions: 3.8.0 >Reporter: Kyle Kingsbury >Priority: Critical > Labels: abort, offset, transaction > Attachments: 20240919T124411.740-0500(1).zip, Screenshot from > 2024-09-19 18-45-34.png > > > With the official Kafka Java client, version 3.8.0, the position of consumers > after a transaction aborts appears unpredictable. Sometimes the consumer > moves on, skipping over the records it polled in the aborted transaction. > Sometimes it rewinds to read them again. Sometimes it rewinds *further* than > the most recent transaction. > Since the goal of transactions is to enable "exactly-once semantics", it > seems sensible that the consumer should rewind on abort, such that any > subsequent transactions would start at the same offsets. Not rewinding leads > to data loss, since messages are consumed but their effects are not > committed. Rewinding too far is... just weird. > I'm seeing this issue in Jepsen tests of Kafka 3.0.0 and other > Kafka-compatible systems. It occurs without faults, and with a single > producer and consumer; no other concurrent processes. 
Here's the producer and > consumer config: > > {{{}Producer config: {"socket.connection.setup.timeout.max.ms" 1000, > "transactional.id" "jt1", "bootstrap.servers" "n3:9092", "request.timeout.ms" > 3000, "enable.idempotence" true, "max.block.ms" 1, "value.serializer" > "org.apache.kafka.common.serialization.LongSerializer", "retries" 1000, > "key.serializer" "org.apache.kafka.common.serialization.LongSerializer", > "socket.connection.setup.timeout.ms" 500, "reconnect.backoff.max.ms" 1000, > "delivery.timeout.ms" 1, "acks" "all", "transaction.timeout.ms" 1000{ > {{{}Consumer config: {"socket.connection.setup.timeout.max.ms" 1000, > "bootstrap.servers" "n5:9092", "request.timeout.ms" 1, > "connections.max.idle.ms" 6, "session.timeout.ms" 6000, > "heartbeat.interval.ms" 300, "key.deserializer" > "org.apache.kafka.common.serialization.LongDeserializer", "group.id" > "jepsen-group", "metadata.max.age.ms" 6, "auto.offset.reset" "earliest", > "isolation.level" "read_committed", "socket.connection.setup.timeout.ms" 500, > "value.deserializer" > "org.apache.kafka.common.serialization.LongDeserializer", > "enable.auto.commit" false, "default.api.timeout.ms" 1{ > > Attached is a test run that shows this behavior, as well as a visualization > of the reads (polls) and writes (sends) of a single topic-partition. > In this plot, time flows down, and offsets run left to right. Each > transaction is a single horizontal line. `w1` denotes a send of value 1, and > `r2` denotes a poll of read 2. All operations here are performed by the sole > process in the system, which has a single Kafka consumer and a single Kafka > client. First, a transaction writes 35 and commits. Second, a transaction > reads 35 and aborts. Third, a transaction reads 35 and aborts: the consumer > has clearly re-wound to show the same record twice. > Then a transaction writes 37. Immediately thereafter a transaction reads 37 > and 38. Unlike before, it did *not* rewind. This transaction also aborts. 
> Finally, a transaction writes 39 and 40. Then a transaction reads 39 and 40. > This transaction commits! Values 35, 37, and 38 have been lost! > It doesn't seem possible that this is the effect of a consumer rebalance: > rebalancing should start off the consumer at the last *committed* offset, and > the last committed offset in this history was actually value 31–it should > have picked up at 35, 37, etc. This test uses auto.offset.reset=earliest, so > if the commit were somehow missing, it should have rewound to the start of > the topic-partition. > What... *should* Kafka do with respect to consumer offsets when a transaction > aborts? And is there any sort of documentation for this? I've been digging > into this problem for almost a week–it manifested as write loss in a Jepsen > test–and I'm baffled as to how to proceed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
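The rewind-on-abort semantics the reporter argues for can be sketched as plain offset bookkeeping. This is an illustration of the *expected* behavior under the report's assumptions, not the actual Kafka client implementation; the `OffsetTracker` class and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: on abort, each partition's position rewinds to
// its last *committed* offset, so the next transaction re-reads the
// same records instead of silently skipping them.
public class OffsetTracker {
    private final Map<String, Long> committed = new HashMap<>();
    private final Map<String, Long> position = new HashMap<>();

    public void poll(String partition, long upToOffset) {
        // Polling advances the in-memory position past the records read.
        position.put(partition, upToOffset + 1);
    }

    public void commit() {
        // Commit makes the current positions durable.
        committed.putAll(position);
    }

    public void abort() {
        // Abort rewinds every position to its last committed offset,
        // so aborted reads are replayed rather than lost.
        position.putAll(committed);
    }

    public long position(String partition) {
        return position.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        OffsetTracker t = new OffsetTracker();
        t.poll("p0", 35);   // read record 35
        t.commit();         // committed position is now 36
        t.poll("p0", 36);   // read record 36 inside a transaction...
        t.abort();          // ...which aborts: rewind to 36
        System.out.println(t.position("p0")); // prints 36
    }
}
```

Under these semantics, every transaction that follows an abort starts at the same offsets, which is exactly the predictability the report finds missing.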
[jira] [Updated] (KAFKA-17439) Make polling for new records an explicit action/event in the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-17439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17439: -- Labels: consumer-threading-refactor kip-848-client-support (was: consumer-threading-refactor) > Make polling for new records an explicit action/event in the new consumer > - > > Key: KAFKA-17439 > URL: https://issues.apache.org/jira/browse/KAFKA-17439 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 4.0.0 > > > Presently, the new consumer polls the FetchRequestManager many, many times a > second and creates fetch requests for any fetchable partitions. In order to > more closely mirror how the existing consumer processes fetches, we should > mirror the points at which fetch requests are sent in the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
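The change described in KAFKA-17439 — gating fetch-request creation on an explicit poll signal instead of every background-loop iteration — can be sketched with a simple flag. `ExplicitFetchGate` and its methods are illustrative names, not the real `FetchRequestManager` API.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: the background loop runs many times per second, but only
// builds a fetch request after the application thread explicitly
// signals a poll, mirroring the classic consumer's fetch points.
public class ExplicitFetchGate {
    private final AtomicBoolean pollRequested = new AtomicBoolean(false);
    private int fetchRequestsSent = 0;

    // Application thread calls this from Consumer.poll().
    public void signalPoll() { pollRequested.set(true); }

    // Background loop iteration: a no-op unless a poll was requested.
    public void backgroundIteration() {
        if (pollRequested.compareAndSet(true, false)) {
            fetchRequestsSent++; // stand-in for building a real fetch request
        }
    }

    public int fetchRequestsSent() { return fetchRequestsSent; }

    public static void main(String[] args) {
        ExplicitFetchGate gate = new ExplicitFetchGate();
        for (int i = 0; i < 1000; i++) gate.backgroundIteration();
        System.out.println(gate.fetchRequestsSent()); // prints 0
        gate.signalPoll();
        for (int i = 0; i < 1000; i++) gate.backgroundIteration();
        System.out.println(gate.fetchRequestsSent()); // prints 1
    }
}
```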
[jira] [Assigned] (KAFKA-16460) New consumer times out consuming records in multiple consumer_test.py system tests
[ https://issues.apache.org/jira/browse/KAFKA-16460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16460: - Assignee: Kirk True (was: Arpit Goyal) > New consumer times out consuming records in multiple consumer_test.py system > tests > -- > > Key: KAFKA-16460 > URL: https://issues.apache.org/jira/browse/KAFKA-16460 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 4.0.0 > > > The {{consumer_test.py}} system test fails with the following errors: > {quote} > * Timed out waiting for consumption > {quote} > Affected tests: > * {{test_broker_failure}} > * {{test_consumer_bounce}} > * {{test_static_consumer_bounce}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16460) New consumer times out consuming records in multiple consumer_test.py system tests
[ https://issues.apache.org/jira/browse/KAFKA-16460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884399#comment-17884399 ] Kirk True commented on KAFKA-16460: --- There's a good chance this is a result of a different problem, so I don't want you to chase issues that are unrelated. If I find an actionable bug to fix, I'll assign it to you. > New consumer times out consuming records in multiple consumer_test.py system > tests > -- > > Key: KAFKA-16460 > URL: https://issues.apache.org/jira/browse/KAFKA-16460 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Arpit Goyal >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 4.0.0 > > > The {{consumer_test.py}} system test fails with the following errors: > {quote} > * Timed out waiting for consumption > {quote} > Affected tests: > * {{test_broker_failure}} > * {{test_consumer_bounce}} > * {{test_static_consumer_bounce}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17581) AsyncKafkaConsumer can't unsubscribe invalid topics
[ https://issues.apache.org/jira/browse/KAFKA-17581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17581: -- Component/s: clients consumer > AsyncKafkaConsumer can't unsubscribe invalid topics > --- > > Key: KAFKA-17581 > URL: https://issues.apache.org/jira/browse/KAFKA-17581 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Major > > When consumer subscribes an invalid topic name like " this is test", classic > consumer can unsubscribe without error. However, async consumer can't. We can > use following integration test to validate: > > {code:java} > @ParameterizedTest(name = > TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) > @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) > def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = { > // Invalid topic name due to space > val invalidTopicName = "topic abc" > val consumer = createConsumer() > consumer.subscribe(List(invalidTopicName).asJava) > var exception : InvalidTopicException = null > TestUtils.waitUntilTrue(() => { > try consumer.poll(Duration.ofMillis(500)) catch { > case e : InvalidTopicException => exception = e > case e : Throwable => fail(s"An InvalidTopicException should be thrown. > But ${e.getClass} is thrown") > } > exception != null > }, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.") > assertEquals(s"Invalid topics: [${invalidTopicName}]", exception.getMessage) > // AsyncKafkaConsumer sends request in background thread. Wait enough time > to send next request. > Thread.sleep(1000) > assertDoesNotThrow(new Executable { > override def execute(): Unit = consumer.unsubscribe() > }) > }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17581) AsyncKafkaConsumer can't unsubscribe invalid topics
[ https://issues.apache.org/jira/browse/KAFKA-17581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17883613#comment-17883613 ] Kirk True commented on KAFKA-17581: --- [~yangpoan]—this is a good catch! Where in the new consumer is the exception thrown? I’d want to look at the existing consumer to see how it handles this case. > AsyncKafkaConsumer can't unsubscribe invalid topics > --- > > Key: KAFKA-17581 > URL: https://issues.apache.org/jira/browse/KAFKA-17581 > Project: Kafka > Issue Type: Bug >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Major > > When consumer subscribes an invalid topic name like " this is test", classic > consumer can unsubscribe without error. However, async consumer can't. We can > use following integration test to validate: > > {code:java} > @ParameterizedTest(name = > TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) > @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) > def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = { > // Invalid topic name due to space > val invalidTopicName = "topic abc" > val consumer = createConsumer() > consumer.subscribe(List(invalidTopicName).asJava) > var exception : InvalidTopicException = null > TestUtils.waitUntilTrue(() => { > try consumer.poll(Duration.ofMillis(500)) catch { > case e : InvalidTopicException => exception = e > case e : Throwable => fail(s"An InvalidTopicException should be thrown. > But ${e.getClass} is thrown") > } > exception != null > }, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.") > assertEquals(s"Invalid topics: [${invalidTopicName}]", exception.getMessage) > // AsyncKafkaConsumer sends request in background thread. Wait enough time > to send next request. > Thread.sleep(1000) > assertDoesNotThrow(new Executable { > override def execute(): Unit = consumer.unsubscribe() > }) > }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17554) Flaky testFutureCompletionOutsidePoll in ConsumerNetworkClientTest
[ https://issues.apache.org/jira/browse/KAFKA-17554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17554: -- Component/s: clients > Flaky testFutureCompletionOutsidePoll in ConsumerNetworkClientTest > -- > > Key: KAFKA-17554 > URL: https://issues.apache.org/jira/browse/KAFKA-17554 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: 黃竣陽 >Priority: Major > Labels: consumer, flaky-test > > Fails with: > org.opentest4j.AssertionFailedError: expected: <true> but was: <false> at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:183) at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClientTest.testFutureCompletionOutsidePoll(ConsumerNetworkClientTest.java:295) > at java.lang.reflect.Method.invoke(Method.java:498) at > java.util.ArrayList.forEach(ArrayList.java:1259) at > java.util.ArrayList.forEach(ArrayList.java:1259) > > Flaky behaviour started recently (~ Aug 27, 2024): > [https://ge.apache.org/scans/tests?search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=1726258207388&search.startTimeMin=172110240&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClientTest&tests.test=testFutureCompletionOutsidePoll()] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17519) Define and validate correctness of Consumer.close() and its timeout when thread is interrupted
[ https://issues.apache.org/jira/browse/KAFKA-17519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881422#comment-17881422 ] Kirk True commented on KAFKA-17519: --- [~yangpoan]—it's yours if you want it. Thanks! > Define and validate correctness of Consumer.close() and its timeout when > thread is interrupted > -- > > Key: KAFKA-17519 > URL: https://issues.apache.org/jira/browse/KAFKA-17519 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.9.0 >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > > The repercussions of a thread's interrupt status on {{Consumer.close()}} and > its timeout is not well defined. It _appears_ that the > {{ClassicKafkaConsumer}} will continue to attempt to close all its resources > even if an interrupt was triggered prior to—or during—the call to {{close()}} > though it effectively ignores the user's supplied timeout since each call to > {{NetworkClient.poll()}} will throw an {{InterruptException}} after first > making an attempt to poll the socket. > The task here is to review the existing code, verify the behavior with some > unit/integration tests, and document it. Furthermore, once the intended > behavior has been confirmed, the {{AsyncKafkaConsumer}} should be updated to > behave likewise. -- This message was sent by Atlassian Jira (v8.20.10#820010)
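The interplay described in KAFKA-17519 — a pre-set interrupt flag cutting a timed close short so the user's timeout is effectively ignored — can be illustrated with plain thread-interrupt mechanics. This is a minimal model of the described behavior, not the actual `ClassicKafkaConsumer` code; `closeWithTimeout` is a hypothetical name.

```java
// Illustrative only: a timed "close" loop that bails out as soon as
// the calling thread's interrupt flag is observed, mirroring how each
// NetworkClient.poll() is described as throwing InterruptException.
public class InterruptedClose {
    /**
     * Runs "close work" until the deadline, but exits early if the
     * calling thread was interrupted before or during the loop.
     * Returns the number of iterations completed.
     */
    static int closeWithTimeout(long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int iterations = 0;
        while (System.currentTimeMillis() < deadline) {
            if (Thread.currentThread().isInterrupted()) {
                // Give up immediately and surface the interrupt
                // instead of honoring the remaining timeout.
                break;
            }
            iterations++;
        }
        return iterations;
    }

    public static void main(String[] args) {
        // Interrupt ourselves *before* calling close: the loop exits
        // on the first check, so the 50 ms timeout is never used.
        Thread.currentThread().interrupt();
        int done = closeWithTimeout(50);
        System.out.println(done); // prints 0
        // Thread.interrupted() reads and clears the flag for later code.
        System.out.println(Thread.interrupted()); // prints true
    }
}
```

Pinning down whether the real consumer should behave like this loop (best-effort cleanup, timeout ignored) or honor the timeout despite the interrupt is exactly the open question in the ticket.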
[jira] [Updated] (KAFKA-17535) Flaky testCloseNoWait in KafkaConsumerTest for Classic Consumer
[ https://issues.apache.org/jira/browse/KAFKA-17535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17535: -- Component/s: clients > Flaky testCloseNoWait in KafkaConsumerTest for Classic Consumer > > > Key: KAFKA-17535 > URL: https://issues.apache.org/jira/browse/KAFKA-17535 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: flaky-test > > Flaky failures due to TimeoutException: > java.util.concurrent.TimeoutException at > java.base/java.util.concurrent.FutureTask.get(FutureTask.java:204) at > org.apache.kafka.clients.consumer.KafkaConsumerTest.consumerCloseTest(KafkaConsumerTest.java:2049) > Flaky in trunk for the classic consumer for a while: > [https://ge.apache.org/scans/tests?search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=17261&search.startTimeMin=172248480&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=org.apache.kafka.clients.consumer.KafkaConsumerTest&tests.test=testCloseNoWait(GroupProtocol)%5B1%5D] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17518) AsyncKafkaConsumer cannot reliably leave group when closed with small timeout
[ https://issues.apache.org/jira/browse/KAFKA-17518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881421#comment-17881421 ] Kirk True commented on KAFKA-17518: --- [~frankvicky]—yes, you're free to take it. Thanks! > AsyncKafkaConsumer cannot reliably leave group when closed with small timeout > - > > Key: KAFKA-17518 > URL: https://issues.apache.org/jira/browse/KAFKA-17518 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.9.0 >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > > If {{close()}} is called with a short timeout (e.g. 0 ms), the process cannot > complete, leading to the consumer remaining in the consumer group. > On {{{}close(){}}}, the consumer attempts to unsubscribe and leave the > consumer group. This process requires hops back and forth between the > application and background threads to call the > {{{}ConsumerRebalanceListener{}}}. Those hops add a nonzero amount of time to > the close step. > The events used to communicate between the application and background threads > are based on the timeout provided by the user. If the timeout is not > sufficient, the events will expire, and the process will be left incomplete. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17536) Ensure clear error message when "new" consumer used with incompatible cluster
Kirk True created KAFKA-17536: - Summary: Ensure clear error message when "new" consumer used with incompatible cluster Key: KAFKA-17536 URL: https://issues.apache.org/jira/browse/KAFKA-17536 Project: Kafka Issue Type: Improvement Components: clients, consumer Affects Versions: 3.9.0 Reporter: Kirk True Assignee: Kirk True Fix For: 4.0.0 In Kafka 4.0, by default the consumer uses the updated consumer group protocol defined in KIP-848. When the consumer is used with a cluster that does not support (or is not configured to use) the new protocol, the consumer will get an unfriendly error about unavailable APIs. Since this error could be the user's first impression when attempting to upgrade to 4.0, we need to make sure that the error is very clear about the remediation steps (set the group.protocol to CLASSIC on the client or upgrade and enable the new protocol on the cluster). -- This message was sent by Atlassian Jira (v8.20.10#820010)
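One of the remediation steps named in the issue — pinning the client to the classic group protocol — can be shown as a config sketch. The `group.protocol` key and its `classic` value come from KIP-848; the group id and bootstrap address below are placeholders.

```java
import java.util.Properties;

// Minimal consumer config that opts out of the KIP-848 protocol for
// clusters that do not yet support (or enable) it.
public class ClassicProtocolConfig {
    public static Properties consumerProps(String bootstrap) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("group.id", "my-group");
        // Fall back to the pre-KIP-848 protocol; in 4.0 the default
        // is the new "consumer" protocol.
        props.put("group.protocol", "classic");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092");
        System.out.println(props.getProperty("group.protocol")); // prints classic
    }
}
```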
[jira] [Updated] (KAFKA-17536) Ensure clear error message when "new" consumer used with incompatible cluster
[ https://issues.apache.org/jira/browse/KAFKA-17536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17536: -- Priority: Critical (was: Major) > Ensure clear error message when "new" consumer used with incompatible cluster > - > > Key: KAFKA-17536 > URL: https://issues.apache.org/jira/browse/KAFKA-17536 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Affects Versions: 3.9.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: kip-848-client-support > Fix For: 4.0.0 > > > In Kafka 4.0, by default the consumer uses the updated consumer group > protocol defined in KIP-848. When the consumer is used with a cluster that > does not support (or is not configured to use) the new protocol, the consumer > will get an unfriendly error about unavailable APIs. Since this error could > be the user's first impression when attempting to upgrade to 4.0, we need to > make sure that the error is very clear about the remediation steps (set the > group.protocol to CLASSIC on the client or upgrade and enable the new > protocol on the cluster). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17519) Define behavior of Consumer.close() and its timeout when thread is interrupted
[ https://issues.apache.org/jira/browse/KAFKA-17519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17519: -- Summary: Define behavior of Consumer.close() and its timeout when thread is interrupted (was: Review intended behavior of Consumer.close() and its timeout on interrupt) > Define behavior of Consumer.close() and its timeout when thread is interrupted > -- > > Key: KAFKA-17519 > URL: https://issues.apache.org/jira/browse/KAFKA-17519 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.9.0 >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > > The repercussions of a thread's interrupt status on {{Consumer.close()}} and > its timeout is not well defined. It _appears_ that the > {{ClassicKafkaConsumer}} will continue to attempt to close all its resources > even if an interrupt was triggered prior to—or during—the call to {{close()}} > though it effectively ignores the user's supplied timeout since each call to > {{NetworkClient.poll()}} will throw an {{InterruptException}} after first > making an attempt to poll the socket. > The task here is to review the existing code, verify the behavior with some > unit/integration tests, and document it. Furthermore, once the intended > behavior has been confirmed, the {{AsyncKafkaConsumer}} should be updated to > behave likewise. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-17518) AsyncKafkaConsumer cannot reliably leave group when closed with small timeout
[ https://issues.apache.org/jira/browse/KAFKA-17518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17880817#comment-17880817 ] Kirk True edited comment on KAFKA-17518 at 9/11/24 12:32 AM: - This issue was found during [review|https://github.com/apache/kafka/pull/16686] and testing for KAFKA-16985. was (Author: kirktrue): This issue was found during [review|[https://github.com/apache/kafka/pull/16686]] and testing for KAFKA-16985. > AsyncKafkaConsumer cannot reliably leave group when closed with small timeout > - > > Key: KAFKA-17518 > URL: https://issues.apache.org/jira/browse/KAFKA-17518 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.9.0 >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > > If {{close()}} is called with a short timeout (e.g. 0 ms), the process cannot > complete, leading to the consumer remaining in the consumer group. > On {{{}close(){}}}, the consumer attempts to unsubscribe and leave the > consumer group. This process requires hops back and forth between the > application and background threads to call the > {{{}ConsumerRebalanceListener{}}}. Those hops add a nonzero amount of time to > the close step. > The events used to communicate between the application and background threads > are based on the timeout provided by the user. If the timeout is not > sufficient, the events will expire, and the process will be left incomplete. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17518) AsyncKafkaConsumer cannot reliably leave group when closed with small timeout
Kirk True created KAFKA-17518: - Summary: AsyncKafkaConsumer cannot reliably leave group when closed with small timeout Key: KAFKA-17518 URL: https://issues.apache.org/jira/browse/KAFKA-17518 Project: Kafka Issue Type: Bug Components: clients, consumer Affects Versions: 3.9.0 Reporter: Kirk True If {{close()}} is called with a short timeout (e.g. 0 ms), the process cannot complete, leading to the consumer remaining in the consumer group. On {{{}close(){}}}, the consumer attempts to unsubscribe and leave the consumer group. This process requires hops back and forth between the application and background threads to call the {{{}ConsumerRebalanceListener{}}}. Those hops add a nonzero amount of time to the close step. The events used to communicate between the application and background threads are based on the timeout provided by the user. If the timeout is not sufficient, the events will expire, and the process will be left incomplete. -- This message was sent by Atlassian Jira (v8.20.10#820010)
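The failure mode described in KAFKA-17518 can be modeled as events that carry a deadline derived from the user's close() timeout: if the application/background round trips needed to run the `ConsumerRebalanceListener` take longer than that deadline, the event expires and the leave-group never completes. This is a toy model under those assumptions; the class names are illustrative, not the real client's.

```java
// Toy model: an inter-thread event expires when the hops it must
// survive take longer than the user-supplied close() timeout.
public class CloseTimeoutModel {
    static final class Event {
        final long deadlineMs;
        Event(long nowMs, long timeoutMs) { this.deadlineMs = nowMs + timeoutMs; }
        boolean expired(long nowMs) { return nowMs > deadlineMs; }
    }

    /** Returns true if the leave-group completes within the timeout. */
    static boolean leaveGroupCompletes(long timeoutMs, long hopCostMs, int hops) {
        long now = 0;
        Event event = new Event(now, timeoutMs);
        for (int i = 0; i < hops; i++) {
            now += hopCostMs;          // each app<->background hop takes time
            if (event.expired(now)) {
                return false;          // event expired mid-flight
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // close(Duration.ZERO): any nonzero hop cost expires the event.
        System.out.println(leaveGroupCompletes(0, 1, 2));   // prints false
        // A generous timeout lets all hops finish.
        System.out.println(leaveGroupCompletes(100, 1, 2)); // prints true
    }
}
```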
[jira] [Updated] (KAFKA-16138) QuotaTest system test fails consistently in 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16138: -- Fix Version/s: 4.0.0 > QuotaTest system test fails consistently in 3.7 > --- > > Key: KAFKA-16138 > URL: https://issues.apache.org/jira/browse/KAFKA-16138 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0, 3.8.0 >Reporter: Stanislav Kozlovski >Assignee: Kirk True >Priority: Major > Fix For: 4.0.0 > > > as mentioned in > [https://hackmd.io/@hOneAGCrSmKSpL8VF-1HWQ/HyRgRJmta#kafkatesttestsclientquota_testQuotaTesttest_quotaArguments-%E2%80%9Coverride_quota%E2%80%9D-true-%E2%80%9Cquota_type%E2%80%9D-%E2%80%9Cuser%E2%80%9D,] > the test fails consistently: > {code:java} > ValueError('max() arg is an empty sequence') > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py", > line 169, in test_quota > success, msg = self.validate(self.kafka, producer, consumer) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py", > line 197, in validate > metric.value for k, metrics in producer.metrics(group='producer-metrics', > name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics > 
ValueError: max() arg is an empty sequence {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16138) QuotaTest system test fails consistently in 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16138: - Assignee: Kirk True (was: Philip Nee) > QuotaTest system test fails consistently in 3.7 > --- > > Key: KAFKA-16138 > URL: https://issues.apache.org/jira/browse/KAFKA-16138 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0, 3.8.0 >Reporter: Stanislav Kozlovski >Assignee: Kirk True >Priority: Major > > as mentioned in > [https://hackmd.io/@hOneAGCrSmKSpL8VF-1HWQ/HyRgRJmta#kafkatesttestsclientquota_testQuotaTesttest_quotaArguments-%E2%80%9Coverride_quota%E2%80%9D-true-%E2%80%9Cquota_type%E2%80%9D-%E2%80%9Cuser%E2%80%9D,] > the test fails consistently: > {code:java} > ValueError('max() arg is an empty sequence') > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py", > line 169, in test_quota > success, msg = self.validate(self.kafka, producer, consumer) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py", > line 197, in validate > metric.value for k, metrics in producer.metrics(group='producer-metrics', > name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics > 
ValueError: max() arg is an empty sequence {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17448) New consumer seek should update positions in background thread
[ https://issues.apache.org/jira/browse/KAFKA-17448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17879653#comment-17879653 ] Kirk True commented on KAFKA-17448: --- [~yangpoan]—sorry for the nag, but can you mark this as Patch Available since the PR is out for review? Thanks! > New consumer seek should update positions in background thread > -- > > Key: KAFKA-17448 > URL: https://issues.apache.org/jira/browse/KAFKA-17448 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0, 3.8.0, 3.7.1 >Reporter: Lianet Magrans >Assignee: PoAn Yang >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 4.0.0 > > > In the new AsyncKafkaConsumer, a call to seek will update the positions in > subscription state for the assigned partitions in the app thread > ([https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L796]) > This could lead to race conditions like we've seen when subscription state > changes in the app thread (over a set of assigned partitions), that could > have been modified in the background thread, leading to errors on "No current > assignment for partition " > [https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L378] > > Also, positions update is moved the background with KAFKA-17066 for the same > reason, so even if the assignment does not change, we could have a race > between the background setting positions to the committed offsets for > instance, and the app thread setting them manually via seek. 
> To avoid all of the above, we should have seek generate an event, send it to > the background, and then update the subscription state when processing that > event (similar to other api calls, ex, assign with KAFKA-17064) -- This message was sent by Atlassian Jira (v8.20.10#820010)
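The proposed fix — have seek() enqueue an event and let the background thread apply it — can be sketched with a queue and a single writer. Only one thread ever mutates positions, which removes the race the ticket describes. `BackgroundSeek` and its methods are illustrative names, not the real client's API.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: the application thread never touches subscription state
// directly; it hands a SeekEvent to the background thread, the sole
// writer, so concurrent assignment changes cannot race with seek().
public class BackgroundSeek {
    record SeekEvent(String partition, long offset) {}

    private final BlockingQueue<SeekEvent> events = new ArrayBlockingQueue<>(16);
    private final Map<String, Long> positions = new ConcurrentHashMap<>();

    // Application thread: just enqueue the request.
    public void seek(String partition, long offset) throws InterruptedException {
        events.put(new SeekEvent(partition, offset));
    }

    // Background thread: the only place positions are written.
    public void processOneEvent() throws InterruptedException {
        SeekEvent e = events.take();
        positions.put(e.partition(), e.offset());
    }

    public long position(String partition) {
        return positions.getOrDefault(partition, -1L);
    }

    public static void main(String[] args) throws InterruptedException {
        BackgroundSeek c = new BackgroundSeek();
        c.seek("topic-0", 42);
        c.processOneEvent(); // normally runs on the background thread
        System.out.println(c.position("topic-0")); // prints 42
    }
}
```

This mirrors the pattern the ticket cites for other APIs (e.g. assign in KAFKA-17064): the event round trip adds latency, but state ownership stays with one thread.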
[jira] [Updated] (KAFKA-16138) QuotaTest system test fails consistently in 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16138: -- Component/s: system tests > QuotaTest system test fails consistently in 3.7 > --- > > Key: KAFKA-16138 > URL: https://issues.apache.org/jira/browse/KAFKA-16138 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0, 3.8.0 >Reporter: Stanislav Kozlovski >Priority: Major > > as mentioned in > [https://hackmd.io/@hOneAGCrSmKSpL8VF-1HWQ/HyRgRJmta#kafkatesttestsclientquota_testQuotaTesttest_quotaArguments-%E2%80%9Coverride_quota%E2%80%9D-true-%E2%80%9Cquota_type%E2%80%9D-%E2%80%9Cuser%E2%80%9D,] > the test fails consistently: > {code:java} > ValueError('max() arg is an empty sequence') > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py", > line 169, in test_quota > success, msg = self.validate(self.kafka, producer, consumer) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py", > line 197, in validate > metric.value for k, metrics in producer.metrics(group='producer-metrics', > name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics > ValueError: max() arg is an empty 
sequence {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
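For context on the failure above: the validate() step in quota_test.py passes a generator of matched metric values to max(), and when producer.metrics(...) matches nothing the generator is empty, which is exactly when Python's max() raises this ValueError. A minimal sketch (the empty `samples` list is a stand-in for the unmatched metrics; this is an illustration, not the actual test code):

```python
samples = []  # stand-in for the metric values that matched nothing

# Reproduce the failure mode: max() over an empty iterable raises.
try:
    peak = max(value for value in samples)
except ValueError as exc:
    assert "empty sequence" in str(exc)

# Since Python 3.4, max() accepts a default for exactly this case, which
# would let the test fail later with a clearer quota-related assertion
# instead of an opaque ValueError.
peak = max(samples, default=0.0)
assert peak == 0.0
```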
[jira] [Assigned] (KAFKA-16138) QuotaTest system test fails consistently in 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16138: - Assignee: Philip Nee > QuotaTest system test fails consistently in 3.7 > --- > > Key: KAFKA-16138 > URL: https://issues.apache.org/jira/browse/KAFKA-16138 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0, 3.8.0 >Reporter: Stanislav Kozlovski >Assignee: Philip Nee >Priority: Major > > as mentioned in > [https://hackmd.io/@hOneAGCrSmKSpL8VF-1HWQ/HyRgRJmta#kafkatesttestsclientquota_testQuotaTesttest_quotaArguments-%E2%80%9Coverride_quota%E2%80%9D-true-%E2%80%9Cquota_type%E2%80%9D-%E2%80%9Cuser%E2%80%9D,] > the test fails consistently: > {code:java} > ValueError('max() arg is an empty sequence') > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py", > line 169, in test_quota > success, msg = self.validate(self.kafka, producer, consumer) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py", > line 197, in validate > metric.value for k, metrics in producer.metrics(group='producer-metrics', > name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics > ValueError: max() 
arg is an empty sequence {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16138) QuotaTest system test fails consistently in 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16138: -- Component/s: clients consumer > QuotaTest system test fails consistently in 3.7 > --- > > Key: KAFKA-16138 > URL: https://issues.apache.org/jira/browse/KAFKA-16138 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Affects Versions: 3.7.0, 3.8.0 >Reporter: Stanislav Kozlovski >Priority: Major > > as mentioned in > [https://hackmd.io/@hOneAGCrSmKSpL8VF-1HWQ/HyRgRJmta#kafkatesttestsclientquota_testQuotaTesttest_quotaArguments-%E2%80%9Coverride_quota%E2%80%9D-true-%E2%80%9Cquota_type%E2%80%9D-%E2%80%9Cuser%E2%80%9D,] > the test fails consistently: > {code:java} > ValueError('max() arg is an empty sequence') > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py", > line 169, in test_quota > success, msg = self.validate(self.kafka, producer, consumer) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py", > line 197, in validate > metric.value for k, metrics in producer.metrics(group='producer-metrics', > name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics > ValueError: max() arg is an empty sequence {code} 
> > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17466) Revisit failAndRemoveExpiredCommitRequests method
[ https://issues.apache.org/jira/browse/KAFKA-17466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17466: -- Component/s: clients consumer > Revisit failAndRemoveExpiredCommitRequests method > - > > Key: KAFKA-17466 > URL: https://issues.apache.org/jira/browse/KAFKA-17466 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: TengYao Chi >Assignee: TengYao Chi >Priority: Minor > > see discussion: > [https://github.com/apache/kafka/pull/16833#issuecomment-2301334628] > > In short, we should consider removing the > `failAndRemoveExpiredCommitRequests` method since its functionality is > already handled elsewhere. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17456) Make sure FindCoordinatorResponse get created before consumer
[ https://issues.apache.org/jira/browse/KAFKA-17456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17456: -- Component/s: clients consumer > Make sure FindCoordinatorResponse get created before consumer > - > > Key: KAFKA-17456 > URL: https://issues.apache.org/jira/browse/KAFKA-17456 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Major > > The incorrect order could lead to flaky tests (see KAFKA-17092 and KAFKA-17395). It > would be nice if we fixed all of them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17338) ConsumerConfig should prevent using partition assignors with CONSUMER group protocol
[ https://issues.apache.org/jira/browse/KAFKA-17338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17879020#comment-17879020 ] Kirk True commented on KAFKA-17338: --- [~m1a2st]—I assigned this to you and updated the status to "Patch Available" given that there's a PR under review. Thanks for your help! > ConsumerConfig should prevent using partition assignors with CONSUMER group > protocol > > > Key: KAFKA-17338 > URL: https://issues.apache.org/jira/browse/KAFKA-17338 > Project: Kafka > Issue Type: Task > Components: clients, config, consumer >Reporter: Kirk True >Assignee: 黃竣陽 >Priority: Critical > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 4.0.0 > > > {{ConsumerConfig}} should be updated to include additional validation in > {{postProcessParsedConfig()}}—when {{group.protocol}} is set to {{CONSUMER}}, > the value for {{partition.assignment.strategy}} must be either null or empty. > Otherwise a {{ConfigException}} should be thrown. > This is somewhat of the inverse case of KAFKA-15773. -- This message was sent by Atlassian Jira (v8.20.10#820010)
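The validation the ticket asks for can be sketched as follows. This is an illustrative Python sketch only; the real change belongs in Java's ConsumerConfig.postProcessParsedConfig() and would throw org.apache.kafka.common.config.ConfigException. The helper name and the "classic" fallback default are assumptions for the sketch.

```python
class ConfigException(Exception):
    """Stand-in for Kafka's ConfigException."""

def validate_assignors(configs):
    # When group.protocol=consumer, partition.assignment.strategy must be
    # null (absent) or empty; anything else is a configuration error.
    protocol = str(configs.get("group.protocol", "classic")).lower()
    assignors = configs.get("partition.assignment.strategy")
    if protocol == "consumer" and assignors:
        raise ConfigException(
            "partition.assignment.strategy must be null or empty when "
            "group.protocol is set to consumer")

validate_assignors({"group.protocol": "consumer"})  # passes: no assignors
try:
    validate_assignors({"group.protocol": "consumer",
                        "partition.assignment.strategy": ["range"]})
except ConfigException as exc:
    caught = exc
assert "null or empty" in str(caught)
```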
[jira] [Updated] (KAFKA-17338) ConsumerConfig should prevent using partition assignors with CONSUMER group protocol
[ https://issues.apache.org/jira/browse/KAFKA-17338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17338: -- Fix Version/s: 4.0.0 > ConsumerConfig should prevent using partition assignors with CONSUMER group > protocol > > > Key: KAFKA-17338 > URL: https://issues.apache.org/jira/browse/KAFKA-17338 > Project: Kafka > Issue Type: Task > Components: clients, config, consumer >Reporter: Kirk True >Assignee: 黃竣陽 >Priority: Critical > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 4.0.0 > > > {{ConsumerConfig}} should be updated to include additional validation in > {{postProcessParsedConfig()}}—when {{group.protocol}} is set to {{CONSUMER}}, > the value for {{partition.assignment.strategy}} must be either null or empty. > Otherwise a {{ConfigException}} should be thrown. > This is somewhat of the inverse case of KAFKA-15773. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-17338) ConsumerConfig should prevent using partition assignors with CONSUMER group protocol
[ https://issues.apache.org/jira/browse/KAFKA-17338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-17338: - Assignee: 黃竣陽 (was: Kirk True) > ConsumerConfig should prevent using partition assignors with CONSUMER group > protocol > > > Key: KAFKA-17338 > URL: https://issues.apache.org/jira/browse/KAFKA-17338 > Project: Kafka > Issue Type: Task > Components: clients, config, consumer >Reporter: Kirk True >Assignee: 黃竣陽 >Priority: Critical > Labels: consumer-threading-refactor, kip-848-client-support > > {{ConsumerConfig}} should be updated to include additional validation in > {{postProcessParsedConfig()}}—when {{group.protocol}} is set to {{CONSUMER}}, > the value for {{partition.assignment.strategy}} must be either null or empty. > Otherwise a {{ConfigException}} should be thrown. > This is somewhat of the inverse case of KAFKA-15773. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17116) New consumer may not send effective leave group if member ID received after close
[ https://issues.apache.org/jira/browse/KAFKA-17116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17116: -- Labels: kip-848-client-support needs-kip (was: kip-848-client-support) > New consumer may not send effective leave group if member ID received after > close > -- > > Key: KAFKA-17116 > URL: https://issues.apache.org/jira/browse/KAFKA-17116 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Lianet Magrans >Assignee: TengYao Chi >Priority: Major > Labels: kip-848-client-support, needs-kip > Fix For: 4.0.0 > > > If the new consumer is closed after sending a HB to join, but before > receiving the response to it, it will send a leave group request without a > member ID (which will simply fail with UNKNOWN_MEMBER_ID). As a result, the > broker will have registered a new member for which it will never receive a > leave request. > # consumer.subscribe -> sends HB to join, transitions to JOINING > # consumer.close -> will transition to LEAVING and send HB with epoch -1 > (without waiting for in-flight requests) > # consumer receives response to initial HB, containing the assigned member > ID. It will simply ignore it because it's not in the group anymore > (UNSUBSCRIBED) > With the current logic, the main downsides of this > are: > # If the member received partitions on the first HB, those > partitions won't be re-assigned (broker waiting for the closed consumer to > reconcile them), until the rebalance timeout expires. > # Even if no partitions were assigned to it, the member will remain in the > group from the broker point of view (but not from the client POV). The member > will eventually be kicked out for not sending HBs, but only when its session > timeout expires. -- This message was sent by Atlassian Jira (v8.20.10#820010)
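The numbered steps in the report above amount to an ordering race, sketched minimally here. All names are illustrative stand-ins, not actual client classes: the leave heartbeat goes out before the join response delivers the member ID, so the leave cannot identify the member, and the broker keeps the phantom member until its session timeout.

```python
# Timeline of the race: (client state, message) pairs in send/receive order.
timeline = [
    ("JOINING", {"op": "heartbeat", "member_id": None}),           # 1. subscribe()
    ("LEAVING", {"op": "leave", "member_id": None, "epoch": -1}),  # 2. close()
    ("UNSUBSCRIBED", {"op": "join_response", "member_id": "m-1"}), # 3. ignored
]

# The leave request carries no member ID, so the broker can only answer
# UNKNOWN_MEMBER_ID and never removes the member it registered in step 1.
leave = next(msg for _, msg in timeline if msg["op"] == "leave")
assert leave["member_id"] is None and leave["epoch"] == -1
```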
[jira] [Commented] (KAFKA-17338) ConsumerConfig should prevent using partition assignors with CONSUMER group protocol
[ https://issues.apache.org/jira/browse/KAFKA-17338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878956#comment-17878956 ] Kirk True commented on KAFKA-17338: --- [~m1a2st] do you want to assign this to yourself, given that you have already submitted a PR? Thanks! > ConsumerConfig should prevent using partition assignors with CONSUMER group > protocol > > > Key: KAFKA-17338 > URL: https://issues.apache.org/jira/browse/KAFKA-17338 > Project: Kafka > Issue Type: Task > Components: clients, config, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, kip-848-client-support > > {{ConsumerConfig}} should be updated to include additional validation in > {{postProcessParsedConfig()}}—when {{group.protocol}} is set to {{CONSUMER}}, > the value for {{partition.assignment.strategy}} must be either null or empty. > Otherwise a {{ConfigException}} should be thrown. > This is somewhat of the inverse case of KAFKA-15773. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17453) Fix flaky PlaintextConsumerFetchTest#testFetchOutOfRangeOffsetResetConfigLatest
[ https://issues.apache.org/jira/browse/KAFKA-17453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17453: -- Component/s: clients consumer > Fix flaky > PlaintextConsumerFetchTest#testFetchOutOfRangeOffsetResetConfigLatest > --- > > Key: KAFKA-17453 > URL: https://issues.apache.org/jira/browse/KAFKA-17453 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > {code:java} > Errorjava.util.NoSuchElementExceptionStacktracejava.util.NoSuchElementException >at > org.apache.kafka.common.utils.AbstractIterator.next(AbstractIterator.java:52) >at > kafka.api.PlaintextConsumerFetchTest.testFetchOutOfRangeOffsetResetConfigLatest(PlaintextConsumerFetchTest.scala:104) > at java.lang.reflect.Method.invoke(Method.java:498) at > java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) > at > java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647) > at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) > at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) >at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) > at > java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) >at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) >at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) > at > java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) >at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > 
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) >at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) > at java.util.ArrayList.forEach(ArrayList.java:1259) at > java.util.ArrayList.forEach(ArrayList.java:1259){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17453) Fix flaky PlaintextConsumerFetchTest#testFetchOutOfRangeOffsetResetConfigLatest
[ https://issues.apache.org/jira/browse/KAFKA-17453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17453: -- Labels: integration-test (was: ) > Fix flaky > PlaintextConsumerFetchTest#testFetchOutOfRangeOffsetResetConfigLatest > --- > > Key: KAFKA-17453 > URL: https://issues.apache.org/jira/browse/KAFKA-17453 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > Labels: integration-test > > {code:java} > Errorjava.util.NoSuchElementExceptionStacktracejava.util.NoSuchElementException >at > org.apache.kafka.common.utils.AbstractIterator.next(AbstractIterator.java:52) >at > kafka.api.PlaintextConsumerFetchTest.testFetchOutOfRangeOffsetResetConfigLatest(PlaintextConsumerFetchTest.scala:104) > at java.lang.reflect.Method.invoke(Method.java:498) at > java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) > at > java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647) > at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272) > at > 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) > at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) >at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) > at > java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) >at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) >at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) > at > java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) >at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) >at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) > at java.util.ArrayList.forEach(ArrayList.java:1259) at > java.util.ArrayList.forEach(ArrayList.java:1259){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17451) Remove deprecated Consumer#committed
[ https://issues.apache.org/jira/browse/KAFKA-17451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17451: -- Component/s: clients consumer > Remove deprecated Consumer#committed > > > Key: KAFKA-17451 > URL: https://issues.apache.org/jira/browse/KAFKA-17451 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Chia-Ping Tsai >Assignee: Ming-Yen Chung >Priority: Major > > The APIs were deprecated by KAFKA-8880 which is back in 2.4.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17335) Lack of default for URL encoding configuration for OAuth causes NPE
[ https://issues.apache.org/jira/browse/KAFKA-17335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877943#comment-17877943 ] Kirk True commented on KAFKA-17335: --- Thanks [~chia7712]!!! > Lack of default for URL encoding configuration for OAuth causes NPE > --- > > Key: KAFKA-17335 > URL: https://issues.apache.org/jira/browse/KAFKA-17335 > Project: Kafka > Issue Type: Bug > Components: clients, security >Affects Versions: 3.9.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: OAuth, oauth > Fix For: 4.0.0, 3.9.0 > > > KAFKA-16345 added a new client configuration option > {{{}SASL_OAUTHBEARER_HEADER_URLENCODE{}}}. This is an optional configuration, > so the user doesn't need to provide it. When an {{{}AdminConfig{}}}, > {{{}ConsumerConfig{}}}, or {{ProducerConfig}} object is created, it uses the > default value of {{DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE}} if the value > isn't present. > However, if the configuration is created as a plain {{Map}} or {{Properties}} > and the {{sasl.oauthbearer.header.urlencode}} key isn't present, it can lead > to a {{{}NullPointerException{}}}. This occurs because the code in > {{AccessTokenRetriever.create()}} assumes that there's always a value present > in the incoming {{configs}} parameter. But if there isn't an entry for the > {{sasl.oauthbearer.header.urlencode}} key in the map, a > {{NullPointerException}} is thrown. > When using map-based configuration, one workaround is to explicitly add an > entry to the map like so: > {code:java} > Map configs = new HashMap(); > . . . > configs.put(SASL_OAUTHBEARER_HEADER_URLENCODE, > DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE); > . . . > configureSomething(configs);{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
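The failure mode described above can be mirrored in Python: an unguarded lookup of an optional key yields null/None, and in Java it is the auto-unboxing of that null Boolean that throws the NullPointerException. The fix direction is a read-with-default at the lookup site. The default value below is a stand-in, not the actual DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE constant:

```python
DEFAULT_URLENCODE = False  # hypothetical stand-in for the Kafka default

configs = {}  # user-built map that omits the optional key

# Unguarded read: this is the None (Java: null) that later blows up.
urlencode = configs.get("sasl.oauthbearer.header.urlencode")
assert urlencode is None

# Defensive read with an explicit default avoids the problem entirely.
urlencode = configs.get("sasl.oauthbearer.header.urlencode", DEFAULT_URLENCODE)
assert urlencode is False
```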
[jira] [Updated] (KAFKA-16623) KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned
[ https://issues.apache.org/jira/browse/KAFKA-16623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16623: -- Fix Version/s: 4.0.0 (was: 3.9.0) > KafkaAsyncConsumer system tests warn about revoking partitions that weren't > previously assigned > --- > > Key: KAFKA-16623 > URL: https://issues.apache.org/jira/browse/KAFKA-16623 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Lianet Magrans >Priority: Critical > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 4.0.0 > > > When running system tests for the KafkaAsyncConsumer, we occasionally see > this warning: > {noformat} > File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner > self.run() > File "/usr/lib/python3.7/threading.py", line 865, in run > self._target(*self._args, **self._kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py", > line 38, in _protected_worker > self._worker(idx, node) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", > line 304, in _worker > handler.handle_partitions_revoked(event, node, self.logger) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", > line 163, in handle_partitions_revoked > (tp, node.account.hostname) > AssertionError: Topic partition TopicPartition(topic='test_topic', > partition=0) cannot be revoked from worker20 as it was not previously > assigned to that consumer > {noformat} > In test_fencing_static_consumer, there are two sets of consumers that use > group instance IDs: the initial set and the "conflict" set. 
It appears that > one of the "conflicting" consumers hijacks the partition ownership from the > coordinator's perspective when the initial consumer leaves the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
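The assertion quoted above comes from bookkeeping in verifiable_consumer.py: a partition may only be revoked if the handler previously recorded it as assigned. A simplified sketch of that invariant (function names are illustrative, not the actual ducktape handlers):

```python
assigned = set()  # partitions this consumer currently owns

def on_assigned(tps):
    assigned.update(tps)

def on_revoked(tps):
    for tp in tps:
        # Revoking a partition we never owned indicates the broker and the
        # client disagree about ownership, e.g. after a fencing conflict.
        assert tp in assigned, (
            f"Topic partition {tp} cannot be revoked as it was not "
            "previously assigned to that consumer")
        assigned.discard(tp)

on_assigned([("test_topic", 0)])
on_revoked([("test_topic", 0)])      # fine: previously assigned
try:
    on_revoked([("test_topic", 0)])  # second revoke trips the invariant
except AssertionError as exc:
    caught = str(exc)
assert "previously assigned" in caught
```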
[jira] [Commented] (KAFKA-16623) KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned
[ https://issues.apache.org/jira/browse/KAFKA-16623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877879#comment-17877879 ] Kirk True commented on KAFKA-16623: --- [~cmccabe]—this is not a blocker for 3.9.0, no. I'll move it to 4.0. Thanks! > KafkaAsyncConsumer system tests warn about revoking partitions that weren't > previously assigned > --- > > Key: KAFKA-16623 > URL: https://issues.apache.org/jira/browse/KAFKA-16623 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Lianet Magrans >Priority: Critical > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.9.0 > > > When running system tests for the KafkaAsyncConsumer, we occasionally see > this warning: > {noformat} > File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner > self.run() > File "/usr/lib/python3.7/threading.py", line 865, in run > self._target(*self._args, **self._kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py", > line 38, in _protected_worker > self._worker(idx, node) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", > line 304, in _worker > handler.handle_partitions_revoked(event, node, self.logger) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", > line 163, in handle_partitions_revoked > (tp, node.account.hostname) > AssertionError: Topic partition TopicPartition(topic='test_topic', > partition=0) cannot be revoked from worker20 as it was not previously > assigned to that consumer > {noformat} > In test_fencing_static_consumer, there are two sets of consumers that use > group instance IDs: the initial set and the "conflict" set. 
It appears that > one of the "conflicting" consumers hijacks the partition ownership from the > coordinator's perspective when the initial consumer leaves the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17335) Lack of default for URL encoding configuration for OAuth causes NPE
[ https://issues.apache.org/jira/browse/KAFKA-17335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17335: -- Fix Version/s: (was: 3.9.0) > Lack of default for URL encoding configuration for OAuth causes NPE > --- > > Key: KAFKA-17335 > URL: https://issues.apache.org/jira/browse/KAFKA-17335 > Project: Kafka > Issue Type: Bug > Components: clients, security >Affects Versions: 3.9.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: OAuth, oauth > Fix For: 4.0.0 > > > KAFKA-16345 added a new client configuration option > {{{}SASL_OAUTHBEARER_HEADER_URLENCODE{}}}. This is an optional configuration, > so the user doesn't need to provide it. When an {{{}AdminConfig{}}}, > {{{}ConsumerConfig{}}}, or {{ProducerConfig}} object is created, it uses the > default value of {{DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE}} if the value > isn't present. > However, if the configuration is created as a plain {{Map}} or {{Properties}} > and the {{sasl.oauthbearer.header.urlencode}} key isn't present, it can lead > to a {{{}NullPointerException{}}}. This occurs because the code in > {{AccessTokenRetriever.create()}} assumes that there's always a value present > in the incoming {{configs}} parameter. But if there isn't an entry for the > {{sasl.oauthbearer.header.urlencode}} key in the map, a > {{NullPointerException}} is thrown. > When using map-based configuration, one workaround is to explicitly add an > entry to the map like so: > {code:java} > Map configs = new HashMap(); > . . . > configs.put(SASL_OAUTHBEARER_HEADER_URLENCODE, > DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE); > . . . > configureSomething(configs);{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17439) Make polling for new records an explicit action/event in the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-17439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17439: -- Fix Version/s: 4.0.0 > Make polling for new records an explicit action/event in the new consumer > - > > Key: KAFKA-17439 > URL: https://issues.apache.org/jira/browse/KAFKA-17439 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 4.0.0 > > > Presently, the new consumer polls the FetchRequestManager many, many times a > second and creates fetch requests for any fetchable partitions. In order to > more closely mirror how the existing consumer processes fetches, we should > mirror the points at which fetch requests are sent in the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17439) Make polling for new records an explicit action/event in the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-17439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17439: -- Affects Version/s: 3.7.0 > Make polling for new records an explicit action/event in the new consumer > - > > Key: KAFKA-17439 > URL: https://issues.apache.org/jira/browse/KAFKA-17439 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 4.0.0 > > > Presently, the new consumer polls the FetchRequestManager many, many times a > second and creates fetch requests for any fetchable partitions. In order to > more closely mirror how the existing consumer processes fetches, we should > mirror the points at which fetch requests are sent in the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17439) Make polling for new records an explicit action/event in the new consumer
Kirk True created KAFKA-17439: - Summary: Make polling for new records an explicit action/event in the new consumer Key: KAFKA-17439 URL: https://issues.apache.org/jira/browse/KAFKA-17439 Project: Kafka Issue Type: Bug Components: clients, consumer Reporter: Kirk True Assignee: Kirk True Presently, the new consumer polls the FetchRequestManager many, many times a second and creates fetch requests for any fetchable partitions. In order to more closely mirror how the existing consumer processes fetches, we should mirror the points at which fetch requests are sent in the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
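The change the ticket above describes can be sketched as an explicit poll event: instead of the background thread polling the FetchRequestManager continuously, the application thread enqueues an event and fetch requests are only built when one is dequeued. This is a minimal, hypothetical model; the event and method names below are illustrative and not the actual `AsyncKafkaConsumer` internals.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: the application thread enqueues an explicit POLL event,
// and the background loop only creates fetch requests when it sees one,
// instead of polling the fetch manager on every iteration.
public class ExplicitPollSketch {
    enum Event { POLL }

    private final Queue<Event> events = new ArrayDeque<>();
    int fetchRequestsSent = 0;

    // Called from the application thread (e.g. inside Consumer.poll()).
    void enqueuePoll() { events.add(Event.POLL); }

    // One background-loop iteration: build a fetch request only when an
    // explicit POLL event was enqueued by the application thread.
    void runOnce() {
        if (events.poll() == Event.POLL) {
            fetchRequestsSent++; // stand-in for FetchRequestManager creating requests
        }
    }
}
```

This mirrors the classic consumer, where fetch requests are sent at well-defined points in `poll()` rather than on a free-running loop.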
[jira] [Commented] (KAFKA-14830) Illegal state error in transactional producer
[ https://issues.apache.org/jira/browse/KAFKA-14830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877221#comment-17877221 ] Kirk True commented on KAFKA-14830: --- cc [~mjsax] > Illegal state error in transactional producer > - > > Key: KAFKA-14830 > URL: https://issues.apache.org/jira/browse/KAFKA-14830 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 3.1.2 >Reporter: Jason Gustafson >Assignee: Kirk True >Priority: Critical > Labels: transactions > Fix For: 4.0.0 > > > We have seen the following illegal state error in the producer: > {code:java} > [Producer clientId=client-id2, transactionalId=transactional-id] Transiting > to abortable error state due to > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > topic-0:120027 ms has passed since batch creation > [Producer clientId=client-id2, transactionalId=transactional-id] Transiting > to abortable error state due to > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > topic-1:120026 ms has passed since batch creation > [Producer clientId=client-id2, transactionalId=transactional-id] Aborting > incomplete transaction > [Producer clientId=client-id2, transactionalId=transactional-id] Invoking > InitProducerId with current producer ID and epoch > ProducerIdAndEpoch(producerId=191799, epoch=0) in order to bump the epoch > [Producer clientId=client-id2, transactionalId=transactional-id] ProducerId > set to 191799 with epoch 1 > [Producer clientId=client-id2, transactionalId=transactional-id] Transiting > to abortable error state due to > org.apache.kafka.common.errors.NetworkException: Disconnected from node 4 > [Producer clientId=client-id2, transactionalId=transactional-id] Transiting > to abortable error state due to > org.apache.kafka.common.errors.TimeoutException: The request timed out. 
> [Producer clientId=client-id2, transactionalId=transactional-id] Uncaught > error in request completion: > java.lang.IllegalStateException: TransactionalId transactional-id: Invalid > transition attempted from state READY to state ABORTABLE_ERROR > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1089) > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:508) > at > org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:734) > at > org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:739) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:753) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:743) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:695) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634) > at > org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:575) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) > at > org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:562) > at java.base/java.lang.Iterable.forEach(Iterable.java:75) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:562) > at > org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) > at > 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) > at java.base/java.lang.Thread.run(Thread.java:829) > {code} > The producer hits timeouts which cause it to abort an active transaction. > After aborting, the producer bumps its epoch, which transitions it back to > the `READY` state. Following this, there are two errors for inflight > requests, which cause an illegal state transition to `ABORTABLE_ERROR`. But > how could the transaction ABORT complete if there were still inflight > requests? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14830) Illegal state error in transactional producer
[ https://issues.apache.org/jira/browse/KAFKA-14830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877220#comment-17877220 ] Kirk True commented on KAFKA-14830: --- [~jolshan] [~alivshits] [~calvinliu]—I have looked into a number of Jiras, reports, and logs about the “{{{}Invalid transition attempted from state X to state Y{}}}” that intermittently plagues the transaction manager. My hypothesis is that there's a race condition between the {{Sender}} thread and the application thread when handling transaction errors. When an error occurs, the {{Sender}} thread calls {{TransactionManager.handleFailedBatch()}} which updates the state to {{{}ABORTABLE_ERROR{}}}. However, this state mutation attempt occurs *_after_* the application thread has already successfully aborted (and thus transitioned the state to {{{}READY{}}}) or has hard failed (setting the state to {{{}FATAL_ERROR{}}}). I have a draft PR (linked above) which tries to handle some of these cases in {{{}handleFailedBatch(){}}}. Note: my confidence level on the "fix" is very low, but I'd love to get feedback on the PR and/or comments on this Jira. Thanks! 
> Illegal state error in transactional producer > - > > Key: KAFKA-14830 > URL: https://issues.apache.org/jira/browse/KAFKA-14830 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 3.1.2 >Reporter: Jason Gustafson >Assignee: Kirk True >Priority: Critical > Labels: transactions > Fix For: 4.0.0 > > > We have seen the following illegal state error in the producer: > {code:java} > [Producer clientId=client-id2, transactionalId=transactional-id] Transiting > to abortable error state due to > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > topic-0:120027 ms has passed since batch creation > [Producer clientId=client-id2, transactionalId=transactional-id] Transiting > to abortable error state due to > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > topic-1:120026 ms has passed since batch creation > [Producer clientId=client-id2, transactionalId=transactional-id] Aborting > incomplete transaction > [Producer clientId=client-id2, transactionalId=transactional-id] Invoking > InitProducerId with current producer ID and epoch > ProducerIdAndEpoch(producerId=191799, epoch=0) in order to bump the epoch > [Producer clientId=client-id2, transactionalId=transactional-id] ProducerId > set to 191799 with epoch 1 > [Producer clientId=client-id2, transactionalId=transactional-id] Transiting > to abortable error state due to > org.apache.kafka.common.errors.NetworkException: Disconnected from node 4 > [Producer clientId=client-id2, transactionalId=transactional-id] Transiting > to abortable error state due to > org.apache.kafka.common.errors.TimeoutException: The request timed out. 
> [Producer clientId=client-id2, transactionalId=transactional-id] Uncaught > error in request completion: > java.lang.IllegalStateException: TransactionalId transactional-id: Invalid > transition attempted from state READY to state ABORTABLE_ERROR > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1089) > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:508) > at > org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:734) > at > org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:739) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:753) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:743) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:695) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634) > at > org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:575) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) > at > org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:562) > at java.base/java.lang.Iterable.forEach(Iterable.java:75) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:562) > at > org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:58
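The race hypothesized in the comment above can be illustrated with a guarded state machine: the Sender thread's late batch-failure callback finds the state already back at READY (or FATAL_ERROR) and treats the transition as a no-op instead of throwing. This is a simplified sketch of one possible mitigation, not the actual `TransactionManager` API; the real code throws `IllegalStateException` on an invalid transition.

```java
// Simplified model of the READY -> ABORTABLE_ERROR race. State names mirror
// the log excerpt above; method names are illustrative.
public class TransitionSketch {
    enum State { READY, IN_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

    private State state = State.IN_TRANSACTION;

    // Application thread: abort completed and the epoch was bumped.
    synchronized void completeAbort() { state = State.READY; }

    // Sender thread: a failed batch arrives. If the abort already finished,
    // the failure is stale, so drop it rather than attempt an illegal transition.
    synchronized boolean maybeTransitionToAbortableError() {
        if (state == State.READY || state == State.FATAL_ERROR) {
            return false; // late arrival after abort/fatal: ignore
        }
        state = State.ABORTABLE_ERROR;
        return true;
    }

    synchronized State state() { return state; }
}
```

Whether dropping the stale failure is safe (versus, say, re-aborting) is exactly the open question on the linked draft PR.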
[jira] [Commented] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)
[ https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877042#comment-17877042 ] Kirk True commented on KAFKA-16792: --- {{AsyncKafkaConsumer.poll()}} should indirectly cause a {{FindCoordinatorRequest}} to be sent in the background thread. When you state that “we don't guarantee” that it will, that sounds like a bug. :( > Enable consumer unit tests that fail to fetch offsets only for new consumer > with poll(0) > > > Key: KAFKA-16792 > URL: https://issues.apache.org/jira/browse/KAFKA-16792 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Lianet Magrans >Assignee: PoAn Yang >Priority: Blocker > Labels: kip-848-client-support > Fix For: 4.0.0 > > > Enable the following unit tests for the new async consumer in > KafkaConsumerTest: > - testCurrentLag > - testFetchStableOffsetThrowInPoll > - testListOffsetShouldUpdateSubscriptions > - testPollReturnsRecords > - testResetToCommittedOffset > - testResetUsingAutoResetPolicy > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17397) Ensure ClassicKafkaConsumer sends leave request on close even if interrupted
[ https://issues.apache.org/jira/browse/KAFKA-17397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877037#comment-17877037 ] Kirk True commented on KAFKA-17397: --- [~goyarpit]—yes, feel free to work on it :) > Ensure ClassicKafkaConsumer sends leave request on close even if interrupted > > > Key: KAFKA-17397 > URL: https://issues.apache.org/jira/browse/KAFKA-17397 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.8.0, 3.9.0 >Reporter: Kirk True >Priority: Major > Labels: integration-tests > > During testing for KAFKA-16985, a new, parameterized integration test was > added to {{PlaintextConsumerTest}} named > {{{}testCloseLeavesGroupOnInterrupt(){}}}. When the test is executed locally, > it passes using both the {{AsyncKafkaConsumer}} and the > {{{}ClassicKafkaConsumer{}}}. However, when the test is run in the Apache CI > environment, it passes for the {{AsyncKafkaConsumer}} but fails for the > {{{}ClassicKafkaConsumer{}}}. Rather than hold up KAFKA-16985, this Jira was > filed to investigate and fix the {{{}ClassicKafkaConsumer{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17421) Add IT for ConsumerRecord#leaderEpoch
[ https://issues.apache.org/jira/browse/KAFKA-17421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17421: -- Component/s: clients consumer > Add IT for ConsumerRecord#leaderEpoch > - > > Key: KAFKA-17421 > URL: https://issues.apache.org/jira/browse/KAFKA-17421 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Chia-Ping Tsai >Assignee: Chia-Chuan Yu >Priority: Minor > > The test should include the following checks: > 1. the leader epoch is not empty > 2. it gets updated after the leader gets changed -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17421) Add IT for ConsumerRecord#leaderEpoch
[ https://issues.apache.org/jira/browse/KAFKA-17421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17421: -- Labels: integration-test (was: ) > Add IT for ConsumerRecord#leaderEpoch > - > > Key: KAFKA-17421 > URL: https://issues.apache.org/jira/browse/KAFKA-17421 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Chia-Ping Tsai >Assignee: Chia-Chuan Yu >Priority: Minor > Labels: integration-test > > The test should include the following checks: > 1. the leader epoch is not empty > 2. it gets updated after the leader gets changed -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17182) Consumer fetch sessions are evicted too quickly
[ https://issues.apache.org/jira/browse/KAFKA-17182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17182: -- Summary: Consumer fetch sessions are evicted too quickly (was: Consumer’s fetch sessions are evicted too quickly) > Consumer fetch sessions are evicted too quickly > --- > > Key: KAFKA-17182 > URL: https://issues.apache.org/jira/browse/KAFKA-17182 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor > Fix For: 4.0.0 > > > In stress testing the new consumer, the new consumer is evicting fetch > sessions on the broker much more frequently than expected. There is an > ongoing investigation into this behavior, but it appears to stem from a race > condition due to the design of the new consumer. > In the background thread, fetch requests are sent in a near continuous > fashion for partitions that are "fetchable." A timing bug appears to cause > partitions to be "unfetchable," which then causes them to end up in the > "removed" set of partitions. The broker then removes them from the fetch > session, which causes the number of remaining partitions for that session to > drop below a threshold that allows it to be evicted by another competing > session. Within a few milliseconds, though, the partitions become "fetchable" > again, and are added to the "added" set of partitions on the next fetch > request. This causes thrashing on both the client and broker sides as both > are handling a steady stream of evictions, which negatively affects > consumption throughput. -- This message was sent by Atlassian Jira (v8.20.10#820010)
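The eviction dynamic described in the ticket above can be modeled in a few lines: each fetch request's "added" and "removed" sets adjust the session's cached partitions, and a session that has shed partitions becomes cheaper to evict than a larger competing session. This is an illustrative model of the behavior, not broker code; the class and method names are made up for the sketch.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of an incremental fetch session's partition cache and the
// size-based eviction comparison described in the ticket.
public class FetchSessionSketch {
    final Set<String> sessionPartitions = new HashSet<>();

    // Apply one incremental fetch request: grow by "added", shrink by "removed".
    void applyFetchRequest(Set<String> added, Set<String> removed) {
        sessionPartitions.addAll(added);
        sessionPartitions.removeAll(removed);
    }

    // A session caching fewer partitions loses to a larger competing session.
    boolean evictableBy(int competingSessionSize) {
        return sessionPartitions.size() < competingSessionSize;
    }
}
```

In the bug, partitions flap between the "removed" and "added" sets within milliseconds, so the session's size keeps dipping below competitors and it is evicted repeatedly.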
[jira] [Commented] (KAFKA-17335) Lack of default for URL encoding configuration for OAuth causes NPE
[ https://issues.apache.org/jira/browse/KAFKA-17335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876399#comment-17876399 ] Kirk True commented on KAFKA-17335: --- No worries, [~bachmanity1]! Regarding 3.9.0, I think we should ask the release manager on the mailing list. > Lack of default for URL encoding configuration for OAuth causes NPE > --- > > Key: KAFKA-17335 > URL: https://issues.apache.org/jira/browse/KAFKA-17335 > Project: Kafka > Issue Type: Bug > Components: clients, security >Affects Versions: 3.9.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: OAuth, oauth > Fix For: 4.0.0 > > > KAFKA-16345 added a new client configuration option > {{{}SASL_OAUTHBEARER_HEADER_URLENCODE{}}}. This is an optional configuration, > so the user doesn't need to provide it. When an {{{}AdminConfig{}}}, > {{{}ConsumerConfig{}}}, or {{ProducerConfig}} object is created, it uses the > default value of {{DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE}} if the value > isn't present. > However, if the configuration is created as a plain {{Map}} or {{Properties}} > and the {{sasl.oauthbearer.header.urlencode}} key isn't present, it can lead > to a {{{}NullPointerException{}}}. This occurs because the code in > {{AccessTokenRetriever.create()}} assumes that there's always a value present > in the incoming {{configs}} parameter. But if there isn't an entry for the > {{sasl.oauthbearer.header.urlencode}} key in the map, a > {{NullPointerException}} is thrown. > When using map-based configuration, one workaround is to explicitly add an > entry to the map like so: > {code:java} > Map configs = new HashMap(); > . . . > configs.put(SASL_OAUTHBEARER_HEADER_URLENCODE, > DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE); > . . . > configureSomething(configs);{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17335) Lack of default for URL encoding configuration for OAuth causes NPE
[ https://issues.apache.org/jira/browse/KAFKA-17335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17335: -- Summary: Lack of default for URL encoding configuration for OAuth causes NPE (was: Lack of default for missing URL encoding option for OAuth causes NPE) > Lack of default for URL encoding configuration for OAuth causes NPE > --- > > Key: KAFKA-17335 > URL: https://issues.apache.org/jira/browse/KAFKA-17335 > Project: Kafka > Issue Type: Bug > Components: clients, security >Affects Versions: 3.9.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: OAuth, oauth > Fix For: 4.0.0 > > > KAFKA-16345 added a new client configuration option > {{{}SASL_OAUTHBEARER_HEADER_URLENCODE{}}}. This is an optional configuration, > so the user doesn't need to provide it. When an {{{}AdminConfig{}}}, > {{{}ConsumerConfig{}}}, or {{ProducerConfig}} object is created, it uses the > default value of {{DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE}} if the value > isn't present. > However, if the configuration is created as a plain {{Map}} or {{Properties}} > and the {{sasl.oauthbearer.header.urlencode}} key isn't present, it can lead > to a {{{}NullPointerException{}}}. This occurs because the code in > {{AccessTokenRetriever.create()}} assumes that there's always a value present > in the incoming {{configs}} parameter. But if there isn't an entry for the > {{sasl.oauthbearer.header.urlencode}} key in the map, a > {{NullPointerException}} is thrown. > When using map-based configuration, one workaround is to explicitly add an > entry to the map like so: > {code:java} > Map configs = new HashMap(); > . . . > configs.put(SASL_OAUTHBEARER_HEADER_URLENCODE, > DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE); > . . . > configureSomething(configs);{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
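Beyond the workaround quoted above, the defensive pattern the ticket implies on the library side is to read the optional key with an explicit fallback rather than assuming the entry exists. A minimal sketch, assuming the string key `sasl.oauthbearer.header.urlencode` and a `false` default (placeholders for the actual constants referenced in the ticket):

```java
import java.util.Map;

// Null-safe read of an optional boolean config entry, so a plain Map or
// Properties without the key cannot cause an NPE. Key and default are
// placeholders for SASL_OAUTHBEARER_HEADER_URLENCODE and its default.
public class ConfigDefaultSketch {
    static boolean urlEncodeHeader(Map<String, Object> configs) {
        Object value = configs.getOrDefault("sasl.oauthbearer.header.urlencode", false);
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        // Properties-based configs often carry the value as a String.
        return Boolean.parseBoolean(String.valueOf(value));
    }
}
```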
[jira] [Updated] (KAFKA-17182) Consumer’s fetch sessions are evicted too quickly
[ https://issues.apache.org/jira/browse/KAFKA-17182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17182: -- Summary: Consumer’s fetch sessions are evicted too quickly (was: Consumer's fetch sessions are evicted too quickly) > Consumer’s fetch sessions are evicted too quickly > - > > Key: KAFKA-17182 > URL: https://issues.apache.org/jira/browse/KAFKA-17182 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor > Fix For: 4.0.0 > > > In stress testing the new consumer, the new consumer is evicting fetch > sessions on the broker much more frequently than expected. There is an > ongoing investigation into this behavior, but it appears to stem from a race > condition due to the design of the new consumer. > In the background thread, fetch requests are sent in a near continuous > fashion for partitions that are "fetchable." A timing bug appears to cause > partitions to be "unfetchable," which then causes them to end up in the > "removed" set of partitions. The broker then removes them from the fetch > session, which causes the number of remaining partitions for that session to > drop below a threshold that allows it to be evicted by another competing > session. Within a few milliseconds, though, the partitions become "fetchable" > again, and are added to the "added" set of partitions on the next fetch > request. This causes thrashing on both the client and broker sides as both > are handling a steady stream of evictions, which negatively affects > consumption throughput. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17377) Consider using defaultApiTimeoutMs in AsyncKafkaConsumer#unsubscribe
[ https://issues.apache.org/jira/browse/KAFKA-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17875692#comment-17875692 ] Kirk True commented on KAFKA-17377: --- [~yangpoan]—your suggestion makes a lot of sense. [~lianetm] [~pnee]—do you have any thoughts on this? > Consider using defaultApiTimeoutMs in AsyncKafkaConsumer#unsubscribe > > > Key: KAFKA-17377 > URL: https://issues.apache.org/jira/browse/KAFKA-17377 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Major > Labels: consumer-threading-refactor > > This is followup from > [https://github.com/apache/kafka/pull/16673#discussion_r1717009724] > AsyncKafkaConsumer#unsubscribe uses Long.MAX_VALUE to calculate deadline. > However, most of AsyncKafkaConsumer operations use > [default.api.timeout.ms|https://kafka.apache.org/documentation/#consumerconfigs_default.api.timeout.ms] > if users don't specify a timeout. In [design > document|https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts#JavaclientConsumertimeouts-DefaultTimeouts], > it also mentions that using default.api.timeout.ms as default timeout. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17377) Consider using defaultApiTimeoutMs in AsyncKafkaConsumer#unsubscribe
[ https://issues.apache.org/jira/browse/KAFKA-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17377: -- Labels: refactor (was: ) > Consider using defaultApiTimeoutMs in AsyncKafkaConsumer#unsubscribe > > > Key: KAFKA-17377 > URL: https://issues.apache.org/jira/browse/KAFKA-17377 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Major > Labels: refactor > > This is followup from > [https://github.com/apache/kafka/pull/16673#discussion_r1717009724] > AsyncKafkaConsumer#unsubscribe uses Long.MAX_VALUE to calculate deadline. > However, most of AsyncKafkaConsumer operations use > [default.api.timeout.ms|https://kafka.apache.org/documentation/#consumerconfigs_default.api.timeout.ms] > if users don't specify a timeout. In [design > document|https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts#JavaclientConsumertimeouts-DefaultTimeouts], > it also mentions that using default.api.timeout.ms as default timeout. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17377) Consider using defaultApiTimeoutMs in AsyncKafkaConsumer#unsubscribe
[ https://issues.apache.org/jira/browse/KAFKA-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17377: -- Labels: consumer-threading-refactor (was: refactor) > Consider using defaultApiTimeoutMs in AsyncKafkaConsumer#unsubscribe > > > Key: KAFKA-17377 > URL: https://issues.apache.org/jira/browse/KAFKA-17377 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Major > Labels: consumer-threading-refactor > > This is followup from > [https://github.com/apache/kafka/pull/16673#discussion_r1717009724] > AsyncKafkaConsumer#unsubscribe uses Long.MAX_VALUE to calculate deadline. > However, most of AsyncKafkaConsumer operations use > [default.api.timeout.ms|https://kafka.apache.org/documentation/#consumerconfigs_default.api.timeout.ms] > if users don't specify a timeout. In [design > document|https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts#JavaclientConsumertimeouts-DefaultTimeouts], > it also mentions that using default.api.timeout.ms as default timeout. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17377) Consider using defaultApiTimeoutMs in AsyncKafkaConsumer#unsubscribe
[ https://issues.apache.org/jira/browse/KAFKA-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17377: -- Component/s: clients consumer > Consider using defaultApiTimeoutMs in AsyncKafkaConsumer#unsubscribe > > > Key: KAFKA-17377 > URL: https://issues.apache.org/jira/browse/KAFKA-17377 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Major > > This is followup from > [https://github.com/apache/kafka/pull/16673#discussion_r1717009724] > AsyncKafkaConsumer#unsubscribe uses Long.MAX_VALUE to calculate deadline. > However, most of AsyncKafkaConsumer operations use > [default.api.timeout.ms|https://kafka.apache.org/documentation/#consumerconfigs_default.api.timeout.ms] > if users don't specify a timeout. In [design > document|https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts#JavaclientConsumertimeouts-DefaultTimeouts], > it also mentions that using default.api.timeout.ms as default timeout. -- This message was sent by Atlassian Jira (v8.20.10#820010)
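The suggestion in the ticket above amounts to computing the unsubscribe deadline from `default.api.timeout.ms` instead of `Long.MAX_VALUE`. A hedged sketch of that calculation (the method name is illustrative, not the actual `AsyncKafkaConsumer` internals):

```java
// Deadline derived from default.api.timeout.ms, with an overflow guard --
// one reason Long.MAX_VALUE-based deadlines are awkward to work with.
public class DeadlineSketch {
    static long deadlineMs(long nowMs, long defaultApiTimeoutMs) {
        long deadline = nowMs + defaultApiTimeoutMs;
        return deadline < nowMs ? Long.MAX_VALUE : deadline; // saturate on overflow
    }
}
```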
[jira] [Updated] (KAFKA-17395) Flaky test testMissingOffsetNoResetPolicy for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-17395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17395: -- Labels: consumer-threading-refactor flaky-test (was: consumer-threading-refactor) > Flaky test testMissingOffsetNoResetPolicy for new consumer > -- > > Key: KAFKA-17395 > URL: https://issues.apache.org/jira/browse/KAFKA-17395 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, flaky-test > > KafkaConsumerTest.testMissingOffsetNoResetPolicy is flaky when running for > the new consumer (passing consistently for the classic consumer). > Fails with : > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Consumer was not able to update fetch positions on continuous calls with 0 > timeout ==> expected: but was: > It's been flaky since it was enabled for the new consumer with > [https://github.com/apache/kafka/pull/16587] > See last couple of month runs on trunk showing the flakiness: > [https://ge.apache.org/scans/tests?search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172429919&search.startTimeMin=171721440&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=org.apache.kafka.clients.consumer.KafkaConsumerTest&tests.test=testMissingOffsetNoResetPolicy(GroupProtocol)%5B2%5D] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17397) Ensure ClassicKafkaConsumer sends leave request on close even if interrupted
Kirk True created KAFKA-17397: - Summary: Ensure ClassicKafkaConsumer sends leave request on close even if interrupted Key: KAFKA-17397 URL: https://issues.apache.org/jira/browse/KAFKA-17397 Project: Kafka Issue Type: Bug Components: clients, consumer Affects Versions: 3.8.0, 3.9.0 Reporter: Kirk True During testing for KAFKA-16985, a new, parameterized integration test was added to {{PlaintextConsumerTest}} named {{{}testCloseLeavesGroupOnInterrupt(){}}}. When the test is executed locally, it passes using both the {{AsyncKafkaConsumer}} and the {{{}ClassicKafkaConsumer{}}}. However, when the test is run in the Apache CI environment, it passes for the {{AsyncKafkaConsumer}} but fails for the {{{}ClassicKafkaConsumer{}}}. Rather than hold up KAFKA-16985, this Jira was filed to investigate and fix the {{{}ClassicKafkaConsumer{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)